Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28    ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
39// Note: `resolve_windows_shell` is no longer imported at module scope —
40// production code in `spawn_detached_child` uses `shell_candidates()`
41// with retry instead, and the function remains in `windows_shell.rs`
42// for tests and as a future helper.
43
/// Timeout applied to a background bash task when the caller supplies none:
/// 30 minutes. Agents can override it per call through the `timeout`
/// parameter (given in milliseconds).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// 24 hours.
/// NOTE(review): presumably the age past which a replayed `Running` task is
/// treated as orphaned — the consumer is outside this view; confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(60 * 60 * 24);
/// 24 hours.
/// NOTE(review): the name suggests a grace period before persisted task
/// bundles are garbage-collected — confirm against the GC code.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24);
/// 30 days.
/// NOTE(review): the name suggests a grace period before quarantined bundles
/// are garbage-collected — confirm against the GC code.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24 * 30);

/// Number of trailing output bytes captured into `BashCompletedFrame` and
/// `BgCompletion` records so the plugin can inline a preview into the
/// system-reminder. Sized for roughly 3-4 lines of typical command output
/// (git status, test results, exit messages): small enough that several
/// completions in one reminder stay well under the model's context budget,
/// large enough that most successful runs need no follow-up `bash_status`
/// call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// 128 KiB: the per-stream tokenization cap. `BgCompletion` omits its token
/// counts when a stream exceeds it.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 1024 * 128;
59
60#[derive(Debug, Clone, Serialize)]
61pub struct BgCompletion {
62    pub task_id: String,
63    /// Intentionally omitted from serialized completion payloads: push frames
64    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
65    #[serde(skip_serializing)]
66    pub session_id: String,
67    pub status: BgTaskStatus,
68    pub exit_code: Option<i32>,
69    pub command: String,
70    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
71    /// cached so push-frame consumers and `bash_drain_completions` callers see
72    /// the same preview without racing against later output rotation. Empty
73    /// when not captured (e.g., persisted task seen on startup before buffer
74    /// reattachment).
75    #[serde(default, skip_serializing_if = "String::is_empty")]
76    pub output_preview: String,
77    /// True when the captured tail is shorter than the actual output (because
78    /// rotation occurred or the output exceeds the preview cap). Plugins use
79    /// this to render a `…` prefix and signal that `bash_status` would return
80    /// more.
81    #[serde(default, skip_serializing_if = "is_false")]
82    pub output_truncated: bool,
83    /// Token count for raw stdout+stderr before compression. Omitted when any
84    /// stream exceeds the 128 KiB tokenization cap.
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub original_tokens: Option<u32>,
87    /// Token count for the compressed output generated from the same capped
88    /// raw payload. Omitted when raw tokenization is skipped.
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub compressed_tokens: Option<u32>,
91    /// True when a stream exceeded the tokenization cap and counts are absent.
92    #[serde(default, skip_serializing_if = "is_false")]
93    pub tokens_skipped: bool,
94}
95
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// referenced flag is `false`, so unset boolean fields are left out.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
99
100#[derive(Debug, Clone, Serialize)]
101pub struct BgTaskSnapshot {
102    #[serde(flatten)]
103    pub info: BgTaskInfo,
104    pub exit_code: Option<i32>,
105    pub child_pid: Option<u32>,
106    pub workdir: String,
107    pub output_preview: String,
108    pub output_truncated: bool,
109    pub output_path: Option<String>,
110    pub stderr_path: Option<String>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub pty_rows: Option<u16>,
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub pty_cols: Option<u16>,
115}
116
117#[derive(Clone)]
118pub struct BgTaskRegistry {
119    pub(crate) inner: Arc<RegistryInner>,
120}
121
122pub(crate) struct RegistryInner {
123    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
124    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
125    pub(crate) progress_sender: SharedProgressSender,
126    watchdog_started: AtomicBool,
127    pub(crate) shutdown: AtomicBool,
128    pub(crate) long_running_reminder_enabled: AtomicBool,
129    pub(crate) long_running_reminder_interval_ms: AtomicU64,
130    persisted_gc_started: AtomicBool,
131    #[cfg(test)]
132    persisted_gc_runs: AtomicU64,
133    /// Output compression callback. Set by `AppContext` after construction.
134    /// Takes (command, raw_output) and returns compressed text. Called from
135    /// the watchdog thread when a task reaches a terminal state and from
136    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
137    /// uncompressed.
138    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
139    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
140    pub(crate) db_harness: RwLock<Option<String>>,
141    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
142    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
143    pub(crate) watch_registry: Mutex<WatchRegistry>,
144}
145
146pub(crate) struct BgTask {
147    pub(crate) task_id: String,
148    pub(crate) session_id: String,
149    pub(crate) paths: TaskPaths,
150    pub(crate) started: Instant,
151    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
152    pub(crate) terminal_at: Mutex<Option<Instant>>,
153    pub(crate) state: Mutex<BgTaskState>,
154}
155
156pub(crate) enum TaskRuntime {
157    Piped(Option<Child>),
158    Pty(Option<PtyRuntime>),
159}
160
161pub(crate) struct BgTaskState {
162    pub(crate) metadata: PersistedTask,
163    pub(crate) runtime: TaskRuntime,
164    pub(crate) detached: bool,
165    /// True once `reap_child` has observed the direct child handle's exit
166    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
167    /// `is_process_alive(child_pid)` probe on the second pass — we already
168    /// have authoritative evidence that the child is dead, no need to
169    /// re-verify via PID liveness which is unreliable on Windows where
170    /// PIDs can be recycled within seconds.
171    ///
172    /// Remains `false` on replay-restored tasks (those have a `child_pid`
173    /// but never observed exit via this process's `try_wait()`), so those
174    /// continue to fall through to the `is_process_alive` probe path.
175    pub(crate) child_exit_observed: bool,
176    pub(crate) buffer: BgBuffer,
177    /// PTY-only: set for timeout kill intent before signaling the child.
178    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
179}
180
181impl BgTaskRegistry {
182    pub fn new(progress_sender: SharedProgressSender) -> Self {
183        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184        Self {
185            inner: Arc::new(RegistryInner {
186                tasks: Mutex::new(HashMap::new()),
187                completions: Mutex::new(VecDeque::new()),
188                progress_sender,
189                watchdog_started: AtomicBool::new(false),
190                shutdown: AtomicBool::new(false),
191                long_running_reminder_enabled: AtomicBool::new(true),
192                long_running_reminder_interval_ms: AtomicU64::new(600_000),
193                persisted_gc_started: AtomicBool::new(false),
194                #[cfg(test)]
195                persisted_gc_runs: AtomicU64::new(0),
196                compressor: Mutex::new(None),
197                db_pool: RwLock::new(None),
198                db_harness: RwLock::new(None),
199                wake_tx,
200                wake_rx,
201                watch_registry: Mutex::new(WatchRegistry::default()),
202            }),
203        }
204    }
205
206    pub fn set_harness(&self, harness: Harness) {
207        if let Ok(mut slot) = self.inner.db_harness.write() {
208            *slot = Some(harness.as_str().to_string());
209        }
210    }
211
212    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213        if let Ok(mut slot) = self.inner.db_pool.write() {
214            *slot = Some(conn);
215        }
216    }
217
218    pub fn clear_db_pool(&self) {
219        if let Ok(mut slot) = self.inner.db_pool.write() {
220            *slot = None;
221        }
222    }
223
224    /// Install the output-compression callback. Called by `main.rs` after
225    /// `AppContext` is constructed so that snapshot/completion paths can
226    /// invoke `compress::compress_with_registry` without holding a context
227    /// reference. When called multiple times, the latest installation wins.
228    pub fn set_compressor<F>(&self, compressor: F)
229    where
230        F: Fn(&str, String) -> String + Send + Sync + 'static,
231    {
232        if let Ok(mut slot) = self.inner.compressor.lock() {
233            *slot = Some(Box::new(compressor));
234        }
235    }
236
237    /// Apply the installed compressor (if any) to `output`. Returns `output`
238    /// untouched when no compressor is installed.
239    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240        let Ok(slot) = self.inner.compressor.lock() else {
241            return output;
242        };
243        match slot.as_ref() {
244            Some(compressor) => compressor(command, output),
245            None => output,
246        }
247    }
248
249    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250        write_task(&paths.json, metadata)?;
251        self.dual_write_task(paths, metadata);
252        Ok(())
253    }
254
255    fn update_task_metadata<F>(
256        &self,
257        paths: &TaskPaths,
258        update: F,
259    ) -> std::io::Result<PersistedTask>
260    where
261        F: FnOnce(&mut PersistedTask),
262    {
263        let metadata = update_task(&paths.json, update)?;
264        self.dual_write_task(paths, &metadata);
265        Ok(metadata)
266    }
267
268    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270        let Some(pool) = pool else {
271            return;
272        };
273        let harness = self
274            .inner
275            .db_harness
276            .read()
277            .ok()
278            .and_then(|slot| slot.clone());
279        let Some(harness) = harness else {
280            crate::slog_warn!(
281                "dual-write bash_task to DB skipped for {}: harness not configured",
282                metadata.task_id
283            );
284            return;
285        };
286        let row = match metadata.to_bash_task_row(&harness, paths) {
287            Ok(row) => row,
288            Err(error) => {
289                crate::slog_warn!(
290                    "dual-write bash_task to DB failed for {}: {}",
291                    metadata.task_id,
292                    error
293                );
294                return;
295            }
296        };
297        let conn = match pool.lock() {
298            Ok(conn) => conn,
299            Err(_) => {
300                crate::slog_warn!(
301                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
302                    metadata.task_id
303                );
304                return;
305            }
306        };
307        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308            crate::slog_warn!(
309                "dual-write bash_task to DB failed for {}: {}",
310                metadata.task_id,
311                error
312            );
313        }
314    }
315
316    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317        self.inner
318            .long_running_reminder_enabled
319            .store(enabled, Ordering::SeqCst);
320        self.inner
321            .long_running_reminder_interval_ms
322            .store(interval_ms, Ordering::SeqCst);
323    }
324
325    #[cfg(unix)]
326    #[allow(clippy::too_many_arguments)]
327    pub fn spawn(
328        &self,
329        command: &str,
330        session_id: String,
331        workdir: PathBuf,
332        env: HashMap<String, String>,
333        timeout: Option<Duration>,
334        storage_dir: PathBuf,
335        max_running: usize,
336        notify_on_completion: bool,
337        compressed: bool,
338        project_root: Option<PathBuf>,
339    ) -> Result<String, String> {
340        self.start_watchdog();
341
342        let running = self.running_count();
343        if running >= max_running {
344            return Err(format!(
345                "background bash task limit exceeded: {running} running (max {max_running})"
346            ));
347        }
348
349        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351        let task_id = self.generate_unique_task_id()?;
352        let paths = task_paths(&storage_dir, &session_id, &task_id);
353        fs::create_dir_all(&paths.dir)
354            .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356        let mut metadata = PersistedTask::starting(
357            task_id.clone(),
358            session_id.clone(),
359            command.to_string(),
360            workdir.clone(),
361            project_root,
362            timeout_ms,
363            notify_on_completion,
364            compressed,
365        );
366        self.persist_task(&paths, &metadata)
367            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369        // Pre-create capture files so the watchdog/buffer can always
370        // open them for reading. The spawn helper opens its own handles
371        // per attempt because each `Command::spawn()` consumes them.
372        create_capture_file(&paths.stdout)
373            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374        create_capture_file(&paths.stderr)
375            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378            Ok(child) => child,
379            Err(error) => {
380                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381                let _ = delete_task_bundle(&paths);
382                return Err(error);
383            }
384        };
385
386        let child_pid = child.id();
387        metadata.mark_running(child_pid, child_pid as i32);
388        self.persist_task(&paths, &metadata)
389            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391        let task = Arc::new(BgTask {
392            task_id: task_id.clone(),
393            session_id,
394            paths: paths.clone(),
395            started: Instant::now(),
396            last_reminder_at: Mutex::new(None),
397            terminal_at: Mutex::new(None),
398            state: Mutex::new(BgTaskState {
399                metadata,
400                runtime: TaskRuntime::Piped(Some(child)),
401                detached: false,
402                child_exit_observed: false,
403                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404                pending_terminal_override: None,
405            }),
406        });
407
408        self.inner
409            .tasks
410            .lock()
411            .map_err(|_| "background task registry lock poisoned".to_string())?
412            .insert(task_id.clone(), task);
413
414        Ok(task_id)
415    }
416
417    #[allow(clippy::too_many_arguments)]
418    pub fn spawn_pty(
419        &self,
420        command: &str,
421        session_id: String,
422        workdir: PathBuf,
423        env: HashMap<String, String>,
424        timeout: Option<Duration>,
425        storage_dir: PathBuf,
426        max_running: usize,
427        notify_on_completion: bool,
428        compressed: bool,
429        project_root: Option<PathBuf>,
430        rows: u16,
431        cols: u16,
432    ) -> Result<String, String> {
433        self.start_watchdog();
434
435        let running = self.running_count();
436        if running >= max_running {
437            return Err(format!(
438                "background bash task limit exceeded: {running} running (max {max_running})"
439            ));
440        }
441
442        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444        let task_id = self.generate_unique_task_id()?;
445        let paths = task_paths(&storage_dir, &session_id, &task_id);
446        fs::create_dir_all(&paths.dir)
447            .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449        let mut metadata = PersistedTask::starting(
450            task_id.clone(),
451            session_id.clone(),
452            command.to_string(),
453            workdir.clone(),
454            project_root,
455            timeout_ms,
456            notify_on_completion,
457            compressed,
458        );
459        metadata.mode = BgMode::Pty;
460        metadata.pty_rows = Some(rows);
461        metadata.pty_cols = Some(cols);
462        self.persist_task(&paths, &metadata)
463            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464        create_capture_file(&paths.pty)
465            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467        let runtime = match spawn_pty_for_command(
468            &task_id,
469            &session_id,
470            command,
471            &paths,
472            &workdir,
473            &env,
474            rows,
475            cols,
476            self.inner.wake_tx.clone(),
477        ) {
478            Ok(runtime) => runtime,
479            Err(error) => {
480                crate::slog_warn!(
481                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482                );
483                let _ = delete_task_bundle(&paths);
484                return Err(error);
485            }
486        };
487
488        if let Some(child_pid) = runtime.child_pid {
489            metadata.mark_running(child_pid, child_pid as i32);
490        } else {
491            metadata.status = BgTaskStatus::Running;
492            metadata.pgid = None;
493        }
494        self.persist_task(&paths, &metadata)
495            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497        let task = Arc::new(BgTask {
498            task_id: task_id.clone(),
499            session_id,
500            paths: paths.clone(),
501            started: Instant::now(),
502            last_reminder_at: Mutex::new(None),
503            terminal_at: Mutex::new(None),
504            state: Mutex::new(BgTaskState {
505                metadata,
506                runtime: TaskRuntime::Pty(Some(runtime)),
507                detached: false,
508                child_exit_observed: false,
509                buffer: BgBuffer::pty(paths.pty.clone()),
510                pending_terminal_override: None,
511            }),
512        });
513
514        self.inner
515            .tasks
516            .lock()
517            .map_err(|_| "background task registry lock poisoned".to_string())?
518            .insert(task_id.clone(), task);
519
520        Ok(task_id)
521    }
522
523    #[cfg(windows)]
524    #[allow(clippy::too_many_arguments)]
525    pub fn spawn(
526        &self,
527        command: &str,
528        session_id: String,
529        workdir: PathBuf,
530        env: HashMap<String, String>,
531        timeout: Option<Duration>,
532        storage_dir: PathBuf,
533        max_running: usize,
534        notify_on_completion: bool,
535        compressed: bool,
536        project_root: Option<PathBuf>,
537    ) -> Result<String, String> {
538        self.start_watchdog();
539
540        let running = self.running_count();
541        if running >= max_running {
542            return Err(format!(
543                "background bash task limit exceeded: {running} running (max {max_running})"
544            ));
545        }
546
547        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549        let task_id = self.generate_unique_task_id()?;
550        let paths = task_paths(&storage_dir, &session_id, &task_id);
551        fs::create_dir_all(&paths.dir)
552            .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554        let mut metadata = PersistedTask::starting(
555            task_id.clone(),
556            session_id.clone(),
557            command.to_string(),
558            workdir.clone(),
559            project_root,
560            timeout_ms,
561            notify_on_completion,
562            compressed,
563        );
564        self.persist_task(&paths, &metadata)
565            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567        // Capture files are pre-created so the watchdog/buffer can always
568        // open them for reading even if the child hasn't written anything
569        // yet. The spawn helper opens its own handles per attempt because
570        // each `Command::spawn()` consumes them, and on Windows we may
571        // retry across multiple shell candidates if the first one fails.
572        create_capture_file(&paths.stdout)
573            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574        create_capture_file(&paths.stderr)
575            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578            Ok(child) => child,
579            Err(error) => {
580                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581                let _ = delete_task_bundle(&paths);
582                return Err(error);
583            }
584        };
585
586        let child_pid = child.id();
587        metadata.status = BgTaskStatus::Running;
588        metadata.child_pid = Some(child_pid);
589        metadata.pgid = None;
590        self.persist_task(&paths, &metadata)
591            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593        let task = Arc::new(BgTask {
594            task_id: task_id.clone(),
595            session_id,
596            paths: paths.clone(),
597            started: Instant::now(),
598            last_reminder_at: Mutex::new(None),
599            terminal_at: Mutex::new(None),
600            state: Mutex::new(BgTaskState {
601                metadata,
602                runtime: TaskRuntime::Piped(Some(child)),
603                detached: false,
604                child_exit_observed: false,
605                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606                pending_terminal_override: None,
607            }),
608        });
609
610        self.inner
611            .tasks
612            .lock()
613            .map_err(|_| "background task registry lock poisoned".to_string())?
614            .insert(task_id.clone(), task);
615
616        Ok(task_id)
617    }
618
619    pub fn write_pty(
620        &self,
621        task_id: &str,
622        session_id: &str,
623        input: &[u8],
624    ) -> Result<usize, String> {
625        let task = self
626            .task_for_session(task_id, session_id)
627            .ok_or_else(|| "task_not_found".to_string())?;
628
629        let writer = {
630            let state = task
631                .state
632                .lock()
633                .map_err(|_| "background task lock poisoned".to_string())?;
634            if state.metadata.mode != BgMode::Pty {
635                return Err("task_not_pty".to_string());
636            }
637            if state.metadata.status.is_terminal() {
638                return Err("task_exited".to_string());
639            }
640            match &state.runtime {
641                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644            }
645        };
646
647        let mut writer = writer
648            .lock()
649            .map_err(|_| "PTY writer lock poisoned".to_string())?;
650        writer
651            .write_all(input)
652            .map_err(|error| format!("failed to write to PTY: {error}"))?;
653        writer
654            .flush()
655            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656        Ok(input.len())
657    }
658
659    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660        self.replay_session_inner(storage_dir, session_id, None)
661    }
662
663    pub fn replay_session_for_project(
664        &self,
665        storage_dir: &Path,
666        session_id: &str,
667        project_root: &Path,
668    ) -> Result<(), String> {
669        self.replay_session_inner(storage_dir, session_id, Some(project_root))
670    }
671
672    fn replay_session_inner(
673        &self,
674        storage_dir: &Path,
675        session_id: &str,
676        project_root: Option<&Path>,
677    ) -> Result<(), String> {
678        self.start_watchdog();
679        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
680            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
681                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
682            }
683        }
684
685        let canonical_project = project_root.map(canonicalized_path);
686        // Replay strategy: DB is the post-v0.27 source of truth. Disk
687        // fallback handles pre-v0.27 tasks that haven't been migrated and
688        // the cold-start `__default__` namespace (configure runs before any
689        // user session exists, so plugin-init triggers a session-less DB
690        // lookup that will be empty until a real session writes a task).
691        //
692        // We deliberately keep the empty-DB / empty-disk path silent — it's
693        // the normal startup case and would otherwise fire on every configure
694        // (see GitHub user report against v0.27.0). INFO-level logs only when
695        // disk actually returned tasks (real migration signal); WARN when the
696        // DB lookup itself errored.
697        let tasks = match self.replay_session_from_db(session_id) {
698            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
699            Some(Ok(_)) => {
700                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
701                if !disk_tasks.is_empty() {
702                    crate::slog_info!(
703                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
704                        session_id,
705                        disk_tasks.len()
706                    );
707                }
708                disk_tasks
709            }
710            Some(Err(error)) => {
711                crate::slog_warn!(
712                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
713                    session_id,
714                    error
715                );
716                self.replay_session_from_disk(storage_dir, session_id)?
717            }
718            None => {
719                // DB pool unconfigured — common in tests + before harness is set.
720                self.replay_session_from_disk(storage_dir, session_id)?
721            }
722        };
723
724        for mut metadata in tasks {
725            if metadata.session_id != session_id {
726                continue;
727            }
728            if let Some(canonical_project) = canonical_project.as_deref() {
729                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
730                if metadata_project.as_deref() != Some(canonical_project) {
731                    continue;
732                }
733            }
734
735            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
736            match metadata.status {
737                BgTaskStatus::Starting => {
738                    let completion_was_delivered = metadata.completion_delivered;
739                    metadata.mark_terminal(
740                        BgTaskStatus::Failed,
741                        None,
742                        Some("spawn aborted".to_string()),
743                    );
744                    metadata.completion_delivered |= completion_was_delivered;
745                    let _ = self.persist_task(&paths, &metadata);
746                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
747                    self.insert_rehydrated_task(metadata, paths, true)?;
748                }
749                BgTaskStatus::Running | BgTaskStatus::Killing => {
750                    if metadata.mode == BgMode::Pty {
751                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
752                            let completion_was_delivered = metadata.completion_delivered;
753                            metadata = terminal_metadata_from_marker(metadata, marker, None);
754                            metadata.completion_delivered |= completion_was_delivered;
755                            let _ = self.persist_task(&paths, &metadata);
756                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
757                            self.insert_rehydrated_task(metadata, paths, true)?;
758                        } else if metadata.status.is_terminal() {
759                            self.insert_rehydrated_task(metadata, paths, true)?;
760                        } else {
761                            let completion_was_delivered = metadata.completion_delivered;
762                            metadata.mark_terminal(
763                                BgTaskStatus::Killed,
764                                None,
765                                Some("pty_lost_on_bridge_restart".to_string()),
766                            );
767                            metadata.completion_delivered |= completion_was_delivered;
768                            let _ = self.persist_task(&paths, &metadata);
769                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
770                            self.insert_rehydrated_task(metadata, paths, true)?;
771                        }
772                    } else if self.running_metadata_is_stale(&metadata) {
773                        let completion_was_delivered = metadata.completion_delivered;
774                        metadata.mark_terminal(
775                            BgTaskStatus::Killed,
776                            None,
777                            Some("orphaned (>24h)".to_string()),
778                        );
779                        metadata.completion_delivered |= completion_was_delivered;
780                        if !paths.exit.exists() {
781                            let _ = write_kill_marker_if_absent(&paths.exit);
782                        }
783                        let _ = self.persist_task(&paths, &metadata);
784                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
785                        self.insert_rehydrated_task(metadata, paths, true)?;
786                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
787                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
788                            "recovered from inconsistent killing state on replay".to_string()
789                        });
790                        if reason.is_some() {
791                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
792                            metadata.task_id);
793                        }
794                        let completion_was_delivered = metadata.completion_delivered;
795                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
796                        metadata.completion_delivered |= completion_was_delivered;
797                        let _ = self.persist_task(&paths, &metadata);
798                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
799                        self.insert_rehydrated_task(metadata, paths, true)?;
800                    } else if metadata.status == BgTaskStatus::Killing {
801                        if !paths.exit.exists() {
802                            let _ = write_kill_marker_if_absent(&paths.exit);
803                        }
804                        let completion_was_delivered = metadata.completion_delivered;
805                        metadata.mark_terminal(
806                            BgTaskStatus::Killed,
807                            None,
808                            Some("recovered from inconsistent killing state on replay".to_string()),
809                        );
810                        metadata.completion_delivered |= completion_was_delivered;
811                        let _ = self.persist_task(&paths, &metadata);
812                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
813                        self.insert_rehydrated_task(metadata, paths, true)?;
814                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
815                        let completion_was_delivered = metadata.completion_delivered;
816                        metadata.mark_terminal(
817                            BgTaskStatus::Failed,
818                            None,
819                            Some("process exited without exit marker".to_string()),
820                        );
821                        metadata.completion_delivered |= completion_was_delivered;
822                        let _ = self.persist_task(&paths, &metadata);
823                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
824                        self.insert_rehydrated_task(metadata, paths, true)?;
825                    } else {
826                        self.insert_rehydrated_task(metadata, paths, true)?;
827                    }
828                }
829                _ if metadata.status.is_terminal() => {
830                    // Borrow `paths` for the completion enqueue BEFORE
831                    // `insert_rehydrated_task` consumes it. The completion
832                    // helper only reads from `paths` (stdout/stderr/exit) to
833                    // reconstruct a tail preview, so it must see the same
834                    // paths the rehydrated task will own.
835                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
836                    self.insert_rehydrated_task(metadata, paths, true)?;
837                }
838                _ => {}
839            }
840        }
841
842        Ok(())
843    }
844
845    fn replay_session_from_db(
846        &self,
847        session_id: &str,
848    ) -> Option<Result<Vec<PersistedTask>, String>> {
849        let pool = self
850            .inner
851            .db_pool
852            .read()
853            .ok()
854            .and_then(|slot| slot.clone())?;
855        let harness = self
856            .inner
857            .db_harness
858            .read()
859            .ok()
860            .and_then(|slot| slot.clone())?;
861        let conn = match pool.lock() {
862            Ok(conn) => conn,
863            Err(_) => return Some(Err("db mutex poisoned".to_string())),
864        };
865        Some(
866            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868                .map_err(|error| error.to_string()),
869        )
870    }
871
872    fn replay_session_from_disk(
873        &self,
874        storage_dir: &Path,
875        session_id: &str,
876    ) -> Result<Vec<PersistedTask>, String> {
877        let dir = session_tasks_dir(storage_dir, session_id);
878        if !dir.exists() {
879            return Ok(Vec::new());
880        }
881
882        let entries = fs::read_dir(&dir)
883            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884        let mut tasks = Vec::new();
885        for entry in entries.flatten() {
886            let path = entry.path();
887            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888                continue;
889            }
890            match read_task(&path) {
891                Ok(metadata) => tasks.push(metadata),
892                Err(error) => {
893                    crate::slog_warn!(
894                        "quarantining invalid background task metadata {} during replay: {error}",
895                        path.display()
896                    );
897                    if let Err(quarantine_error) =
898                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899                    {
900                        crate::slog_warn!(
901                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902                            path.display()
903                        );
904                    }
905                }
906            }
907        }
908        Ok(tasks)
909    }
910
911    pub fn register_watch(
912        &self,
913        task_id: String,
914        pattern: WatchPattern,
915        once: bool,
916    ) -> Result<String, &'static str> {
917        let task = self.task(&task_id);
918        let task_paths = task.as_ref().and_then(|task| {
919            task.state.lock().ok().map(|state| {
920                (
921                    state.metadata.mode.clone(),
922                    state.metadata.status.is_terminal(),
923                    task.paths.stdout.clone(),
924                    task.paths.stderr.clone(),
925                    task.paths.pty.clone(),
926                )
927            })
928        });
929
930        let mut terminal_matches = Vec::new();
931        let watch_id = {
932            let mut registry = self
933                .inner
934                .watch_registry
935                .lock()
936                .map_err(|_| "watch_registry_poisoned")?;
937            let watch_id = registry.register(task_id.clone(), pattern, once)?;
938            if let Some((mode, terminal, stdout, stderr, pty)) = task_paths {
939                match mode {
940                    BgMode::Pipes => {
941                        let stdout_key = format!("{task_id}:stdout");
942                        let stderr_key = format!("{task_id}:stderr");
943                        if terminal {
944                            registry.set_file_cursor(&stdout_key, 0);
945                            registry.set_file_cursor(&stderr_key, 0);
946                            terminal_matches.extend(registry.scan_file_new_bytes(
947                                &stdout_key,
948                                &task_id,
949                                &stdout,
950                            ));
951                            terminal_matches.extend(registry.scan_file_new_bytes(
952                                &stderr_key,
953                                &task_id,
954                                &stderr,
955                            ));
956                        } else {
957                            registry.prime_file_cursor(&stdout_key, &stdout);
958                            registry.prime_file_cursor(&stderr_key, &stderr);
959                        }
960                    }
961                    BgMode::Pty => {
962                        let pty_key = format!("{task_id}:pty");
963                        if terminal {
964                            registry.set_file_cursor(&pty_key, 0);
965                            terminal_matches
966                                .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
967                        } else {
968                            registry.prime_file_cursor(&pty_key, &pty);
969                        }
970                    }
971                }
972            }
973            watch_id
974        };
975
976        if let Some(task) = task.as_ref() {
977            if task.is_terminal() {
978                let completion = self.remove_pending_completion(&task_id).or_else(|| {
979                    self.completion_snapshot_for_task(task, BG_COMPLETION_PREVIEW_BYTES)
980                });
981                if terminal_matches.is_empty() {
982                    if let Some(completion) = completion.as_ref() {
983                        self.emit_bash_watch_exit(completion);
984                    }
985                } else {
986                    for pattern_match in terminal_matches {
987                        self.emit_bash_pattern_match(&task.session_id, pattern_match);
988                    }
989                }
990                let _ = task.set_completion_delivered(true, self);
991                self.clear_task_watch_state(&task_id);
992            }
993        }
994
995        Ok(watch_id)
996    }
997
998    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
999        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1000            registry.unregister(task_id, watch_id);
1001        }
1002    }
1003
1004    pub fn active_watch_count(&self, task_id: &str) -> usize {
1005        self.inner
1006            .watch_registry
1007            .lock()
1008            .map(|registry| registry.active_count(task_id))
1009            .unwrap_or(0)
1010    }
1011
1012    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1013        self.inner
1014            .watch_registry
1015            .lock()
1016            .map(|registry| {
1017                (
1018                    registry.has_controlled_task(task_id),
1019                    registry.has_matched_task(task_id),
1020                )
1021            })
1022            .unwrap_or((false, false))
1023    }
1024
1025    fn task_has_watch_control(&self, task_id: &str) -> bool {
1026        self.inner
1027            .watch_registry
1028            .lock()
1029            .map(|registry| registry.has_controlled_task(task_id))
1030            .unwrap_or(false)
1031    }
1032
1033    fn clear_task_watch_state(&self, task_id: &str) {
1034        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1035            registry.clear_task(task_id);
1036        }
1037    }
1038
1039    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1040        let (mode, stdout, stderr, pty) = match task.state.lock() {
1041            Ok(state) => (
1042                state.metadata.mode.clone(),
1043                task.paths.stdout.clone(),
1044                task.paths.stderr.clone(),
1045                task.paths.pty.clone(),
1046            ),
1047            Err(_) => return,
1048        };
1049        let mut matches = Vec::new();
1050        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1051            match mode {
1052                BgMode::Pipes => {
1053                    let stdout_key = format!("{}:stdout", task.task_id);
1054                    let stderr_key = format!("{}:stderr", task.task_id);
1055                    matches.extend(registry.scan_file_new_bytes(
1056                        &stdout_key,
1057                        &task.task_id,
1058                        &stdout,
1059                    ));
1060                    matches.extend(registry.scan_file_new_bytes(
1061                        &stderr_key,
1062                        &task.task_id,
1063                        &stderr,
1064                    ));
1065                }
1066                BgMode::Pty => {
1067                    let pty_key = format!("{}:pty", task.task_id);
1068                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1069                }
1070            }
1071        }
1072        for pattern_match in matches {
1073            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1074        }
1075    }
1076
1077    pub fn status(
1078        &self,
1079        task_id: &str,
1080        session_id: &str,
1081        project_root: Option<&Path>,
1082        storage_dir: Option<&Path>,
1083        preview_bytes: usize,
1084    ) -> Option<BgTaskSnapshot> {
1085        let mut task = self.task_for_session(task_id, session_id);
1086        if task.is_none() {
1087            if let Some(storage_dir) = storage_dir {
1088                let _ = self.replay_session(storage_dir, session_id);
1089                task = self.task_for_session(task_id, session_id);
1090            }
1091        }
1092        let Some(task) = task else {
1093            return self.status_relaxed(
1094                task_id,
1095                session_id,
1096                project_root?,
1097                storage_dir?,
1098                preview_bytes,
1099            );
1100        };
1101        let _ = self.poll_task(&task);
1102        let mut snapshot = task.snapshot(preview_bytes);
1103        self.maybe_compress_snapshot(&task, &mut snapshot);
1104        Some(snapshot)
1105    }
1106
1107    fn status_relaxed_task(
1108        &self,
1109        task_id: &str,
1110        project_root: &Path,
1111        storage_dir: &Path,
1112    ) -> Option<Arc<BgTask>> {
1113        let canonical_project = canonicalized_path(project_root);
1114        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1115            Some(Ok(Some(metadata))) => {
1116                if let Some(task) = self.task(task_id) {
1117                    let matches_project = task
1118                        .state
1119                        .lock()
1120                        .map(|state| {
1121                            state
1122                                .metadata
1123                                .project_root
1124                                .as_deref()
1125                                .map(canonicalized_path)
1126                                .as_deref()
1127                                == Some(canonical_project.as_path())
1128                        })
1129                        .unwrap_or(false);
1130                    return matches_project.then_some(task);
1131                }
1132                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1133                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1134                    return None;
1135                }
1136                return self.task(task_id);
1137            }
1138            Some(Ok(None)) => {
1139                crate::slog_info!(
1140                    "bash task relaxed DB miss for {}; falling back to disk",
1141                    task_id
1142                );
1143            }
1144            Some(Err(error)) => {
1145                crate::slog_warn!(
1146                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1147                    task_id,
1148                    error
1149                );
1150            }
1151            None => {
1152                crate::slog_info!(
1153                    "bash task relaxed DB unavailable for {}; falling back to disk",
1154                    task_id
1155                );
1156            }
1157        }
1158        let root = storage_dir.join("bash-tasks");
1159        let entries = fs::read_dir(&root).ok()?;
1160        for entry in entries.flatten() {
1161            let dir = entry.path();
1162            if !dir.is_dir() {
1163                continue;
1164            }
1165            let path = dir.join(format!("{task_id}.json"));
1166            if !path.exists() {
1167                continue;
1168            }
1169            let metadata = match read_task(&path) {
1170                Ok(metadata) => metadata,
1171                Err(error) => {
1172                    crate::slog_warn!(
1173                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1174                        path.display()
1175                    );
1176                    if let Err(quarantine_error) =
1177                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1178                    {
1179                        crate::slog_warn!(
1180                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1181                            path.display()
1182                        );
1183                    }
1184                    continue;
1185                }
1186            };
1187            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1188            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1189                continue;
1190            }
1191            if let Some(task) = self.task(task_id) {
1192                let matches_project = task
1193                    .state
1194                    .lock()
1195                    .map(|state| {
1196                        state
1197                            .metadata
1198                            .project_root
1199                            .as_deref()
1200                            .map(canonicalized_path)
1201                            .as_deref()
1202                            == Some(canonical_project.as_path())
1203                    })
1204                    .unwrap_or(false);
1205                return matches_project.then_some(task);
1206            }
1207            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1208            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1209                return None;
1210            }
1211            return self.task(task_id);
1212        }
1213        None
1214    }
1215
1216    fn lookup_relaxed_task_from_db(
1217        &self,
1218        task_id: &str,
1219        project_root: &Path,
1220    ) -> Option<Result<Option<PersistedTask>, String>> {
1221        let pool = self
1222            .inner
1223            .db_pool
1224            .read()
1225            .ok()
1226            .and_then(|slot| slot.clone())?;
1227        let harness = self
1228            .inner
1229            .db_harness
1230            .read()
1231            .ok()
1232            .and_then(|slot| slot.clone())?;
1233        let conn = match pool.lock() {
1234            Ok(conn) => conn,
1235            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1236        };
1237        let project_key = crate::search_index::project_cache_key(project_root);
1238        Some(
1239            crate::db::bash_tasks::find_bash_task_for_project(
1240                &conn,
1241                &harness,
1242                &project_key,
1243                task_id,
1244            )
1245            .map(|row| row.map(PersistedTask::from))
1246            .map_err(|error| error.to_string()),
1247        )
1248    }
1249
1250    pub(super) fn status_relaxed(
1251        &self,
1252        task_id: &str,
1253        _session_id: &str,
1254        project_root: &Path,
1255        storage_dir: &Path,
1256        preview_bytes: usize,
1257    ) -> Option<BgTaskSnapshot> {
1258        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1259        let _ = self.poll_task(&task);
1260        let mut snapshot = task.snapshot(preview_bytes);
1261        self.maybe_compress_snapshot(&task, &mut snapshot);
1262        Some(snapshot)
1263    }
1264
1265    pub fn kill_relaxed(
1266        &self,
1267        task_id: &str,
1268        project_root: &Path,
1269        storage_dir: &Path,
1270    ) -> Result<BgTaskSnapshot, String> {
1271        let task = self
1272            .status_relaxed_task(task_id, project_root, storage_dir)
1273            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1274        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1275    }
1276
1277    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1278        #[cfg(test)]
1279        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1280
1281        let mut deleted = 0usize;
1282
1283        let root = storage_dir.join("bash-tasks");
1284        if root.exists() {
1285            let session_dirs = fs::read_dir(&root).map_err(|e| {
1286                format!(
1287                    "failed to read background task root {}: {e}",
1288                    root.display()
1289                )
1290            })?;
1291            for session_entry in session_dirs.flatten() {
1292                let session_dir = session_entry.path();
1293                if !session_dir.is_dir() {
1294                    continue;
1295                }
1296                let task_entries = match fs::read_dir(&session_dir) {
1297                    Ok(entries) => entries,
1298                    Err(error) => {
1299                        crate::slog_warn!(
1300                            "failed to read background task session dir {}: {error}",
1301                            session_dir.display()
1302                        );
1303                        continue;
1304                    }
1305                };
1306                for task_entry in task_entries.flatten() {
1307                    let json_path = task_entry.path();
1308                    if json_path
1309                        .extension()
1310                        .and_then(|extension| extension.to_str())
1311                        != Some("json")
1312                    {
1313                        continue;
1314                    }
1315                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1316                        continue;
1317                    }
1318                    let metadata = match read_task(&json_path) {
1319                        Ok(metadata) => metadata,
1320                        Err(error) => {
1321                            crate::slog_warn!(
1322                                "quarantining corrupt background task metadata {}: {error}",
1323                                json_path.display()
1324                            );
1325                            quarantine_task_json(
1326                                storage_dir,
1327                                &session_dir,
1328                                &json_path,
1329                                QuarantineKind::Corrupt,
1330                            )?;
1331                            continue;
1332                        }
1333                    };
1334                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1335                        continue;
1336                    }
1337                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1338                    match delete_task_bundle(&paths) {
1339                        Ok(()) => {
1340                            deleted += 1;
1341                            log::debug!(
1342                                "deleted persisted background task bundle {}",
1343                                metadata.task_id
1344                            );
1345                        }
1346                        Err(error) => {
1347                            crate::slog_warn!(
1348                                "failed to delete background task bundle {}: {error}",
1349                                metadata.task_id
1350                            );
1351                            continue;
1352                        }
1353                    }
1354                }
1355            }
1356        }
1357        gc_quarantine(storage_dir);
1358        Ok(deleted)
1359    }
1360
1361    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1362        let tasks = self
1363            .inner
1364            .tasks
1365            .lock()
1366            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1367            .unwrap_or_default();
1368        tasks
1369            .into_iter()
1370            .map(|task| {
1371                let _ = self.poll_task(&task);
1372                let mut snapshot = task.snapshot(preview_bytes);
1373                self.maybe_compress_snapshot(&task, &mut snapshot);
1374                snapshot
1375            })
1376            .collect()
1377    }
1378
1379    /// Compress `output_preview` in place when the task is in a terminal
1380    /// state. Live tail of running tasks stays raw so agents debugging
1381    /// long-running bash see exactly what the process emitted, not a
1382    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
1383    /// field on `PersistedTask` short-circuits before the compress pipeline.
1384    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1385        if !snapshot.info.status.is_terminal() {
1386            return;
1387        }
1388        let (compressed_flag, mode) = task
1389            .state
1390            .lock()
1391            .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1392            .unwrap_or((true, BgMode::Pipes));
1393        if mode == BgMode::Pty {
1394            return;
1395        }
1396        if !compressed_flag {
1397            return;
1398        }
1399        let raw = std::mem::take(&mut snapshot.output_preview);
1400        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1401    }
1402
1403    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1404        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1405    }
1406
1407    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1408        let task = self
1409            .task_for_session(task_id, session_id)
1410            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1411        let mut state = task
1412            .state
1413            .lock()
1414            .map_err(|_| "background task lock poisoned".to_string())?;
1415        let updated = self
1416            .update_task_metadata(&task.paths, |metadata| {
1417                metadata.notify_on_completion = true;
1418                metadata.completion_delivered = false;
1419            })
1420            .map_err(|e| format!("failed to promote background task: {e}"))?;
1421        state.metadata = updated;
1422        if state.metadata.status.is_terminal() {
1423            state.buffer.enforce_terminal_cap();
1424            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1425        }
1426        Ok(true)
1427    }
1428
1429    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1430        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1431            .map(|_| ())
1432    }
1433
1434    pub fn cleanup_finished(&self, older_than: Duration) {
1435        let cutoff = Instant::now().checked_sub(older_than);
1436        let removable_paths: Vec<(String, TaskPaths)> =
1437            if let Ok(mut tasks) = self.inner.tasks.lock() {
1438                let removable = tasks
1439                    .iter()
1440                    .filter_map(|(task_id, task)| {
1441                        let delivered_terminal = task
1442                            .state
1443                            .lock()
1444                            .map(|state| {
1445                                state.metadata.status.is_terminal()
1446                                    && state.metadata.completion_delivered
1447                            })
1448                            .unwrap_or(false);
1449                        if !delivered_terminal {
1450                            return None;
1451                        }
1452
1453                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1454                        let expired = match (terminal_at, cutoff) {
1455                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1456                            (Some(_), None) => true,
1457                            (None, _) => false,
1458                        };
1459                        expired.then(|| task_id.clone())
1460                    })
1461                    .collect::<Vec<_>>();
1462
1463                removable
1464                    .into_iter()
1465                    .filter_map(|task_id| {
1466                        tasks
1467                            .remove(&task_id)
1468                            .map(|task| (task_id, task.paths.clone()))
1469                    })
1470                    .collect()
1471            } else {
1472                Vec::new()
1473            };
1474
1475        for (task_id, paths) in removable_paths {
1476            match delete_task_bundle(&paths) {
1477                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1478                Err(error) => crate::slog_warn!(
1479                    "failed to delete persisted background task bundle {task_id}: {error}"
1480                ),
1481            }
1482        }
1483    }
1484
1485    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1486        self.drain_completions_for_session(None)
1487    }
1488
1489    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1490        let completions = match self.inner.completions.lock() {
1491            Ok(completions) => completions,
1492            Err(_) => return Vec::new(),
1493        };
1494
1495        completions
1496            .iter()
1497            .filter(|completion| {
1498                session_id
1499                    .map(|session_id| completion.session_id == session_id)
1500                    .unwrap_or(true)
1501            })
1502            .cloned()
1503            .collect()
1504    }
1505
1506    pub fn ack_completions_for_session(
1507        &self,
1508        session_id: Option<&str>,
1509        task_ids: &[String],
1510    ) -> Vec<String> {
1511        if task_ids.is_empty() {
1512            return Vec::new();
1513        }
1514        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1515        let mut completion_sessions = HashMap::new();
1516        if let Ok(mut completions) = self.inner.completions.lock() {
1517            completions.retain(|completion| {
1518                let session_matches = session_id
1519                    .map(|session_id| completion.session_id == session_id)
1520                    .unwrap_or(true);
1521                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1522                    completion_sessions
1523                        .insert(completion.task_id.clone(), completion.session_id.clone());
1524                    false
1525                } else {
1526                    true
1527                }
1528            });
1529        }
1530
1531        let mut delivered = Vec::new();
1532        for task_id in task_ids {
1533            let task = if let Some(session_id) = session_id {
1534                self.task_for_session(task_id, session_id)
1535            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1536                self.task_for_session(task_id, completion_session_id)
1537            } else {
1538                self.task(task_id)
1539            };
1540            if let Some(task) = task {
1541                if task.set_completion_delivered(true, self).is_ok() {
1542                    delivered.push(task_id.clone());
1543                }
1544            }
1545        }
1546
1547        delivered
1548    }
1549
1550    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1551        self.inner
1552            .completions
1553            .lock()
1554            .map(|completions| {
1555                completions
1556                    .iter()
1557                    .filter(|completion| completion.session_id == session_id)
1558                    .cloned()
1559                    .collect()
1560            })
1561            .unwrap_or_default()
1562    }
1563
1564    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1565        let mut completions = self.inner.completions.lock().ok()?;
1566        let idx = completions
1567            .iter()
1568            .position(|completion| completion.task_id == task_id)?;
1569        completions.remove(idx)
1570    }
1571
1572    fn completion_snapshot_for_task(
1573        &self,
1574        task: &Arc<BgTask>,
1575        preview_bytes: usize,
1576    ) -> Option<BgCompletion> {
1577        let snapshot = task.snapshot(preview_bytes);
1578        if !snapshot.info.status.is_terminal() {
1579            return None;
1580        }
1581        let output_preview = if snapshot.info.mode == BgMode::Pty {
1582            String::new()
1583        } else {
1584            let compressed = task
1585                .state
1586                .lock()
1587                .map(|state| state.metadata.compressed)
1588                .unwrap_or(true);
1589            if compressed {
1590                self.compress_output(&snapshot.info.command, snapshot.output_preview)
1591            } else {
1592                snapshot.output_preview
1593            }
1594        };
1595        Some(BgCompletion {
1596            task_id: snapshot.info.task_id,
1597            session_id: task.session_id.clone(),
1598            status: snapshot.info.status,
1599            exit_code: snapshot.exit_code,
1600            command: snapshot.info.command,
1601            output_preview,
1602            output_truncated: snapshot.output_truncated,
1603            original_tokens: None,
1604            compressed_tokens: None,
1605            tokens_skipped: false,
1606        })
1607    }
1608
1609    pub fn detach(&self) {
1610        self.inner.shutdown.store(true, Ordering::SeqCst);
1611        if let Ok(mut tasks) = self.inner.tasks.lock() {
1612            for task in tasks.values() {
1613                if let Ok(mut state) = task.state.lock() {
1614                    match &mut state.runtime {
1615                        TaskRuntime::Piped(child) => *child = None,
1616                        TaskRuntime::Pty(runtime) => *runtime = None,
1617                    }
1618                    state.detached = true;
1619                }
1620            }
1621            tasks.clear();
1622        }
1623    }
1624
1625    pub fn shutdown(&self) {
1626        let tasks = self
1627            .inner
1628            .tasks
1629            .lock()
1630            .map(|tasks| {
1631                tasks
1632                    .values()
1633                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1634                    .collect::<Vec<_>>()
1635            })
1636            .unwrap_or_default();
1637        for (task_id, session_id) in tasks {
1638            let _ = self.kill(&task_id, &session_id);
1639        }
1640    }
1641
1642    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1643        if let Ok(state) = task.state.lock() {
1644            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1645                let complete = pty.reader_done.load(Ordering::SeqCst)
1646                    && pty.exit_observed.load(Ordering::SeqCst);
1647                if !complete {
1648                    return Ok(());
1649                }
1650            }
1651        }
1652        let marker = match read_exit_marker(&task.paths.exit) {
1653            Ok(Some(marker)) => marker,
1654            Ok(None) => return Ok(()),
1655            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1656        };
1657        self.finalize_from_marker(task, marker, None)
1658    }
1659
    /// Checks whether the task's child process has gone away and updates the
    /// in-memory runtime state accordingly.
    ///
    /// * Piped task with a child handle: if `try_wait` reports an exit, the
    ///   handle is dropped and the exit is recorded on the state.
    /// * Piped task without a handle but marked detached: if the child is
    ///   known to be dead, the task is failed unless an exit marker exists.
    /// * PTY task with a live runtime: once both the reader and the exit have
    ///   been observed, the task is polled for finalization.
    ///
    /// Returns silently if the task state lock is poisoned.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        match &mut state.runtime {
            TaskRuntime::Piped(child_slot) => {
                if let Some(child) = child_slot.as_mut() {
                    // `try_wait` does not block; only act once the child has
                    // actually exited. A later call then takes the `else if`
                    // branch below because the slot is empty and detached.
                    if matches!(child.try_wait(), Ok(Some(_))) {
                        *child_slot = None;
                        state.detached = true;
                        state.child_exit_observed = true;
                    }
                } else if state.detached {
                    // No handle left: the child counts as dead if its exit was
                    // observed earlier or the recorded pid is no longer alive.
                    let child_known_dead = state.child_exit_observed
                        || state
                            .metadata
                            .child_pid
                            .is_some_and(|pid| !is_process_alive(pid));
                    if child_known_dead {
                        self.fail_without_exit_marker_if_needed(task, &mut state);
                    }
                }
            }
            TaskRuntime::Pty(Some(pty)) => {
                let complete = pty.reader_done.load(Ordering::SeqCst)
                    && pty.exit_observed.load(Ordering::SeqCst);
                if complete {
                    // Release the state lock first: `poll_task` locks the same
                    // task state again.
                    drop(state);
                    let _ = self.poll_task(task);
                }
            }
            TaskRuntime::Pty(None) => {}
        }
    }
1694
1695    fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1696        if state.metadata.status.is_terminal() {
1697            return;
1698        }
1699        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1700            return;
1701        }
1702        let watch_controlled = self.task_has_watch_control(&task.task_id);
1703        let updated = self.update_task_metadata(&task.paths, |metadata| {
1704            metadata.mark_terminal(
1705                BgTaskStatus::Failed,
1706                None,
1707                Some("process exited without exit marker".to_string()),
1708            );
1709            if watch_controlled {
1710                metadata.completion_delivered = true;
1711            }
1712        });
1713        if let Ok(metadata) = updated {
1714            state.pending_terminal_override = None;
1715            state.metadata = metadata;
1716            task.mark_terminal_now();
1717            state.buffer.enforce_terminal_cap();
1718            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1719        }
1720    }
1721
1722    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1723        self.inner
1724            .tasks
1725            .lock()
1726            .map(|tasks| {
1727                tasks
1728                    .values()
1729                    .filter(|task| task.is_running())
1730                    .cloned()
1731                    .collect()
1732            })
1733            .unwrap_or_default()
1734    }
1735
1736    fn insert_rehydrated_task(
1737        &self,
1738        metadata: PersistedTask,
1739        paths: TaskPaths,
1740        detached: bool,
1741    ) -> Result<(), String> {
1742        let task_id = metadata.task_id.clone();
1743        let session_id = metadata.session_id.clone();
1744        let started = started_instant_from_unix_millis(metadata.started_at);
1745        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1746        let mode = metadata.mode.clone();
1747        let task = Arc::new(BgTask {
1748            task_id: task_id.clone(),
1749            session_id,
1750            paths: paths.clone(),
1751            started,
1752            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1753            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1754            state: Mutex::new(BgTaskState {
1755                metadata,
1756                runtime: if mode == BgMode::Pty {
1757                    TaskRuntime::Pty(None)
1758                } else {
1759                    TaskRuntime::Piped(None)
1760                },
1761                detached,
1762                // Replay path: we never observed the child handle's exit
1763                // in this process (the previous AFT process did, but its
1764                // observation didn't survive restart). Leave this false so
1765                // the second-pass reap falls through to the
1766                // `is_process_alive(child_pid)` probe rather than declaring
1767                // failure based on stale evidence.
1768                child_exit_observed: false,
1769                buffer: if mode == BgMode::Pty {
1770                    BgBuffer::pty(paths.pty.clone())
1771                } else {
1772                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1773                },
1774                pending_terminal_override: None,
1775            }),
1776        });
1777        self.inner
1778            .tasks
1779            .lock()
1780            .map_err(|_| "background task registry lock poisoned".to_string())?
1781            .insert(task_id, task);
1782        Ok(())
1783    }
1784
    /// Terminates a background task, recording `terminal_status` as its
    /// outcome where this function finalizes the task itself.
    ///
    /// The task is looked up via `task_for_session(task_id, session_id)`.
    /// While holding the task state lock, the following cases are handled in
    /// order:
    ///
    /// 1. Already-terminal task: the current snapshot is returned unchanged.
    /// 2. Exit marker already present: the task is finalized from the marker
    ///    (the requested `terminal_status` is not applied), persisted, and its
    ///    completion enqueued.
    /// 3. Otherwise: `Killing` is persisted (unless the task is already in
    ///    that state) and the process is terminated. Piped tasks are finalized
    ///    here with `terminal_status`. PTY tasks with a live runtime are only
    ///    signalled; a `TimedOut` request is stashed in
    ///    `pending_terminal_override`, which `finalize_from_marker` consumes.
    ///
    /// # Errors
    ///
    /// Returns an error string if the task is not found, the task lock is
    /// poisoned, or persisting state / writing the kill marker fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // The state guard is scoped to this block so it is released before the
        // final `task.snapshot` call below.
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already wrote its exit marker: finalize from what it
            // reported rather than from the requested kill status.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    TaskRuntime::Piped(child) => *child = None,
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate `Killing` state only on the first kill
            // request for this task.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // PTY tasks are not finalized in this function, so remember that
            // this kill was a timeout for the later finalize step.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    // NOTE(review): on unix nothing signals the child when
                    // `pgid` is `None`, yet the `wait()` below still blocks
                    // while the state lock is held — confirm `pgid` is always
                    // recorded for piped tasks.
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Blocking wait on the child handle, if we still own one.
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // Timeouts report exit code 124 (presumably mirroring
                    // `timeout(1)` — confirm); other kills carry no exit code.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    // Flag the kill before signalling; metadata is not marked
                    // terminal in this branch.
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    if let Some(pid) = pty.child_pid {
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    // Drop the PTY master handle.
                    drop(pty.master.take());
                }
                TaskRuntime::Pty(None) => {}
            }
        }

        Ok(task.snapshot(5 * 1024))
    }
1902
    /// Moves a task to its terminal state based on an exit marker.
    ///
    /// Runs in two locked phases with a watch scan in between:
    ///
    /// 1. Under the state lock: no-op if already terminal; otherwise the
    ///    persisted metadata is updated to the terminal state, the in-memory
    ///    copy is synchronized, and the runtime handle is dropped.
    /// 2. Without the lock: `scan_task_watch_output` runs one last time.
    /// 3. Under the state lock again: the buffer's terminal cap is enforced
    ///    and the completion is enqueued.
    ///
    /// For PTY tasks whose marker is `ExitMarker::Killed`, the status comes
    /// from any pending override (defaulting to `Killed`); every other case is
    /// derived via `terminal_metadata_from_marker`.
    ///
    /// # Errors
    ///
    /// Returns an error string if the task lock is poisoned or the terminal
    /// state cannot be persisted.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Queried before the state lock is taken.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            // NOTE(review): the override is taken here, so if persisting below
            // fails it is not restored — confirm that is acceptable.
            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        // Same convention as the piped kill path: a timeout
                        // reports exit code 124, other kills report none.
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                TaskRuntime::Piped(child) => *child = None,
                TaskRuntime::Pty(runtime) => *runtime = None,
            }
            state.detached = true;
        }

        // One final scan runs before terminal notification routing so bytes
        // printed immediately before exit can win over the exit safety net.
        self.scan_task_watch_output(task);

        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
1964
1965    fn enqueue_completion_if_needed(
1966        &self,
1967        metadata: &PersistedTask,
1968        paths: Option<&TaskPaths>,
1969        emit_frame: bool,
1970    ) {
1971        if metadata.status.is_terminal() && !metadata.completion_delivered {
1972            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1973        }
1974    }
1975
1976    fn enqueue_completion_locked(
1977        &self,
1978        metadata: &PersistedTask,
1979        buffer: Option<&BgBuffer>,
1980        emit_frame: bool,
1981    ) {
1982        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1983    }
1984
    /// Builds the completion record for a terminal task and routes it.
    ///
    /// Steps, in order:
    ///
    /// 1. Return immediately if `metadata` is not terminal.
    /// 2. Build the output preview: empty for PTY tasks; otherwise the tail of
    ///    `buffer`, or the tail read from disk via `paths`, or empty when
    ///    neither is supplied. The preview is compressed when
    ///    `metadata.compressed` is set.
    /// 3. Record the compression event (before any delivery gating).
    /// 4. Watch-controlled tasks: emit a watch-exit frame when `emit_frame` is
    ///    set and no watch matched, clear the task's watch state, and return.
    /// 5. Return if `metadata.completion_delivered` is already set.
    /// 6. Push the completion onto the queue unless an entry with the same
    ///    task id is already queued; emit a `BashCompleted` frame only when
    ///    the push happened and `emit_frame` is set.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
    ) {
        // Only the terminal-state guard prevents double-recording here. The
        // `completion_delivered` flag is NOT used to gate compression-event
        // recording, because `mark_terminal` flips `completion_delivered=true`
        // immediately for tasks with `notify_on_completion=false` (foreground
        // bash polled via `bash_status`, which is the common case). Pre-emptive
        // delivery flagging is correct for the push-frame queue (suppresses
        // duplicate user-visible notifications) but would silently skip the
        // database insert below. Compression event recording is idempotent at
        // the DB layer (unique on harness+session+task_id), so re-entry is
        // safe; the dedupe-by-queue check stays for the push frame side.
        if !metadata.status.is_terminal() {
            return;
        }
        // Read tail once at completion time and cache on the BgCompletion so
        // both the push-frame consumer (running session) and any later
        // `bash_drain_completions` poll (different session, restart) see the
        // same preview without racing against rotation.
        let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
            (String::new(), false)
        } else {
            match buffer {
                Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
                None => paths
                    .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
                    .unwrap_or_else(|| (String::new(), false)),
            }
        };
        // Compress at completion time so push-frame consumers and later
        // `bash_drain_completions` poll-callers see the same compressed text.
        // Per-task `compressed: false` opts out; otherwise the compressor is
        // a no-op when `experimental.bash.compress=false`.
        let output_preview = if metadata.compressed {
            self.compress_output(&metadata.command, raw_preview)
        } else {
            raw_preview
        };
        let token_counts = self.completion_token_counts(metadata, buffer, paths);
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // Record the compression event BEFORE the push-frame dedupe. Event
        // recording has its own idempotency at the DB layer (unique key on
        // harness+session+task_id), so it's safe to attempt for every
        // terminal-state finalize. Critically, this path runs even when
        // `completion_delivered=true` was pre-set by `mark_terminal` for
        // foreground bash (`notify_on_completion=false`) — which is the common
        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
        // after the dedupe guard and never fired for foreground tasks, which
        // meant compression accounting was effectively dead for >99% of
        // real-world bash usage.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        // Watch-controlled tasks never enter the completion queue: they get at
        // most a watch-exit frame, and their watch state is cleared.
        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        // Push-frame queue is gated on `completion_delivered` so foreground
        // bash with `notify_on_completion=false` does not leak a user-visible
        // completion notification. `mark_terminal` pre-sets
        // `completion_delivered=true` for those tasks; honoring it here keeps
        // the suppression invariant the test
        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
        // asserts. The compression-event recording above intentionally runs
        // before this gate so foreground bash still contributes to the
        // session/project aggregates.
        if metadata.completion_delivered {
            return;
        }

        // Push-frame queue dedupe stays per-task to prevent duplicate
        // user-visible completion notifications.
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        // The queue lock is released by now; the frame is emitted outside it.
        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2096
2097    fn record_compression_event_if_applicable(
2098        &self,
2099        metadata: &PersistedTask,
2100        token_counts: &CompletionTokenCounts,
2101    ) {
2102        if metadata.mode == BgMode::Pty {
2103            return;
2104        }
2105
2106        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2107            token_counts.original_tokens,
2108            token_counts.compressed_tokens,
2109            token_counts.original_bytes,
2110            token_counts.compressed_bytes,
2111        ) {
2112            (
2113                Some(original_tokens),
2114                Some(compressed_tokens),
2115                Some(original_bytes),
2116                Some(compressed_bytes),
2117            ) => (
2118                original_tokens,
2119                compressed_tokens,
2120                original_bytes,
2121                compressed_bytes,
2122            ),
2123            _ => {
2124                crate::slog_warn!(
2125                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2126                    metadata.task_id
2127                );
2128                return;
2129            }
2130        };
2131
2132        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2133        let Some(pool) = pool else {
2134            crate::slog_warn!(
2135                "compression event skipped for {}: db_pool not initialized — was configure run?",
2136                metadata.task_id
2137            );
2138            return;
2139        };
2140        let harness = self
2141            .inner
2142            .db_harness
2143            .read()
2144            .ok()
2145            .and_then(|slot| slot.clone());
2146        let Some(harness) = harness else {
2147            crate::slog_warn!(
2148                "compression event insert skipped for {}: harness not configured",
2149                metadata.task_id
2150            );
2151            return;
2152        };
2153
2154        let project_root = metadata
2155            .project_root
2156            .as_deref()
2157            .unwrap_or(&metadata.workdir);
2158        let project_key = crate::search_index::project_cache_key(project_root);
2159        let row = crate::db::compression_events::CompressionEventRow {
2160            harness: &harness,
2161            session_id: Some(&metadata.session_id),
2162            project_key: &project_key,
2163            tool: "bash",
2164            task_id: Some(&metadata.task_id),
2165            command: Some(&metadata.command),
2166            compressor: if metadata.compressed {
2167                "registry"
2168            } else {
2169                "none"
2170            },
2171            original_bytes,
2172            compressed_bytes,
2173            original_tokens,
2174            compressed_tokens,
2175            created_at: unix_millis() as i64,
2176        };
2177
2178        let conn = match pool.lock() {
2179            Ok(conn) => conn,
2180            Err(_) => {
2181                crate::slog_warn!(
2182                    "compression event insert failed for {}: db mutex poisoned",
2183                    metadata.task_id
2184                );
2185                return;
2186            }
2187        };
2188        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2189            Ok(_) => {
2190                // DEBUG-level: each foreground bash call records one of these,
2191                // which clutters info-level logs without adding diagnostic value.
2192                // Aggregate totals are visible via the status RPC / TUI sidebar.
2193                crate::slog_debug!(
2194                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2195                    metadata.task_id,
2196                    project_key,
2197                    metadata.session_id,
2198                    original_tokens,
2199                    compressed_tokens
2200                );
2201            }
2202            Err(error) => {
2203                crate::slog_warn!(
2204                    "compression event insert failed for {}: {}",
2205                    metadata.task_id,
2206                    error
2207                );
2208            }
2209        }
2210    }
2211
2212    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2213        let Ok(progress_sender) = self
2214            .inner
2215            .progress_sender
2216            .lock()
2217            .map(|sender| sender.clone())
2218        else {
2219            return;
2220        };
2221        if let Some(sender) = progress_sender.as_ref() {
2222            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2223                pattern_match.task_id,
2224                session_id.to_string(),
2225                pattern_match.watch_id,
2226                pattern_match.match_text,
2227                pattern_match.match_offset,
2228                pattern_match.context,
2229                pattern_match.once,
2230            )));
2231        }
2232    }
2233
2234    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2235        let Ok(progress_sender) = self
2236            .inner
2237            .progress_sender
2238            .lock()
2239            .map(|sender| sender.clone())
2240        else {
2241            return;
2242        };
2243        let Some(sender) = progress_sender.as_ref() else {
2244            return;
2245        };
2246        let status = completion_status_text(&completion.status, completion.exit_code);
2247        let preview = completion.output_preview.trim_end();
2248        let context = if preview.is_empty() {
2249            format!("task {} exited ({status})", completion.task_id)
2250        } else {
2251            format!(
2252                "task {} exited ({status})
2253{preview}",
2254                completion.task_id
2255            )
2256        };
2257        sender(PushFrame::BashPatternMatch(
2258            BashPatternMatchFrame::task_exit(
2259                completion.task_id.clone(),
2260                completion.session_id.clone(),
2261                format!("exited ({status})"),
2262                context,
2263            ),
2264        ));
2265    }
2266
2267    fn emit_bash_completed(&self, completion: BgCompletion) {
2268        let Ok(progress_sender) = self
2269            .inner
2270            .progress_sender
2271            .lock()
2272            .map(|sender| sender.clone())
2273        else {
2274            return;
2275        };
2276        let Some(sender) = progress_sender.as_ref() else {
2277            return;
2278        };
2279        // Clone the callback out of the registry mutex before writing to stdout;
2280        // otherwise a blocked push-frame write could pin the mutex and starve
2281        // unrelated progress-sender updates.
2282        // Bg task transitions are discovered by the watchdog thread, so the
2283        // sender is shared behind a Mutex. It still uses the same stdout writer
2284        // closure as foreground progress frames, preserving the existing lock/
2285        // flush behavior in main.rs.
2286        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2287            completion.task_id,
2288            completion.session_id,
2289            completion.status,
2290            completion.exit_code,
2291            completion.command,
2292            completion.output_preview,
2293            completion.output_truncated,
2294            completion.original_tokens,
2295            completion.compressed_tokens,
2296            completion.tokens_skipped,
2297        )));
2298    }
2299
2300    fn completion_token_counts(
2301        &self,
2302        metadata: &PersistedTask,
2303        buffer: Option<&BgBuffer>,
2304        paths: Option<&TaskPaths>,
2305    ) -> CompletionTokenCounts {
2306        if metadata.mode == BgMode::Pty {
2307            return CompletionTokenCounts::skipped();
2308        }
2309
2310        let raw = match buffer {
2311            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2312            None => paths
2313                .map(|paths| {
2314                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2315                })
2316                .unwrap_or(TokenCountInput::Skipped),
2317        };
2318
2319        let TokenCountInput::Text(raw_output) = raw else {
2320            return CompletionTokenCounts::skipped();
2321        };
2322
2323        let original_tokens = token_count_u32(&raw_output);
2324        let original_bytes = raw_output.len() as i64;
2325        let compressed_output = if metadata.compressed {
2326            self.compress_output(&metadata.command, raw_output)
2327        } else {
2328            raw_output
2329        };
2330        let compressed_tokens = token_count_u32(&compressed_output);
2331        let compressed_bytes = compressed_output.len() as i64;
2332        CompletionTokenCounts {
2333            original_tokens: Some(original_tokens),
2334            compressed_tokens: Some(compressed_tokens),
2335            original_bytes: Some(original_bytes),
2336            compressed_bytes: Some(compressed_bytes),
2337            tokens_skipped: false,
2338        }
2339    }
2340
2341    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2342        if !self
2343            .inner
2344            .long_running_reminder_enabled
2345            .load(Ordering::SeqCst)
2346        {
2347            return;
2348        }
2349        let interval_ms = self
2350            .inner
2351            .long_running_reminder_interval_ms
2352            .load(Ordering::SeqCst);
2353        if interval_ms == 0 {
2354            return;
2355        }
2356        let interval = Duration::from_millis(interval_ms);
2357        let now = Instant::now();
2358        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2359            return;
2360        };
2361        let since = last_reminder_at.unwrap_or(task.started);
2362        if now.duration_since(since) < interval {
2363            return;
2364        }
2365        let command = task
2366            .state
2367            .lock()
2368            .map(|state| state.metadata.command.clone())
2369            .unwrap_or_default();
2370        *last_reminder_at = Some(now);
2371        self.emit_bash_long_running(BashLongRunningFrame::new(
2372            task.task_id.clone(),
2373            task.session_id.clone(),
2374            command,
2375            task.started.elapsed().as_millis() as u64,
2376        ));
2377    }
2378
2379    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2380        let Ok(progress_sender) = self
2381            .inner
2382            .progress_sender
2383            .lock()
2384            .map(|sender| sender.clone())
2385        else {
2386            return;
2387        };
2388        if let Some(sender) = progress_sender.as_ref() {
2389            sender(PushFrame::BashLongRunning(frame));
2390        }
2391    }
2392
2393    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2394        self.inner
2395            .tasks
2396            .lock()
2397            .ok()
2398            .and_then(|tasks| tasks.get(task_id).cloned())
2399    }
2400
2401    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2402        self.task(task_id)
2403            .filter(|task| task.session_id == session_id)
2404    }
2405
2406    fn running_count(&self) -> usize {
2407        self.inner
2408            .tasks
2409            .lock()
2410            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2411            .unwrap_or(0)
2412    }
2413
2414    fn start_watchdog(&self) {
2415        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2416            super::watchdog::start(self.clone());
2417        }
2418    }
2419
2420    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2421        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2422    }
2423
2424    #[cfg(test)]
2425    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2426        self.task_for_session(task_id, session_id)
2427            .map(|task| task.paths.json.clone())
2428    }
2429
2430    #[cfg(test)]
2431    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2432        self.task_for_session(task_id, session_id)
2433            .map(|task| task.paths.exit.clone())
2434    }
2435
2436    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2437    fn generate_unique_task_id(&self) -> Result<String, String> {
2438        for _ in 0..32 {
2439            let candidate = random_slug();
2440            let tasks = self
2441                .inner
2442                .tasks
2443                .lock()
2444                .map_err(|_| "background task registry lock poisoned".to_string())?;
2445            if tasks.contains_key(&candidate) {
2446                continue;
2447            }
2448            let completions = self
2449                .inner
2450                .completions
2451                .lock()
2452                .map_err(|_| "background completions lock poisoned".to_string())?;
2453            if completions
2454                .iter()
2455                .any(|completion| completion.task_id == candidate)
2456            {
2457                continue;
2458            }
2459            return Ok(candidate);
2460        }
2461        Err("failed to allocate unique background task id after 32 attempts".to_string())
2462    }
2463}
2464
/// Token and byte accounting for a completed task's output, as produced by
/// `completion_token_counts`.
///
/// Either all four counters are `Some` and `tokens_skipped` is `false`, or
/// (see `CompletionTokenCounts::skipped`) all counters are `None` and
/// `tokens_skipped` is `true`.
struct CompletionTokenCounts {
    /// Token count of the raw (uncompressed) output text.
    original_tokens: Option<u32>,
    /// Token count of the output after optional compression; matches the raw
    /// count's input when the task was not flagged as compressed.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output text.
    original_bytes: Option<i64>,
    /// Byte length of the output after optional compression.
    compressed_bytes: Option<i64>,
    /// `true` when no counting was performed.
    tokens_skipped: bool,
}
2472
2473impl CompletionTokenCounts {
2474    fn skipped() -> Self {
2475        Self {
2476            original_tokens: None,
2477            compressed_tokens: None,
2478            original_bytes: None,
2479            compressed_bytes: None,
2480            tokens_skipped: true,
2481        }
2482    }
2483}
2484
2485fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2486    match status {
2487        BgTaskStatus::TimedOut => "timed out".to_string(),
2488        BgTaskStatus::Killed => "killed".to_string(),
2489        _ => exit_code
2490            .map(|code| format!("exit {code}"))
2491            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2492    }
2493}
2494
2495fn token_count_u32(text: &str) -> u32 {
2496    aft_tokenizer::count_tokens(text)
2497        .try_into()
2498        .unwrap_or(u32::MAX)
2499}
2500
2501impl Default for BgTaskRegistry {
2502    fn default() -> Self {
2503        Self::new(Arc::new(Mutex::new(None)))
2504    }
2505}
2506
/// Returns `true` when `path` was last modified strictly less than `grace`
/// ago.
///
/// Any failure along the way (metadata unavailable, modification time not
/// supported, or a modification time that lies in the future) yields `false`.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
2515
/// Canonicalizes `path`, falling back to an unmodified owned copy of `path`
/// when canonicalization fails (for example because it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2519
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` lying the same distance in the past.
///
/// A start time in the future is treated as "now". If the wall clock cannot
/// be read, or the subtraction is not representable as an `Instant`, the
/// current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
2531
2532fn gc_quarantine(storage_dir: &Path) {
2533    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2534    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2535        return;
2536    };
2537    for session_entry in session_dirs.flatten() {
2538        let session_quarantine_dir = session_entry.path();
2539        if !session_quarantine_dir.is_dir() {
2540            continue;
2541        }
2542        let entries = match fs::read_dir(&session_quarantine_dir) {
2543            Ok(entries) => entries,
2544            Err(error) => {
2545                crate::slog_warn!(
2546                    "failed to read background task quarantine dir {}: {error}",
2547                    session_quarantine_dir.display()
2548                );
2549                continue;
2550            }
2551        };
2552        for entry in entries.flatten() {
2553            let path = entry.path();
2554            if modified_within(&path, QUARANTINE_GC_GRACE) {
2555                continue;
2556            }
2557            let result = if path.is_dir() {
2558                fs::remove_dir_all(&path)
2559            } else {
2560                fs::remove_file(&path)
2561            };
2562            match result {
2563                Ok(()) => log::debug!(
2564                    "deleted old background task quarantine entry {}",
2565                    path.display()
2566                ),
2567                Err(error) => crate::slog_warn!(
2568                    "failed to delete old background task quarantine entry {}: {error}",
2569                    path.display()
2570                ),
2571            }
2572        }
2573        let _ = fs::remove_dir(&session_quarantine_dir);
2574    }
2575    let _ = fs::remove_dir(&quarantine_root);
2576}
2577
/// Why a task file is being quarantined; selects the naming scheme applied
/// by `quarantine_name`.
enum QuarantineKind {
    /// Quarantined files get a `.corrupt-<unix_ts>` suffix appended.
    /// NOTE(review): presumably used for metadata that could not be parsed —
    /// confirm against callers.
    Corrupt,
    /// Quarantined files get `.invalid.<unix_ts>` inserted before their
    /// extension (or appended when there is no extension).
    /// NOTE(review): presumably used for metadata that parsed but failed
    /// validation — confirm against callers.
    Invalid,
}
2582
2583fn quarantine_task_json(
2584    storage_dir: &Path,
2585    session_dir: &Path,
2586    json_path: &Path,
2587    kind: QuarantineKind,
2588) -> Result<(), String> {
2589    let session_hash = session_dir
2590        .file_name()
2591        .and_then(|name| name.to_str())
2592        .ok_or_else(|| {
2593            format!(
2594                "invalid background task session dir: {}",
2595                session_dir.display()
2596            )
2597        })?;
2598    let task_name = json_path
2599        .file_name()
2600        .and_then(|name| name.to_str())
2601        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2602    let unix_ts = SystemTime::now()
2603        .duration_since(UNIX_EPOCH)
2604        .map(|duration| duration.as_secs())
2605        .unwrap_or(0);
2606    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2607    fs::create_dir_all(&quarantine_dir).map_err(|e| {
2608        format!(
2609            "failed to create background task quarantine dir {}: {e}",
2610            quarantine_dir.display()
2611        )
2612    })?;
2613    let target_name = quarantine_name(task_name, unix_ts, &kind);
2614    let target = quarantine_dir.join(target_name);
2615    fs::rename(json_path, &target).map_err(|e| {
2616        format!(
2617            "failed to quarantine background task metadata {} to {}: {e}",
2618            json_path.display(),
2619            target.display()
2620        )
2621    })?;
2622
2623    for sibling in task_sibling_paths(json_path) {
2624        if !sibling.exists() {
2625            continue;
2626        }
2627        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2628            crate::slog_warn!(
2629                "skipping background task sibling with invalid name during quarantine: {}",
2630                sibling.display()
2631            );
2632            continue;
2633        };
2634        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2635        if let Err(error) = fs::rename(&sibling, &sibling_target) {
2636            crate::slog_warn!(
2637                "failed to quarantine background task sibling {} to {}: {error}",
2638                sibling.display(),
2639                sibling_target.display()
2640            );
2641        }
2642    }
2643
2644    let _ = fs::remove_dir(session_dir);
2645    Ok(())
2646}
2647
2648fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2649    match kind {
2650        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2651        QuarantineKind::Invalid => {
2652            let path = Path::new(file_name);
2653            let stem = path.file_stem().and_then(|stem| stem.to_str());
2654            let extension = path.extension().and_then(|extension| extension.to_str());
2655            match (stem, extension) {
2656                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2657                _ => format!("{file_name}.invalid.{unix_ts}"),
2658            }
2659        }
2660    }
2661}
2662
/// Lists the candidate sibling files of a task's metadata JSON: the same
/// directory and stem combined with each known task file extension.
///
/// Returns an empty list when `json_path` has no parent directory or no
/// UTF-8 file stem. Existence of the returned paths is not checked.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let location = json_path
        .parent()
        .zip(json_path.file_stem().and_then(|stem| stem.to_str()));
    let Some((parent, stem)) = location else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2675
2676fn read_tail_from_disk(
2677    metadata: &PersistedTask,
2678    paths: &TaskPaths,
2679    max_bytes: usize,
2680) -> (String, bool) {
2681    if metadata.mode == BgMode::Pty {
2682        return read_file_tail_capped(&paths.pty, max_bytes)
2683            .map(|bytes| {
2684                let truncated = fs::metadata(&paths.pty)
2685                    .map(|metadata| metadata.len() > max_bytes as u64)
2686                    .unwrap_or(false);
2687                (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2688            })
2689            .unwrap_or_else(|_| (String::new(), false));
2690    }
2691    let stdout = fs::read(&paths.stdout).unwrap_or_default();
2692    let stderr = fs::read(&paths.stderr).unwrap_or_default();
2693    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2694    bytes.extend_from_slice(&stdout);
2695    bytes.extend_from_slice(&stderr);
2696    if bytes.len() <= max_bytes {
2697        return (String::from_utf8_lossy(&bytes).into_owned(), false);
2698    }
2699    let start = bytes.len().saturating_sub(max_bytes);
2700    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2701}
2702
2703fn read_for_token_count_from_disk(
2704    metadata: &PersistedTask,
2705    paths: &TaskPaths,
2706    max_bytes_per_stream: usize,
2707) -> TokenCountInput {
2708    if metadata.mode == BgMode::Pty {
2709        return TokenCountInput::Skipped;
2710    }
2711    // Read up to `max_bytes_per_stream` bytes per stream rather than
2712    // refusing to tokenize anything when the file exceeds the cap.
2713    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
2714    // (see comment there) — large outputs are exactly the tasks that
2715    // benefit most from compression accounting, so silent-skipping
2716    // them defeats the purpose of token tracking.
2717    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2718    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2719    match (stdout, stderr) {
2720        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2721            String::from_utf8_lossy(&stdout).as_ref(),
2722            String::from_utf8_lossy(&stderr).as_ref(),
2723        )),
2724        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2725            String::from_utf8_lossy(&stdout).as_ref(),
2726            "",
2727        )),
2728        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2729            "",
2730            String::from_utf8_lossy(&stderr).as_ref(),
2731        )),
2732        (Err(_), Err(_)) => TokenCountInput::Skipped,
2733    }
2734}
2735
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The cap is enforced on the read itself, so the result never exceeds
/// `max_bytes` even if the file grows after its length was sampled, and
/// `max_bytes == 0` yields an empty buffer instead of the whole file.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error) or if querying its length, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    // Open first so a missing file is reported even when nothing is read.
    let mut file = std::fs::File::open(path)?;
    if max_bytes == 0 {
        return Ok(Vec::new());
    }
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let len = file.metadata()?.len();
    if len > cap {
        // Skip everything before the last `cap` bytes.
        file.seek(SeekFrom::Start(len - cap))?;
    }
    // `len.min(cap)` is bounded by `max_bytes`, so it always fits in `usize`.
    let mut bytes = Vec::with_capacity(len.min(cap) as usize);
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2752
2753impl BgTask {
2754    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2755        let state = self
2756            .state
2757            .lock()
2758            .unwrap_or_else(|poison| poison.into_inner());
2759        self.snapshot_locked(&state, preview_bytes)
2760    }
2761
2762    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2763        let metadata = &state.metadata;
2764        let duration_ms = metadata.duration_ms.or_else(|| {
2765            metadata
2766                .status
2767                .is_terminal()
2768                .then(|| self.started.elapsed().as_millis() as u64)
2769        });
2770        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2771            (String::new(), false)
2772        } else {
2773            state.buffer.read_tail(preview_bytes)
2774        };
2775        BgTaskSnapshot {
2776            info: BgTaskInfo {
2777                task_id: self.task_id.clone(),
2778                status: metadata.status.clone(),
2779                command: metadata.command.clone(),
2780                mode: metadata.mode.clone(),
2781                started_at: metadata.started_at,
2782                duration_ms,
2783            },
2784            exit_code: metadata.exit_code,
2785            child_pid: metadata.child_pid,
2786            workdir: metadata.workdir.display().to_string(),
2787            output_preview,
2788            output_truncated,
2789            output_path: state
2790                .buffer
2791                .output_path()
2792                .map(|path| path.display().to_string()),
2793            stderr_path: state
2794                .buffer
2795                .stderr_path()
2796                .map(|path| path.display().to_string()),
2797            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2798            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2799        }
2800    }
2801
2802    pub(crate) fn is_running(&self) -> bool {
2803        self.state
2804            .lock()
2805            .map(|state| {
2806                state.metadata.status == BgTaskStatus::Running
2807                    || (state.metadata.mode == BgMode::Pty
2808                        && state.metadata.status == BgTaskStatus::Killing)
2809            })
2810            .unwrap_or(false)
2811    }
2812
2813    fn is_terminal(&self) -> bool {
2814        self.state
2815            .lock()
2816            .map(|state| state.metadata.status.is_terminal())
2817            .unwrap_or(false)
2818    }
2819
2820    fn mark_terminal_now(&self) {
2821        if let Ok(mut terminal_at) = self.terminal_at.lock() {
2822            if terminal_at.is_none() {
2823                *terminal_at = Some(Instant::now());
2824            }
2825        }
2826    }
2827
2828    fn set_completion_delivered(
2829        &self,
2830        delivered: bool,
2831        registry: &BgTaskRegistry,
2832    ) -> Result<(), String> {
2833        let mut state = self
2834            .state
2835            .lock()
2836            .map_err(|_| "background task lock poisoned".to_string())?;
2837        let updated = registry
2838            .update_task_metadata(&self.paths, |metadata| {
2839                metadata.completion_delivered = delivered;
2840            })
2841            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2842        state.metadata = updated;
2843        Ok(())
2844    }
2845}
2846
2847fn terminal_metadata_from_marker(
2848    mut metadata: PersistedTask,
2849    marker: ExitMarker,
2850    reason: Option<String>,
2851) -> PersistedTask {
2852    match marker {
2853        ExitMarker::Code(code) => {
2854            let status = if code == 0 {
2855                BgTaskStatus::Completed
2856            } else {
2857                BgTaskStatus::Failed
2858            };
2859            metadata.mark_terminal(status, Some(code), reason);
2860        }
2861        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2862    }
2863    metadata
2864}
2865
2866#[cfg(unix)]
2867fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2868    let shell = resolve_posix_shell();
2869    let mut cmd = Command::new(&shell);
2870    cmd.arg("-c")
2871        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2872        .arg(&shell)
2873        .arg(command)
2874        .arg(exit_path);
2875    unsafe {
2876        cmd.pre_exec(|| {
2877            if libc::setsid() == -1 {
2878                return Err(std::io::Error::last_os_error());
2879            }
2880            Ok(())
2881        });
2882    }
2883    cmd
2884}
2885
2886#[cfg(unix)]
2887fn resolve_posix_shell() -> PathBuf {
2888    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2889    POSIX_SHELL
2890        .get_or_init(|| {
2891            std::env::var_os("BASH")
2892                .filter(|value| !value.is_empty())
2893                .map(PathBuf::from)
2894                .filter(|path| path.exists())
2895                .or_else(|| which::which("bash").ok())
2896                .or_else(|| which::which("zsh").ok())
2897                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2898        })
2899        .clone()
2900}
2901
/// Builds the Windows command that runs `command` through `shell` via an
/// on-disk wrapper script.
///
/// The wrapper produced by `shell.wrapper_script` is written next to the
/// other task files as `<json stem>.<ext>` (`ps1`, `bat` or `sh`, depending
/// on the shell) and the shell is then invoked with that file as its script
/// argument. `creation_flags` is applied to the command unchanged.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    // Handing the shell a script FILE instead of an inline command sidesteps
    // the Windows command-line quoting mess (Rust std-lib quoting + cmd /C
    // parser + PowerShell -Command parser all interacting with embedded
    // quotes in the wrapper).
    //
    // The path argument itself is unproblematic: (1) task IDs contain no
    // spaces (bash-XXXXXXXX), so the path has no characters that need shell
    // escaping; (2) the quotes inside the wrapper body never reach the
    // command line — the shell reads them from disk by file syntax rules,
    // not command-line parser rules.
    let script = shell.wrapper_script(command, exit_path);
    let extension = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) receive the wrapper as a script path,
        // so the extension is purely cosmetic; `.sh` matches what an operator
        // would expect when grepping the spill directory.
        WindowsShell::Posix(_) => "sh",
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", stem, extension));
    if let Err(e) = fs::write(&wrapper_path, script) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Host flags that must precede the script path, per shell.
    let host_flags: &[&str] = match shell {
        // `-File` runs the script with no quoting issues; the remaining
        // switches apply to the host before the file runs.
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        // `cmd /D /C "<bat-file-path>"`: invoking a .bat file via /C is
        // well-defined. The file's contents are read line-by-line by cmd's
        // batch processor, NOT re-interpreted by the /C parser, which avoids
        // the "filename syntax incorrect" errors that complex compound
        // commands on the cmd line used to trigger.
        WindowsShell::Cmd => &["/D", "/C"],
        // git-bash and other POSIX shells run the wrapper as
        // `<binary> <wrapper-path>`; no special flags are needed because the
        // `trap` and atomic exit-marker rename in `wrapper_script` are
        // POSIX-standard.
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(host_flags);
    cmd.arg(&wrapper_path);

    // Win32 process creation flags. The caller decides whether
    // CREATE_BREAKAWAY_FROM_JOB is included (breakaway-fallback strategy).
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2984
2985/// Spawn a detached background bash child process.
2986///
/// On Unix this is a single spawn against the shell chosen by
/// `resolve_posix_shell` (`$BASH`, then `bash`, then `zsh`, falling back to
/// `/bin/sh`). On Windows it walks `windows_shell::shell_candidates()`
/// (pwsh → powershell → git-bash → cmd) and retries with the next
/// candidate when the previous one
2990/// fails to spawn with `NotFound` — the same runtime safety net the
2991/// foreground bash path has, so issue #27 callers landing on cmd.exe
2992/// fallback can also use background bash. The wrapper script is
2993/// regenerated per attempt because PowerShell wrappers embed the shell
2994/// binary by name; the stdout/stderr capture handles are also reopened
2995/// per attempt because `Command::spawn()` consumes them.
2996///
2997/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
2998/// return immediately without retry — they indicate a problem with the
2999/// resolved shell that retrying with a different shell won't fix.
3000fn spawn_detached_child(
3001    command: &str,
3002    paths: &TaskPaths,
3003    workdir: &Path,
3004    env: &HashMap<String, String>,
3005) -> Result<std::process::Child, String> {
3006    #[cfg(not(windows))]
3007    {
3008        let stdout = create_capture_file(&paths.stdout)
3009            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3010        let stderr = create_capture_file(&paths.stderr)
3011            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3012        detached_shell_command(command, &paths.exit)
3013            .current_dir(workdir)
3014            .envs(env)
3015            .stdin(Stdio::null())
3016            .stdout(Stdio::from(stdout))
3017            .stderr(Stdio::from(stderr))
3018            .spawn()
3019            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3020    }
3021    #[cfg(windows)]
3022    {
3023        use crate::windows_shell::shell_candidates;
3024        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3025        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3026        // this background spawn helper, including foreground tool calls
3027        // where the model writes PowerShell-syntax (`$var = ...`,
3028        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3029        // The earlier v0.18-era cmd-first override worked around a
3030        // PowerShell detached-output bug; that bug is fixed at the
3031        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3032        // see flag block below), so we no longer need to misroute PS
3033        // commands through cmd.
3034        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3035        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3036        // first (so the bg child outlives the AFT process when AFT is killed),
3037        // then fall back without it for environments where the parent is in a
3038        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3039        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3040        // environments hit this — `CreateProcess` returns Access Denied (5).
3041        // Without breakaway, the child still runs detached but will be torn
3042        // down with the parent if the parent process group is signaled.
3043        //
3044        // We use CREATE_NO_WINDOW (no visible console window, but the
3045        // child still has a hidden console) rather than DETACHED_PROCESS
3046        // (no console at all). PowerShell-based wrappers that perform
3047        // file I/O via [System.IO.File] need a console handle to flush
3048        // stdout/stderr correctly even when redirected — under
3049        // DETACHED_PROCESS, pwsh sometimes silently exits before
3050        // executing later script statements (the Move-Item that writes
3051        // the exit marker never runs), leaving the bg task forever
3052        // marked Failed: process exited without exit marker. cmd.exe
3053        // wrappers tolerate DETACHED_PROCESS, but switching to
3054        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3055        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3056        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3057        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3058        let with_breakaway =
3059            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3060        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3061        let mut last_error: Option<String> = None;
3062        for (idx, shell) in candidates.iter().enumerate() {
3063            // Per-shell, try with breakaway first. If the process is in a
3064            // restrictive job, the breakaway flag triggers Access Denied
3065            // (os error 5). Retry once without breakaway.
3066            for &flags in &[with_breakaway, without_breakaway] {
3067                // Re-open capture handles per attempt; spawn() consumes them.
3068                let stdout = create_capture_file(&paths.stdout)
3069                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3070                let stderr = create_capture_file(&paths.stderr)
3071                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3072                let mut cmd =
3073                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3074                cmd.current_dir(workdir)
3075                    .envs(env)
3076                    .stdin(Stdio::null())
3077                    .stdout(Stdio::from(stdout))
3078                    .stderr(Stdio::from(stderr));
3079                match cmd.spawn() {
3080                    Ok(child) => {
3081                        if idx > 0 {
3082                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3083                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3084                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3085                            shell.binary(),
3086                            idx);
3087                        }
3088                        if flags == without_breakaway {
3089                            crate::slog_warn!(
3090                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3091                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3092                             Spawned without breakaway; the bg task will be torn down if the \
3093                             AFT process group is killed."
3094                            );
3095                        }
3096                        return Ok(child);
3097                    }
3098                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3099                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3100                        shell.binary());
3101                        last_error = Some(format!("{}: {e}", shell.binary()));
3102                        // Skip the without-breakaway retry for NotFound — the
3103                        // binary itself is missing, breakaway flag is irrelevant.
3104                        break;
3105                    }
3106                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3107                        // Access Denied during breakaway — retry without it.
3108                        crate::slog_warn!(
3109                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3110                         Access Denied — retrying {} without breakaway",
3111                            shell.binary()
3112                        );
3113                        last_error = Some(format!("{}: {e}", shell.binary()));
3114                        continue;
3115                    }
3116                    Err(e) => {
3117                        return Err(format!(
3118                            "failed to spawn background bash command via {}: {e}",
3119                            shell.binary()
3120                        ));
3121                    }
3122                }
3123            }
3124        }
3125        Err(format!(
3126            "failed to spawn background bash command: no Windows shell could be spawned. \
3127             Last error: {}. PATH-probed candidates: {:?}",
3128            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3129            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3130        ))
3131    }
3132}
3133
3134fn random_slug() -> String {
3135    let mut bytes = [0u8; 4];
3136    // getrandom is a transitive dependency; use it directly for OS entropy.
3137    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3138        // Extremely unlikely fallback: time + pid mix.
3139        let t = SystemTime::now()
3140            .duration_since(UNIX_EPOCH)
3141            .map(|d| d.subsec_nanos())
3142            .unwrap_or(0);
3143        let p = std::process::id();
3144        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3145    });
3146    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3147    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3148    format!("bash-{hex}")
3149}
3150
3151#[cfg(test)]
3152mod tests {
3153    use std::collections::HashMap;
3154    #[cfg(windows)]
3155    use std::fs;
3156    use std::sync::{Arc, Mutex};
3157    use std::time::Duration;
3158    #[cfg(windows)]
3159    use std::time::Instant;
3160
3161    use super::*;
3162
    // Command used by tests that need a task which finishes almost
    // immediately with exit status 0. Unix flavor: the `true` utility.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    // Windows flavor of the quick-success command: `cmd` exiting with code 0.
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Command used by tests that need a task which is still running when the
    // test inspects or kills it. Unix flavor: sleep for five seconds.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    // Windows flavor: a five-second `timeout` that ignores key presses
    // (`/nobreak`) and discards its own output.
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3172
3173    #[test]
3174    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3175        let registry = BgTaskRegistry::default();
3176        let dir = tempfile::tempdir().unwrap();
3177        let task_id = registry
3178            .spawn_pty(
3179                QUICK_SUCCESS_COMMAND,
3180                "session".to_string(),
3181                dir.path().to_path_buf(),
3182                HashMap::new(),
3183                Some(Duration::from_secs(30)),
3184                dir.path().to_path_buf(),
3185                10,
3186                true,
3187                false,
3188                Some(dir.path().to_path_buf()),
3189                50,
3190                120,
3191            )
3192            .unwrap();
3193
3194        let paths = task_paths(dir.path(), "session", &task_id);
3195        let metadata = read_task(&paths.json).unwrap();
3196        assert_eq!(
3197            metadata.schema_version,
3198            crate::bash_background::persistence::SCHEMA_VERSION
3199        );
3200        assert_eq!(metadata.mode, BgMode::Pty);
3201        assert_eq!(metadata.pty_rows, Some(50));
3202        assert_eq!(metadata.pty_cols, Some(120));
3203
3204        let snapshot = registry
3205            .status(&task_id, "session", None, Some(dir.path()), 1024)
3206            .unwrap();
3207        assert_eq!(snapshot.pty_rows, Some(50));
3208        assert_eq!(snapshot.pty_cols, Some(120));
3209    }
3210
3211    /// Spawn a child process that exits immediately and return it after
3212    /// it has terminated. Used by reap_child tests to simulate the
3213    /// "child exists and is dead" state when the watchdog has already
3214    /// nulled out the original child handle.
3215    fn spawn_dead_child() -> std::process::Child {
3216        #[cfg(unix)]
3217        let mut cmd = std::process::Command::new("true");
3218        #[cfg(windows)]
3219        let mut cmd = {
3220            let mut c = std::process::Command::new("cmd");
3221            c.args(["/c", "exit", "0"]);
3222            c
3223        };
3224        cmd.stdin(std::process::Stdio::null());
3225        cmd.stdout(std::process::Stdio::null());
3226        cmd.stderr(std::process::Stdio::null());
3227        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3228        // Poll try_wait() until the child actually exits, instead of calling
3229        // wait() which closes the OS handle. On Windows, after wait()
3230        // closes the handle, subsequent try_wait() calls (which reap_child
3231        // depends on) return Err — the test was inadvertently giving
3232        // reap_child an unusable child handle. Polling try_wait() keeps the
3233        // handle open and observes natural exit, matching the production
3234        // shape where the watchdog discovers an exited child for the first
3235        // time.
3236        let started = Instant::now();
3237        loop {
3238            match child.try_wait() {
3239                Ok(Some(_)) => break,
3240                Ok(None) => {
3241                    if started.elapsed() > Duration::from_secs(5) {
3242                        panic!("dead-child stand-in did not exit within 5s");
3243                    }
3244                    std::thread::sleep(Duration::from_millis(10));
3245                }
3246                Err(error) => panic!("dead-child try_wait failed: {error}"),
3247            }
3248        }
3249        child
3250    }
3251
3252    #[test]
3253    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3254        let registry = BgTaskRegistry::default();
3255        let dir = tempfile::tempdir().unwrap();
3256        let task_id = registry
3257            .spawn(
3258                LONG_RUNNING_COMMAND,
3259                "session".to_string(),
3260                dir.path().to_path_buf(),
3261                HashMap::new(),
3262                Some(Duration::from_secs(30)),
3263                dir.path().to_path_buf(),
3264                10,
3265                true,
3266                false,
3267                Some(dir.path().to_path_buf()),
3268            )
3269            .unwrap();
3270        registry
3271            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3272            .unwrap();
3273        assert_eq!(
3274            registry
3275                .drain_completions_for_session(Some("session"))
3276                .len(),
3277            1
3278        );
3279
3280        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
3281        // locally before the Rust completion queue is drained/acked.
3282        registry.inner.completions.lock().unwrap().clear();
3283
3284        assert_eq!(
3285            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3286            vec![task_id.clone()]
3287        );
3288        assert!(registry
3289            .drain_completions_for_session(Some("session"))
3290            .is_empty());
3291
3292        let paths = task_paths(dir.path(), "session", &task_id);
3293        let metadata = read_task(&paths.json).unwrap();
3294        assert!(metadata.completion_delivered);
3295
3296        let replayed = BgTaskRegistry::default();
3297        replayed
3298            .replay_session_inner(dir.path(), "session", None)
3299            .unwrap();
3300        assert!(replayed
3301            .drain_completions_for_session(Some("session"))
3302            .is_empty());
3303    }
3304
3305    #[test]
3306    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3307        let registry = BgTaskRegistry::default();
3308        let dir = tempfile::tempdir().unwrap();
3309        let task_id = registry
3310            .spawn(
3311                QUICK_SUCCESS_COMMAND,
3312                "session".to_string(),
3313                dir.path().to_path_buf(),
3314                HashMap::new(),
3315                Some(Duration::from_secs(30)),
3316                dir.path().to_path_buf(),
3317                10,
3318                true,
3319                false,
3320                Some(dir.path().to_path_buf()),
3321            )
3322            .unwrap();
3323        registry
3324            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3325            .unwrap();
3326        let completions = registry.drain_completions_for_session(Some("session"));
3327        assert_eq!(completions.len(), 1);
3328        assert_eq!(
3329            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3330            vec![task_id.clone()]
3331        );
3332
3333        registry.cleanup_finished(Duration::ZERO);
3334
3335        assert!(registry.inner.tasks.lock().unwrap().is_empty());
3336    }
3337
3338    #[test]
3339    fn cleanup_finished_retains_undelivered_terminals() {
3340        let registry = BgTaskRegistry::default();
3341        let dir = tempfile::tempdir().unwrap();
3342        let task_id = registry
3343            .spawn(
3344                QUICK_SUCCESS_COMMAND,
3345                "session".to_string(),
3346                dir.path().to_path_buf(),
3347                HashMap::new(),
3348                Some(Duration::from_secs(30)),
3349                dir.path().to_path_buf(),
3350                10,
3351                true,
3352                false,
3353                Some(dir.path().to_path_buf()),
3354            )
3355            .unwrap();
3356        registry
3357            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3358            .unwrap();
3359
3360        registry.cleanup_finished(Duration::ZERO);
3361
3362        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3363    }
3364
    /// Verify that the live watchdog path (reap_child) gives an exited
    /// child one watchdog pass for its exit marker to land, then marks the
    /// task Failed if the next pass still sees no marker.
    ///
    /// Cross-platform: spawns a quick-exiting command, then deletes the
    /// exit marker by hand to simulate a wrapper that crashed before
    /// writing it.
    ///
    /// Phases: spawn -> wait for the child to exit -> stop the watchdog ->
    /// remove the marker and reset the task to a Running shape -> first
    /// reap (must stay Running, handle released, detached) -> second reap
    /// (must become Failed with the "process exited without exit marker"
    /// reason).
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    // Any other runtime shape (for example the child handle
                    // is already gone) is treated as "exited".
                    _ => true,
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog ticks (every 500ms) can
        // observe the child exit and reap it before this test's assertion
        // fires, leaving us with state.runtime = TaskRuntime::Piped(None)
        // and an already-terminal status. We specifically want to test
        // reap_child's logic when invoked manually on a
        // Running-but-actually-dead task, so we need exclusive control over
        // the reap path here.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Give the watchdog at most one tick (500ms) to notice shutdown
        // before we touch task state. Without this, an in-flight watchdog
        // iteration could still race with our state setup below.
        std::thread::sleep(Duration::from_millis(550));

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        // The result is ignored on purpose: the marker may not exist.
        let _ = std::fs::remove_file(&task.paths.exit);

        // The watchdog may have already reaped the child handle and
        // marked the task terminal before we got here. Reset both so
        // reap_child has the "Running task whose child just exited"
        // shape it's designed to handle. If the original child handle is
        // gone, install a quick-exited stand-in so the first reap exercises
        // the same try_wait path as production.
        //
        // CRITICAL on Windows: the watchdog ticks fast enough that the
        // JSON on disk may already say `Completed`. `update_task` (called
        // by `reap_child`) reads from disk, applies the closure, but
        // ROLLS BACK if the original on-disk state was already terminal
        // (see persistence.rs::update_task). So we must reset BOTH
        // in-memory metadata AND the JSON on disk to a Running state to
        // give reap_child the fresh shape it expects to operate on.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            // Persist the reset state to disk so update_task's terminal
            // rollback guard sees a non-terminal starting point.
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the watchdog already took the child handle (runtime is
            // TaskRuntime::Piped(None)), we need to simulate "child exists
            // and is dead" so reap_child's try_wait path runs. Spawn a
            // quick-exit child as a stand-in.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Clear the terminal_at marker too so mark_terminal_now() can fire
        // again inside reap_child.
        *task.terminal_at.lock().unwrap() = None;

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First watchdog observation is intentionally insufficient to
        // declare failure. A missing marker may just mean the wrapper is
        // still completing its tmp-file-to-marker rename, so reap_child only
        // drops the child handle and switches to detached PID monitoring.
        registry.reap_child(&task);

        // Scoped so the state lock is released before the second reap.
        {
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second watchdog observation sees the detached PID is dead and the
        // marker is still absent. That is strong enough evidence that the
        // wrapper exited without persisting an exit code.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
3542
    /// Companion to `reap_child_marks_failed_when_child_exits_without_exit_marker`:
    /// when the exit marker DOES exist on disk at reap_child time,
    /// reap_child must NOT mark the task Failed. Instead it leaves
    /// status=Running and lets the next poll_task() cycle finalize via the
    /// marker.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for child to exit AND for the marker to land. Both happen
        // shortly after the wrapper finishes — but we want both observed.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    // Any other runtime shape (for example the child handle
                    // is already gone) is treated as "exited".
                    _ => true,
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog can call poll_task (which
        // finalizes via marker) before this test asserts the
        // "marker exists, status still Running" invariant. We want
        // exclusive control over the reap path.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Sleep slightly longer than one 500ms watchdog tick so an
        // in-flight iteration can finish before task state is touched.
        std::thread::sleep(Duration::from_millis(550));

        // If the watchdog already finalized the task before we stopped it,
        // restore the test setup: reset status to Running and ensure the
        // marker file is still on disk. We're testing reap_child's
        // behavior when called manually with both child-exited AND
        // marker-present, regardless of whether the watchdog beat us.
        //
        // NOTE(review): this reset is in-memory only; unlike the
        // missing-marker sibling test it does not rewrite the task JSON or
        // clear exit_code/finished_at/duration_ms. Presumably that is fine
        // because reap_child leaves status untouched when a marker exists —
        // confirm against reap_child.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            // If the child handle is already gone, install an
            // already-exited stand-in so reap_child has a child to inspect.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Clear the recorded terminal timestamp as part of the same reset.
        *task.terminal_at.lock().unwrap() = None;
        // Make sure the marker is still on disk (poll_task removes it on
        // finalization). Recreate it if needed.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        // reap_child sees: child exited, marker exists. It should:
        //  - release the child handle (runtime becomes
        //    TaskRuntime::Piped(None)) / set state.detached = true
        //  - NOT change status (poll_task will finalize via marker next tick)
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        // Status remains Running because reap_child defers to poll_task
        // when a marker exists. It would be wrong for reap to record the
        // marker outcome (poll_task does that with proper exit-code
        // parsing).
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }
3644
3645    #[test]
3646    fn cleanup_finished_keeps_running_tasks() {
3647        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3648        let dir = tempfile::tempdir().unwrap();
3649        let task_id = registry
3650            .spawn(
3651                LONG_RUNNING_COMMAND,
3652                "session".to_string(),
3653                dir.path().to_path_buf(),
3654                HashMap::new(),
3655                Some(Duration::from_secs(30)),
3656                dir.path().to_path_buf(),
3657                10,
3658                true,
3659                false,
3660                Some(dir.path().to_path_buf()),
3661            )
3662            .unwrap();
3663
3664        registry.cleanup_finished(Duration::ZERO);
3665
3666        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3667        let _ = registry.kill(&task_id, "session");
3668    }
3669
3670    #[cfg(windows)]
3671    fn wait_for_file(path: &Path) -> String {
3672        let started = Instant::now();
3673        loop {
3674            if path.exists() {
3675                return fs::read_to_string(path).expect("read file");
3676            }
3677            assert!(
3678                started.elapsed() < Duration::from_secs(30),
3679                "timed out waiting for {}",
3680                path.display()
3681            );
3682            std::thread::sleep(Duration::from_millis(100));
3683        }
3684    }
3685
3686    #[cfg(windows)]
3687    fn spawn_windows_registry_command(
3688        command: &str,
3689    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
3690        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3691        let dir = tempfile::tempdir().unwrap();
3692        let task_id = registry
3693            .spawn(
3694                command,
3695                "session".to_string(),
3696                dir.path().to_path_buf(),
3697                HashMap::new(),
3698                Some(Duration::from_secs(30)),
3699                dir.path().to_path_buf(),
3700                10,
3701                false,
3702                false,
3703                Some(dir.path().to_path_buf()),
3704            )
3705            .unwrap();
3706        (registry, dir, task_id)
3707    }
3708
3709    #[cfg(windows)]
3710    #[test]
3711    fn windows_spawn_writes_exit_marker_for_zero_exit() {
3712        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
3713        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
3714
3715        let content = wait_for_file(&exit_path);
3716
3717        assert_eq!(content.trim(), "0");
3718    }
3719
3720    #[cfg(windows)]
3721    #[test]
3722    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
3723        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
3724        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
3725
3726        let content = wait_for_file(&exit_path);
3727
3728        assert_eq!(content.trim(), "42");
3729    }
3730
3731    #[cfg(windows)]
3732    #[test]
3733    fn windows_spawn_captures_stdout_to_disk() {
3734        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
3735        let task = registry.task_for_session(&task_id, "session").unwrap();
3736        let stdout_path = task.paths.stdout.clone();
3737        let exit_path = task.paths.exit.clone();
3738
3739        let _ = wait_for_file(&exit_path);
3740        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
3741
3742        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
3743    }
3744
3745    #[cfg(windows)]
3746    #[test]
3747    fn windows_spawn_uses_pwsh_when_available() {
3748        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
3749        // (We intentionally pass None for shell_env to keep this test
3750        // independent of the runner's actual env.)
3751        let candidates = crate::windows_shell::shell_candidates_with(
3752            |binary| match binary {
3753                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
3754                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
3755                _ => None,
3756            },
3757            || None,
3758        );
3759        let shell = candidates.first().expect("at least one candidate").clone();
3760        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
3761        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
3762    }
3763
3764    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
3765    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
3766    /// evaluation is the default for batch files; parse-time expansion only
3767    /// applies to compound `&`-chained inline commands). Capturing
3768    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
3769    /// command runs records the real run-time exit code.
3770    #[cfg(windows)]
3771    #[test]
3772    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
3773        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
3774        let script =
3775            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
3776
3777        // Batch wrapper: capture exit code into CODE on the line after the
3778        // user command, then write CODE to a temp marker file before
3779        // atomic-renaming it into place.
3780        assert!(
3781            script.contains("set CODE=%ERRORLEVEL%"),
3782            "wrapper must capture exit code into CODE: {script}"
3783        );
3784        assert!(
3785            script.contains("echo %CODE% >"),
3786            "wrapper must echo CODE to a temp marker file: {script}"
3787        );
3788        assert!(
3789            script.contains("move /Y"),
3790            "wrapper must use atomic move to write the marker: {script}"
3791        );
3792        // move output must be redirected to nul to avoid polluting the
3793        // user's captured stdout with "1 file(s) moved." lines.
3794        assert!(
3795            script.contains("> nul"),
3796            "wrapper must redirect move output to nul: {script}"
3797        );
3798        // exit /B %CODE% propagates the real exit code so wait() sees it.
3799        assert!(
3800            script.contains("exit /B %CODE%"),
3801            "wrapper must propagate the captured exit code: {script}"
3802        );
3803        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3804        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3805    }
3806
3807    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
3808    /// written to a `.bat` file where batch-line evaluation captures
3809    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
3810    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
3811    /// internal `"`-quoting from `cmd_quote`).
3812    #[cfg(windows)]
3813    #[test]
3814    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3815        use crate::windows_shell::WindowsShell;
3816        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3817        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3818        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3819        assert_eq!(
3820            args_strs,
3821            vec!["/D", "/S", "/C", "echo wrapped"],
3822            "Cmd::bg_command must prepend /D /S /C"
3823        );
3824    }
3825
3826    /// PowerShell variants don't need `/V:ON`-style flags; their args
3827    /// are the same for foreground (`command()`) and background
3828    /// (`bg_command()`).
3829    #[cfg(windows)]
3830    #[test]
3831    fn windows_shell_pwsh_bg_command_uses_standard_args() {
3832        use crate::windows_shell::WindowsShell;
3833        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3834        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3835        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3836        assert!(
3837            args_strs.contains(&"-Command"),
3838            "Pwsh::bg_command must use -Command: {args_strs:?}"
3839        );
3840        assert!(
3841            args_strs.contains(&"Get-Date"),
3842            "Pwsh::bg_command must include the user command body"
3843        );
3844    }
3845
    /// **Disabled placeholder.** `#[cfg(any())]` is never satisfied, so this
    /// item is compiled out on every target; the body is intentionally empty
    /// and only this comment carries the history.
    ///
    /// **Original intent** (Issue #27 Oracle review P1 + P2 test gap): an
    /// end-to-end proof that the **cmd.exe-specific** wrapper path captures
    /// the user command's run-time exit code correctly. The existing
    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
    ///
    /// The test used to spawn directly via `WindowsShell::Cmd.bg_command()`
    /// to force the cmd-wrapper code path, then wrote the exit marker and
    /// asserted it contained "42" not "0". With the pre-fix `%ERRORLEVEL%`
    /// wrapper it would fail, because `%ERRORLEVEL%` parse-time expansion
    /// would record cmd's startup ERRORLEVEL (typically 0) regardless of
    /// what the user command returned.
    ///
    /// **Why it is disabled.** It exercised `WindowsShell::Cmd.bg_command()` —
    /// the inline command-line wrapper helper that production code does
    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
    /// produced silent failures on Windows 11 (move /Y could not parse
    /// path arguments through cmd's /C parser). The `bg_command` helper
    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
    /// the production spawn path goes through `detached_shell_command_for`
    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
    /// <bat-path>`.
    ///
    /// **Where the behaviour is covered now.** The `!ERRORLEVEL!` correctness
    /// this test was meant to verify is covered live by the Windows e2e
    /// harness scenario 2d
    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
    /// exercises the real file-based wrapper end-to-end via the protocol.
    // NOTE(review): the text above mentions `!ERRORLEVEL!` and `cmd /V:ON`,
    // whereas `windows_shell_cmd_wrapper_writes_exit_marker_with_move` asserts
    // `set CODE=%ERRORLEVEL%` and
    // `windows_shell_cmd_bg_command_uses_minimal_cmd_flags` asserts
    // `/D /S /C` only. Confirm against `detached_shell_command_for` which
    // wording matches the current wrapper — TODO verify.
    #[allow(dead_code)]
    #[cfg(any())] // disabled on all targets
    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3879}