// aft/bash_background/registry.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use rusqlite::Connection;
12use serde::Serialize;
13
14use crate::context::SharedProgressSender;
15use crate::harness::Harness;
16use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
17
18#[cfg(unix)]
19use std::os::unix::process::CommandExt;
20#[cfg(windows)]
21use std::os::windows::process::CommandExt;
22
23use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
24use super::persistence::{
25    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
26    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
27    PersistedTask, TaskPaths,
28};
29use super::process::is_process_alive;
30#[cfg(unix)]
31use super::process::terminate_pgid;
32#[cfg(windows)]
33use super::process::terminate_pid;
34use super::{BgTaskInfo, BgTaskStatus};
35// Note: `resolve_windows_shell` is no longer imported at module scope —
36// production code in `spawn_detached_child` uses `shell_candidates()`
37// with retry instead, and the function remains in `windows_shell.rs`
38// for tests and as a future helper.
39
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): presumably the age threshold applied by
/// `running_metadata_is_stale` (replay labels such tasks "orphaned (>24h)")
/// — confirm against that helper.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours. NOTE(review): presumably the grace period before persisted task
/// bundles become eligible for GC — confirm against `maybe_gc_persisted`.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably the grace period before quarantined
/// task metadata is garbage-collected — confirm against the GC code.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
/// of typical command output (git status, test results, exit messages) — short
/// enough that round-tripping multiple completions in one reminder stays well
/// under the model's context budget but long enough that most successful runs
/// don't need a follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream tokenization cap: 128 KiB. `BgCompletion` documents that token
/// counts are omitted when any stream exceeds this cap.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
55
/// Completion record for a background bash task. Instances are queued in
/// `RegistryInner::completions`; the record is serialize-only.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this record refers to.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Task status carried by this record.
    pub status: BgTaskStatus,
    /// Exit code of the task, when one is known.
    pub exit_code: Option<i32>,
    /// Command string associated with the task.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
91
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// flag is `false`, so unset boolean fields are left out of the payload.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
95
/// Serializable view of a single background task, as returned by
/// `BgTaskRegistry::status`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; flattened so its fields serialize at the same level
    /// as the fields below.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code of the task, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child, when one is recorded.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the task output. NOTE(review): presumably bounded by the
    /// `preview_bytes` argument passed to `snapshot` — confirm there.
    pub output_preview: String,
    /// Whether `output_preview` is shorter than the full output.
    pub output_truncated: bool,
    /// Path of the stdout capture file, when exposed.
    pub output_path: Option<String>,
    /// Path of the stderr capture file, when exposed.
    pub stderr_path: Option<String>,
}
108
/// Handle to the background bash task registry. The state lives behind an
/// `Arc`, so clones of the handle share the same `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
113
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Tracked tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender handed in at construction time.
    pub(crate) progress_sender: SharedProgressSender,
    /// NOTE(review): presumably set once by `start_watchdog` so the watchdog
    /// is only started a single time — confirm against that method.
    watchdog_started: AtomicBool,
    /// Shutdown flag; initialised to `false`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled (default `true`); updated
    /// by `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds (default 600_000); updated by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set on the first session replay so the persisted-task GC pass is
    /// attempted only once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts GC passes — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional SQLite connection used for DB dual-writes and replay lookups;
    /// `None` until `set_db_pool` is called.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used for DB rows; `None` until `set_harness` is called.
    pub(crate) db_harness: RwLock<Option<String>>,
}
134
/// One tracked background task: immutable identity plus mutex-guarded state.
pub(crate) struct BgTask {
    /// Unique id of the task; also its key in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations belonging to this task (metadata JSON, stdout/stderr
    /// capture files, exit marker, ...).
    pub(crate) paths: TaskPaths,
    /// Monotonic timestamp; `spawn` sets it to `Instant::now()` when the
    /// in-memory task object is built.
    pub(crate) started: Instant,
    /// NOTE(review): presumably when the last long-running reminder was sent
    /// for this task; `None` at spawn — confirm against the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// NOTE(review): presumably when the task was observed terminal; `None`
    /// at spawn — confirm against the watchdog.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state.
    pub(crate) state: Mutex<BgTaskState>,
}
144
/// Mutable part of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Persisted metadata mirrored in memory.
    pub(crate) metadata: PersistedTask,
    /// Direct child handle; `spawn` stores `Some(child)` for tasks started by
    /// this process.
    pub(crate) child: Option<Child>,
    /// `false` for freshly spawned tasks. NOTE(review): presumably `true` for
    /// tasks without a child handle owned by this process — confirm.
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Output buffer built over the task's stdout and stderr capture files.
    pub(crate) buffer: BgBuffer,
}
162
163impl BgTaskRegistry {
164    pub fn new(progress_sender: SharedProgressSender) -> Self {
165        Self {
166            inner: Arc::new(RegistryInner {
167                tasks: Mutex::new(HashMap::new()),
168                completions: Mutex::new(VecDeque::new()),
169                progress_sender,
170                watchdog_started: AtomicBool::new(false),
171                shutdown: AtomicBool::new(false),
172                long_running_reminder_enabled: AtomicBool::new(true),
173                long_running_reminder_interval_ms: AtomicU64::new(600_000),
174                persisted_gc_started: AtomicBool::new(false),
175                #[cfg(test)]
176                persisted_gc_runs: AtomicU64::new(0),
177                compressor: Mutex::new(None),
178                db_pool: RwLock::new(None),
179                db_harness: RwLock::new(None),
180            }),
181        }
182    }
183
184    pub fn set_harness(&self, harness: Harness) {
185        if let Ok(mut slot) = self.inner.db_harness.write() {
186            *slot = Some(harness.as_str().to_string());
187        }
188    }
189
190    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
191        if let Ok(mut slot) = self.inner.db_pool.write() {
192            *slot = Some(conn);
193        }
194    }
195
196    pub fn clear_db_pool(&self) {
197        if let Ok(mut slot) = self.inner.db_pool.write() {
198            *slot = None;
199        }
200    }
201
202    /// Install the output-compression callback. Called by `main.rs` after
203    /// `AppContext` is constructed so that snapshot/completion paths can
204    /// invoke `compress::compress_with_registry` without holding a context
205    /// reference. When called multiple times, the latest installation wins.
206    pub fn set_compressor<F>(&self, compressor: F)
207    where
208        F: Fn(&str, String) -> String + Send + Sync + 'static,
209    {
210        if let Ok(mut slot) = self.inner.compressor.lock() {
211            *slot = Some(Box::new(compressor));
212        }
213    }
214
215    /// Apply the installed compressor (if any) to `output`. Returns `output`
216    /// untouched when no compressor is installed.
217    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
218        let Ok(slot) = self.inner.compressor.lock() else {
219            return output;
220        };
221        match slot.as_ref() {
222            Some(compressor) => compressor(command, output),
223            None => output,
224        }
225    }
226
227    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
228        write_task(&paths.json, metadata)?;
229        self.dual_write_task(paths, metadata);
230        Ok(())
231    }
232
233    fn update_task_metadata<F>(
234        &self,
235        paths: &TaskPaths,
236        update: F,
237    ) -> std::io::Result<PersistedTask>
238    where
239        F: FnOnce(&mut PersistedTask),
240    {
241        let metadata = update_task(&paths.json, update)?;
242        self.dual_write_task(paths, &metadata);
243        Ok(metadata)
244    }
245
246    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
247        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
248        let Some(pool) = pool else {
249            return;
250        };
251        let harness = self
252            .inner
253            .db_harness
254            .read()
255            .ok()
256            .and_then(|slot| slot.clone());
257        let Some(harness) = harness else {
258            crate::slog_warn!(
259                "dual-write bash_task to DB skipped for {}: harness not configured",
260                metadata.task_id
261            );
262            return;
263        };
264        let row = match metadata.to_bash_task_row(&harness, paths) {
265            Ok(row) => row,
266            Err(error) => {
267                crate::slog_warn!(
268                    "dual-write bash_task to DB failed for {}: {}",
269                    metadata.task_id,
270                    error
271                );
272                return;
273            }
274        };
275        let conn = match pool.lock() {
276            Ok(conn) => conn,
277            Err(_) => {
278                crate::slog_warn!(
279                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
280                    metadata.task_id
281                );
282                return;
283            }
284        };
285        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
286            crate::slog_warn!(
287                "dual-write bash_task to DB failed for {}: {}",
288                metadata.task_id,
289                error
290            );
291        }
292    }
293
294    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
295        self.inner
296            .long_running_reminder_enabled
297            .store(enabled, Ordering::SeqCst);
298        self.inner
299            .long_running_reminder_interval_ms
300            .store(interval_ms, Ordering::SeqCst);
301    }
302
303    #[cfg(unix)]
304    #[allow(clippy::too_many_arguments)]
305    pub fn spawn(
306        &self,
307        command: &str,
308        session_id: String,
309        workdir: PathBuf,
310        env: HashMap<String, String>,
311        timeout: Option<Duration>,
312        storage_dir: PathBuf,
313        max_running: usize,
314        notify_on_completion: bool,
315        compressed: bool,
316        project_root: Option<PathBuf>,
317    ) -> Result<String, String> {
318        self.start_watchdog();
319
320        let running = self.running_count();
321        if running >= max_running {
322            return Err(format!(
323                "background bash task limit exceeded: {running} running (max {max_running})"
324            ));
325        }
326
327        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
328        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
329        let task_id = self.generate_unique_task_id()?;
330        let paths = task_paths(&storage_dir, &session_id, &task_id);
331        fs::create_dir_all(&paths.dir)
332            .map_err(|e| format!("failed to create background task dir: {e}"))?;
333
334        let mut metadata = PersistedTask::starting(
335            task_id.clone(),
336            session_id.clone(),
337            command.to_string(),
338            workdir.clone(),
339            project_root,
340            timeout_ms,
341            notify_on_completion,
342            compressed,
343        );
344        self.persist_task(&paths, &metadata)
345            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
346
347        // Pre-create capture files so the watchdog/buffer can always
348        // open them for reading. The spawn helper opens its own handles
349        // per attempt because each `Command::spawn()` consumes them.
350        create_capture_file(&paths.stdout)
351            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
352        create_capture_file(&paths.stderr)
353            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
354
355        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
356            Ok(child) => child,
357            Err(error) => {
358                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
359                let _ = delete_task_bundle(&paths);
360                return Err(error);
361            }
362        };
363
364        let child_pid = child.id();
365        metadata.mark_running(child_pid, child_pid as i32);
366        self.persist_task(&paths, &metadata)
367            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
368
369        let task = Arc::new(BgTask {
370            task_id: task_id.clone(),
371            session_id,
372            paths: paths.clone(),
373            started: Instant::now(),
374            last_reminder_at: Mutex::new(None),
375            terminal_at: Mutex::new(None),
376            state: Mutex::new(BgTaskState {
377                metadata,
378                child: Some(child),
379                detached: false,
380                child_exit_observed: false,
381                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
382            }),
383        });
384
385        self.inner
386            .tasks
387            .lock()
388            .map_err(|_| "background task registry lock poisoned".to_string())?
389            .insert(task_id.clone(), task);
390
391        Ok(task_id)
392    }
393
394    #[cfg(windows)]
395    #[allow(clippy::too_many_arguments)]
396    pub fn spawn(
397        &self,
398        command: &str,
399        session_id: String,
400        workdir: PathBuf,
401        env: HashMap<String, String>,
402        timeout: Option<Duration>,
403        storage_dir: PathBuf,
404        max_running: usize,
405        notify_on_completion: bool,
406        compressed: bool,
407        project_root: Option<PathBuf>,
408    ) -> Result<String, String> {
409        self.start_watchdog();
410
411        let running = self.running_count();
412        if running >= max_running {
413            return Err(format!(
414                "background bash task limit exceeded: {running} running (max {max_running})"
415            ));
416        }
417
418        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
419        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
420        let task_id = self.generate_unique_task_id()?;
421        let paths = task_paths(&storage_dir, &session_id, &task_id);
422        fs::create_dir_all(&paths.dir)
423            .map_err(|e| format!("failed to create background task dir: {e}"))?;
424
425        let mut metadata = PersistedTask::starting(
426            task_id.clone(),
427            session_id.clone(),
428            command.to_string(),
429            workdir.clone(),
430            project_root,
431            timeout_ms,
432            notify_on_completion,
433            compressed,
434        );
435        self.persist_task(&paths, &metadata)
436            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
437
438        // Capture files are pre-created so the watchdog/buffer can always
439        // open them for reading even if the child hasn't written anything
440        // yet. The spawn helper opens its own handles per attempt because
441        // each `Command::spawn()` consumes them, and on Windows we may
442        // retry across multiple shell candidates if the first one fails.
443        create_capture_file(&paths.stdout)
444            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
445        create_capture_file(&paths.stderr)
446            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
447
448        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
449            Ok(child) => child,
450            Err(error) => {
451                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
452                let _ = delete_task_bundle(&paths);
453                return Err(error);
454            }
455        };
456
457        let child_pid = child.id();
458        metadata.status = BgTaskStatus::Running;
459        metadata.child_pid = Some(child_pid);
460        metadata.pgid = None;
461        self.persist_task(&paths, &metadata)
462            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
463
464        let task = Arc::new(BgTask {
465            task_id: task_id.clone(),
466            session_id,
467            paths: paths.clone(),
468            started: Instant::now(),
469            last_reminder_at: Mutex::new(None),
470            terminal_at: Mutex::new(None),
471            state: Mutex::new(BgTaskState {
472                metadata,
473                child: Some(child),
474                detached: false,
475                child_exit_observed: false,
476                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
477            }),
478        });
479
480        self.inner
481            .tasks
482            .lock()
483            .map_err(|_| "background task registry lock poisoned".to_string())?
484            .insert(task_id.clone(), task);
485
486        Ok(task_id)
487    }
488
489    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
490        self.replay_session_inner(storage_dir, session_id, None)
491    }
492
493    pub fn replay_session_for_project(
494        &self,
495        storage_dir: &Path,
496        session_id: &str,
497        project_root: &Path,
498    ) -> Result<(), String> {
499        self.replay_session_inner(storage_dir, session_id, Some(project_root))
500    }
501
502    fn replay_session_inner(
503        &self,
504        storage_dir: &Path,
505        session_id: &str,
506        project_root: Option<&Path>,
507    ) -> Result<(), String> {
508        self.start_watchdog();
509        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
510            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
511                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
512            }
513        }
514
515        let canonical_project = project_root.map(canonicalized_path);
516        // Replay strategy: DB is the post-v0.27 source of truth. Disk
517        // fallback handles pre-v0.27 tasks that haven't been migrated and
518        // the cold-start `__default__` namespace (configure runs before any
519        // user session exists, so plugin-init triggers a session-less DB
520        // lookup that will be empty until a real session writes a task).
521        //
522        // We deliberately keep the empty-DB / empty-disk path silent — it's
523        // the normal startup case and would otherwise fire on every configure
524        // (see GitHub user report against v0.27.0). INFO-level logs only when
525        // disk actually returned tasks (real migration signal); WARN when the
526        // DB lookup itself errored.
527        let tasks = match self.replay_session_from_db(session_id) {
528            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
529            Some(Ok(_)) => {
530                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
531                if !disk_tasks.is_empty() {
532                    crate::slog_info!(
533                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
534                        session_id,
535                        disk_tasks.len()
536                    );
537                }
538                disk_tasks
539            }
540            Some(Err(error)) => {
541                crate::slog_warn!(
542                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
543                    session_id,
544                    error
545                );
546                self.replay_session_from_disk(storage_dir, session_id)?
547            }
548            None => {
549                // DB pool unconfigured — common in tests + before harness is set.
550                self.replay_session_from_disk(storage_dir, session_id)?
551            }
552        };
553
554        for mut metadata in tasks {
555            if metadata.session_id != session_id {
556                continue;
557            }
558            if let Some(canonical_project) = canonical_project.as_deref() {
559                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
560                if metadata_project.as_deref() != Some(canonical_project) {
561                    continue;
562                }
563            }
564
565            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
566            match metadata.status {
567                BgTaskStatus::Starting => {
568                    metadata.mark_terminal(
569                        BgTaskStatus::Failed,
570                        None,
571                        Some("spawn aborted".to_string()),
572                    );
573                    let _ = self.persist_task(&paths, &metadata);
574                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
575                    self.insert_rehydrated_task(metadata, paths, true)?;
576                }
577                BgTaskStatus::Running | BgTaskStatus::Killing => {
578                    if self.running_metadata_is_stale(&metadata) {
579                        metadata.mark_terminal(
580                            BgTaskStatus::Killed,
581                            None,
582                            Some("orphaned (>24h)".to_string()),
583                        );
584                        if !paths.exit.exists() {
585                            let _ = write_kill_marker_if_absent(&paths.exit);
586                        }
587                        let _ = self.persist_task(&paths, &metadata);
588                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
589                        self.insert_rehydrated_task(metadata, paths, true)?;
590                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
591                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
592                            "recovered from inconsistent killing state on replay".to_string()
593                        });
594                        if reason.is_some() {
595                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
596                            metadata.task_id);
597                        }
598                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
599                        let _ = self.persist_task(&paths, &metadata);
600                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
601                        self.insert_rehydrated_task(metadata, paths, true)?;
602                    } else if metadata.status == BgTaskStatus::Killing {
603                        if !paths.exit.exists() {
604                            let _ = write_kill_marker_if_absent(&paths.exit);
605                        }
606                        metadata.mark_terminal(
607                            BgTaskStatus::Killed,
608                            None,
609                            Some("recovered from inconsistent killing state on replay".to_string()),
610                        );
611                        let _ = self.persist_task(&paths, &metadata);
612                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
613                        self.insert_rehydrated_task(metadata, paths, true)?;
614                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
615                        metadata.mark_terminal(
616                            BgTaskStatus::Failed,
617                            None,
618                            Some("process exited without exit marker".to_string()),
619                        );
620                        let _ = self.persist_task(&paths, &metadata);
621                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
622                        self.insert_rehydrated_task(metadata, paths, true)?;
623                    } else {
624                        self.insert_rehydrated_task(metadata, paths, true)?;
625                    }
626                }
627                _ if metadata.status.is_terminal() => {
628                    // Borrow `paths` for the completion enqueue BEFORE
629                    // `insert_rehydrated_task` consumes it. The completion
630                    // helper only reads from `paths` (stdout/stderr/exit) to
631                    // reconstruct a tail preview, so it must see the same
632                    // paths the rehydrated task will own.
633                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
634                    self.insert_rehydrated_task(metadata, paths, true)?;
635                }
636                _ => {}
637            }
638        }
639
640        Ok(())
641    }
642
643    fn replay_session_from_db(
644        &self,
645        session_id: &str,
646    ) -> Option<Result<Vec<PersistedTask>, String>> {
647        let pool = self
648            .inner
649            .db_pool
650            .read()
651            .ok()
652            .and_then(|slot| slot.clone())?;
653        let harness = self
654            .inner
655            .db_harness
656            .read()
657            .ok()
658            .and_then(|slot| slot.clone())?;
659        let conn = match pool.lock() {
660            Ok(conn) => conn,
661            Err(_) => return Some(Err("db mutex poisoned".to_string())),
662        };
663        Some(
664            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
665                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
666                .map_err(|error| error.to_string()),
667        )
668    }
669
670    fn replay_session_from_disk(
671        &self,
672        storage_dir: &Path,
673        session_id: &str,
674    ) -> Result<Vec<PersistedTask>, String> {
675        let dir = session_tasks_dir(storage_dir, session_id);
676        if !dir.exists() {
677            return Ok(Vec::new());
678        }
679
680        let entries = fs::read_dir(&dir)
681            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
682        let mut tasks = Vec::new();
683        for entry in entries.flatten() {
684            let path = entry.path();
685            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
686                continue;
687            }
688            match read_task(&path) {
689                Ok(metadata) => tasks.push(metadata),
690                Err(error) => {
691                    crate::slog_warn!(
692                        "quarantining invalid background task metadata {} during replay: {error}",
693                        path.display()
694                    );
695                    if let Err(quarantine_error) =
696                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
697                    {
698                        crate::slog_warn!(
699                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
700                            path.display()
701                        );
702                    }
703                }
704            }
705        }
706        Ok(tasks)
707    }
708
709    pub fn status(
710        &self,
711        task_id: &str,
712        session_id: &str,
713        project_root: Option<&Path>,
714        storage_dir: Option<&Path>,
715        preview_bytes: usize,
716    ) -> Option<BgTaskSnapshot> {
717        let mut task = self.task_for_session(task_id, session_id);
718        if task.is_none() {
719            if let Some(storage_dir) = storage_dir {
720                let _ = self.replay_session(storage_dir, session_id);
721                task = self.task_for_session(task_id, session_id);
722            }
723        }
724        let Some(task) = task else {
725            return self.status_relaxed(
726                task_id,
727                session_id,
728                project_root?,
729                storage_dir?,
730                preview_bytes,
731            );
732        };
733        let _ = self.poll_task(&task);
734        let mut snapshot = task.snapshot(preview_bytes);
735        self.maybe_compress_snapshot(&task, &mut snapshot);
736        Some(snapshot)
737    }
738
739    fn status_relaxed_task(
740        &self,
741        task_id: &str,
742        project_root: &Path,
743        storage_dir: &Path,
744    ) -> Option<Arc<BgTask>> {
745        let canonical_project = canonicalized_path(project_root);
746        match self.lookup_relaxed_task_from_db(task_id, project_root) {
747            Some(Ok(Some(metadata))) => {
748                if let Some(task) = self.task(task_id) {
749                    let matches_project = task
750                        .state
751                        .lock()
752                        .map(|state| {
753                            state
754                                .metadata
755                                .project_root
756                                .as_deref()
757                                .map(canonicalized_path)
758                                .as_deref()
759                                == Some(canonical_project.as_path())
760                        })
761                        .unwrap_or(false);
762                    return matches_project.then_some(task);
763                }
764                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
765                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
766                    return None;
767                }
768                return self.task(task_id);
769            }
770            Some(Ok(None)) => {
771                crate::slog_info!(
772                    "bash task relaxed DB miss for {}; falling back to disk",
773                    task_id
774                );
775            }
776            Some(Err(error)) => {
777                crate::slog_warn!(
778                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
779                    task_id,
780                    error
781                );
782            }
783            None => {
784                crate::slog_info!(
785                    "bash task relaxed DB unavailable for {}; falling back to disk",
786                    task_id
787                );
788            }
789        }
790        let root = storage_dir.join("bash-tasks");
791        let entries = fs::read_dir(&root).ok()?;
792        for entry in entries.flatten() {
793            let dir = entry.path();
794            if !dir.is_dir() {
795                continue;
796            }
797            let path = dir.join(format!("{task_id}.json"));
798            if !path.exists() {
799                continue;
800            }
801            let metadata = match read_task(&path) {
802                Ok(metadata) => metadata,
803                Err(error) => {
804                    crate::slog_warn!(
805                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
806                        path.display()
807                    );
808                    if let Err(quarantine_error) =
809                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
810                    {
811                        crate::slog_warn!(
812                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
813                            path.display()
814                        );
815                    }
816                    continue;
817                }
818            };
819            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
820            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
821                continue;
822            }
823            if let Some(task) = self.task(task_id) {
824                let matches_project = task
825                    .state
826                    .lock()
827                    .map(|state| {
828                        state
829                            .metadata
830                            .project_root
831                            .as_deref()
832                            .map(canonicalized_path)
833                            .as_deref()
834                            == Some(canonical_project.as_path())
835                    })
836                    .unwrap_or(false);
837                return matches_project.then_some(task);
838            }
839            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
840            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
841                return None;
842            }
843            return self.task(task_id);
844        }
845        None
846    }
847
848    fn lookup_relaxed_task_from_db(
849        &self,
850        task_id: &str,
851        project_root: &Path,
852    ) -> Option<Result<Option<PersistedTask>, String>> {
853        let pool = self
854            .inner
855            .db_pool
856            .read()
857            .ok()
858            .and_then(|slot| slot.clone())?;
859        let harness = self
860            .inner
861            .db_harness
862            .read()
863            .ok()
864            .and_then(|slot| slot.clone())?;
865        let conn = match pool.lock() {
866            Ok(conn) => conn,
867            Err(_) => return Some(Err("db mutex poisoned".to_string())),
868        };
869        let project_key = crate::search_index::project_cache_key(project_root);
870        Some(
871            crate::db::bash_tasks::find_bash_task_for_project(
872                &conn,
873                &harness,
874                &project_key,
875                task_id,
876            )
877            .map(|row| row.map(PersistedTask::from))
878            .map_err(|error| error.to_string()),
879        )
880    }
881
882    pub(super) fn status_relaxed(
883        &self,
884        task_id: &str,
885        _session_id: &str,
886        project_root: &Path,
887        storage_dir: &Path,
888        preview_bytes: usize,
889    ) -> Option<BgTaskSnapshot> {
890        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
891        let _ = self.poll_task(&task);
892        let mut snapshot = task.snapshot(preview_bytes);
893        self.maybe_compress_snapshot(&task, &mut snapshot);
894        Some(snapshot)
895    }
896
897    pub fn kill_relaxed(
898        &self,
899        task_id: &str,
900        project_root: &Path,
901        storage_dir: &Path,
902    ) -> Result<BgTaskSnapshot, String> {
903        let task = self
904            .status_relaxed_task(task_id, project_root, storage_dir)
905            .ok_or_else(|| format!("background task not found: {task_id}"))?;
906        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
907    }
908
909    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
910        #[cfg(test)]
911        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
912
913        let mut deleted = 0usize;
914
915        let root = storage_dir.join("bash-tasks");
916        if root.exists() {
917            let session_dirs = fs::read_dir(&root).map_err(|e| {
918                format!(
919                    "failed to read background task root {}: {e}",
920                    root.display()
921                )
922            })?;
923            for session_entry in session_dirs.flatten() {
924                let session_dir = session_entry.path();
925                if !session_dir.is_dir() {
926                    continue;
927                }
928                let task_entries = match fs::read_dir(&session_dir) {
929                    Ok(entries) => entries,
930                    Err(error) => {
931                        crate::slog_warn!(
932                            "failed to read background task session dir {}: {error}",
933                            session_dir.display()
934                        );
935                        continue;
936                    }
937                };
938                for task_entry in task_entries.flatten() {
939                    let json_path = task_entry.path();
940                    if json_path
941                        .extension()
942                        .and_then(|extension| extension.to_str())
943                        != Some("json")
944                    {
945                        continue;
946                    }
947                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
948                        continue;
949                    }
950                    let metadata = match read_task(&json_path) {
951                        Ok(metadata) => metadata,
952                        Err(error) => {
953                            crate::slog_warn!(
954                                "quarantining corrupt background task metadata {}: {error}",
955                                json_path.display()
956                            );
957                            quarantine_task_json(
958                                storage_dir,
959                                &session_dir,
960                                &json_path,
961                                QuarantineKind::Corrupt,
962                            )?;
963                            continue;
964                        }
965                    };
966                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
967                        continue;
968                    }
969                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
970                    match delete_task_bundle(&paths) {
971                        Ok(()) => {
972                            deleted += 1;
973                            log::debug!(
974                                "deleted persisted background task bundle {}",
975                                metadata.task_id
976                            );
977                        }
978                        Err(error) => {
979                            crate::slog_warn!(
980                                "failed to delete background task bundle {}: {error}",
981                                metadata.task_id
982                            );
983                            continue;
984                        }
985                    }
986                }
987            }
988        }
989        gc_quarantine(storage_dir);
990        Ok(deleted)
991    }
992
993    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
994        let tasks = self
995            .inner
996            .tasks
997            .lock()
998            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
999            .unwrap_or_default();
1000        tasks
1001            .into_iter()
1002            .map(|task| {
1003                let _ = self.poll_task(&task);
1004                let mut snapshot = task.snapshot(preview_bytes);
1005                self.maybe_compress_snapshot(&task, &mut snapshot);
1006                snapshot
1007            })
1008            .collect()
1009    }
1010
1011    /// Compress `output_preview` in place when the task is in a terminal
1012    /// state. Live tail of running tasks stays raw so agents debugging
1013    /// long-running bash see exactly what the process emitted, not a
1014    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
1015    /// field on `PersistedTask` short-circuits before the compress pipeline.
1016    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1017        if !snapshot.info.status.is_terminal() {
1018            return;
1019        }
1020        let compressed_flag = task
1021            .state
1022            .lock()
1023            .map(|state| state.metadata.compressed)
1024            .unwrap_or(true);
1025        if !compressed_flag {
1026            return;
1027        }
1028        let raw = std::mem::take(&mut snapshot.output_preview);
1029        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1030    }
1031
1032    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1033        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1034    }
1035
1036    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1037        let task = self
1038            .task_for_session(task_id, session_id)
1039            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1040        let mut state = task
1041            .state
1042            .lock()
1043            .map_err(|_| "background task lock poisoned".to_string())?;
1044        let updated = self
1045            .update_task_metadata(&task.paths, |metadata| {
1046                metadata.notify_on_completion = true;
1047                metadata.completion_delivered = false;
1048            })
1049            .map_err(|e| format!("failed to promote background task: {e}"))?;
1050        state.metadata = updated;
1051        if state.metadata.status.is_terminal() {
1052            state.buffer.enforce_terminal_cap();
1053            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1054        }
1055        Ok(true)
1056    }
1057
1058    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1059        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1060            .map(|_| ())
1061    }
1062
1063    pub fn cleanup_finished(&self, older_than: Duration) {
1064        let cutoff = Instant::now().checked_sub(older_than);
1065        let removable_paths: Vec<(String, TaskPaths)> =
1066            if let Ok(mut tasks) = self.inner.tasks.lock() {
1067                let removable = tasks
1068                    .iter()
1069                    .filter_map(|(task_id, task)| {
1070                        let delivered_terminal = task
1071                            .state
1072                            .lock()
1073                            .map(|state| {
1074                                state.metadata.status.is_terminal()
1075                                    && state.metadata.completion_delivered
1076                            })
1077                            .unwrap_or(false);
1078                        if !delivered_terminal {
1079                            return None;
1080                        }
1081
1082                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1083                        let expired = match (terminal_at, cutoff) {
1084                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1085                            (Some(_), None) => true,
1086                            (None, _) => false,
1087                        };
1088                        expired.then(|| task_id.clone())
1089                    })
1090                    .collect::<Vec<_>>();
1091
1092                removable
1093                    .into_iter()
1094                    .filter_map(|task_id| {
1095                        tasks
1096                            .remove(&task_id)
1097                            .map(|task| (task_id, task.paths.clone()))
1098                    })
1099                    .collect()
1100            } else {
1101                Vec::new()
1102            };
1103
1104        for (task_id, paths) in removable_paths {
1105            match delete_task_bundle(&paths) {
1106                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1107                Err(error) => crate::slog_warn!(
1108                    "failed to delete persisted background task bundle {task_id}: {error}"
1109                ),
1110            }
1111        }
1112    }
1113
1114    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1115        self.drain_completions_for_session(None)
1116    }
1117
1118    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1119        let completions = match self.inner.completions.lock() {
1120            Ok(completions) => completions,
1121            Err(_) => return Vec::new(),
1122        };
1123
1124        completions
1125            .iter()
1126            .filter(|completion| {
1127                session_id
1128                    .map(|session_id| completion.session_id == session_id)
1129                    .unwrap_or(true)
1130            })
1131            .cloned()
1132            .collect()
1133    }
1134
1135    pub fn ack_completions_for_session(
1136        &self,
1137        session_id: Option<&str>,
1138        task_ids: &[String],
1139    ) -> Vec<String> {
1140        if task_ids.is_empty() {
1141            return Vec::new();
1142        }
1143        let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1144        let mut completions = match self.inner.completions.lock() {
1145            Ok(completions) => completions,
1146            Err(_) => return Vec::new(),
1147        };
1148        let mut acked = Vec::new();
1149        completions.retain(|completion| {
1150            let session_matches = session_id
1151                .map(|session_id| completion.session_id == session_id)
1152                .unwrap_or(true);
1153            if session_matches && task_ids.contains(completion.task_id.as_str()) {
1154                acked.push((completion.task_id.clone(), completion.session_id.clone()));
1155                false
1156            } else {
1157                true
1158            }
1159        });
1160        drop(completions);
1161
1162        let mut delivered = Vec::new();
1163        for (task_id, completion_session_id) in acked {
1164            if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
1165                if task.set_completion_delivered(true, self).is_ok() {
1166                    delivered.push(task_id);
1167                }
1168            }
1169        }
1170
1171        delivered
1172    }
1173
1174    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1175        self.inner
1176            .completions
1177            .lock()
1178            .map(|completions| {
1179                completions
1180                    .iter()
1181                    .filter(|completion| completion.session_id == session_id)
1182                    .cloned()
1183                    .collect()
1184            })
1185            .unwrap_or_default()
1186    }
1187
1188    pub fn detach(&self) {
1189        self.inner.shutdown.store(true, Ordering::SeqCst);
1190        if let Ok(mut tasks) = self.inner.tasks.lock() {
1191            for task in tasks.values() {
1192                if let Ok(mut state) = task.state.lock() {
1193                    state.child = None;
1194                    state.detached = true;
1195                }
1196            }
1197            tasks.clear();
1198        }
1199    }
1200
1201    pub fn shutdown(&self) {
1202        let tasks = self
1203            .inner
1204            .tasks
1205            .lock()
1206            .map(|tasks| {
1207                tasks
1208                    .values()
1209                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1210                    .collect::<Vec<_>>()
1211            })
1212            .unwrap_or_default();
1213        for (task_id, session_id) in tasks {
1214            let _ = self.kill(&task_id, &session_id);
1215        }
1216    }
1217
1218    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1219        let marker = match read_exit_marker(&task.paths.exit) {
1220            Ok(Some(marker)) => marker,
1221            Ok(None) => return Ok(()),
1222            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1223        };
1224        self.finalize_from_marker(task, marker, None)
1225    }
1226
1227    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1228        let Ok(mut state) = task.state.lock() else {
1229            return;
1230        };
1231        if let Some(child) = state.child.as_mut() {
1232            if matches!(child.try_wait(), Ok(Some(_))) {
1233                // First pass: the direct child has exited, but a missing exit
1234                // marker is not yet proof that the wrapper crashed. Under
1235                // load, the wrapper can be between writing `*.exit.tmp.$$` and
1236                // renaming it to `*.exit`, or the directory entry may not be
1237                // visible to this watchdog pass yet.
1238                //
1239                // Drop the handle and mark the task detached so a later pass
1240                // declares failure if the marker is still absent. Record
1241                // `child_exit_observed=true` so that later pass can trust
1242                // this `try_wait()` evidence directly — Windows PIDs get
1243                // recycled fast enough that `is_process_alive(child_pid)`
1244                // can return true for an unrelated process by then.
1245                state.child = None;
1246                state.detached = true;
1247                state.child_exit_observed = true;
1248            }
1249        } else if state.detached {
1250            // Second pass. The child handle is gone and we're monitoring by
1251            // PID. Two cases produce this shape:
1252            //
1253            //   1. First reap of this process observed `try_wait() == Some`
1254            //      and set `child_exit_observed=true`. We already know the
1255            //      child is dead — no need to re-verify via PID liveness
1256            //      (unreliable on Windows due to PID recycling).
1257            //
1258            //   2. The task was restored from disk by replay logic — we
1259            //      have `child_pid` from a previous AFT process but never
1260            //      observed exit in this one. Probe `is_process_alive` to
1261            //      confirm dead-ness before declaring failure.
1262            let child_known_dead = state.child_exit_observed
1263                || state
1264                    .metadata
1265                    .child_pid
1266                    .is_some_and(|pid| !is_process_alive(pid));
1267            if child_known_dead {
1268                self.fail_without_exit_marker_if_needed(task, &mut state);
1269            }
1270        }
1271    }
1272
1273    fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1274        if state.metadata.status.is_terminal() {
1275            return;
1276        }
1277        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1278            return;
1279        }
1280        let updated = self.update_task_metadata(&task.paths, |metadata| {
1281            metadata.mark_terminal(
1282                BgTaskStatus::Failed,
1283                None,
1284                Some("process exited without exit marker".to_string()),
1285            );
1286        });
1287        if let Ok(metadata) = updated {
1288            state.metadata = metadata;
1289            task.mark_terminal_now();
1290            state.buffer.enforce_terminal_cap();
1291            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1292        }
1293    }
1294
1295    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1296        self.inner
1297            .tasks
1298            .lock()
1299            .map(|tasks| {
1300                tasks
1301                    .values()
1302                    .filter(|task| task.is_running())
1303                    .cloned()
1304                    .collect()
1305            })
1306            .unwrap_or_default()
1307    }
1308
1309    fn insert_rehydrated_task(
1310        &self,
1311        metadata: PersistedTask,
1312        paths: TaskPaths,
1313        detached: bool,
1314    ) -> Result<(), String> {
1315        let task_id = metadata.task_id.clone();
1316        let session_id = metadata.session_id.clone();
1317        let started = started_instant_from_unix_millis(metadata.started_at);
1318        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1319        let task = Arc::new(BgTask {
1320            task_id: task_id.clone(),
1321            session_id,
1322            paths: paths.clone(),
1323            started,
1324            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1325            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1326            state: Mutex::new(BgTaskState {
1327                metadata,
1328                child: None,
1329                detached,
1330                // Replay path: we never observed the child handle's exit
1331                // in this process (the previous AFT process did, but its
1332                // observation didn't survive restart). Leave this false so
1333                // the second-pass reap falls through to the
1334                // `is_process_alive(child_pid)` probe rather than declaring
1335                // failure based on stale evidence.
1336                child_exit_observed: false,
1337                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1338            }),
1339        });
1340        self.inner
1341            .tasks
1342            .lock()
1343            .map_err(|_| "background task registry lock poisoned".to_string())?
1344            .insert(task_id, task);
1345        Ok(())
1346    }
1347
1348    fn kill_with_status(
1349        &self,
1350        task_id: &str,
1351        session_id: &str,
1352        terminal_status: BgTaskStatus,
1353    ) -> Result<BgTaskSnapshot, String> {
1354        let task = self
1355            .task_for_session(task_id, session_id)
1356            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1357
1358        {
1359            let mut state = task
1360                .state
1361                .lock()
1362                .map_err(|_| "background task lock poisoned".to_string())?;
1363            if state.metadata.status.is_terminal() {
1364                return Ok(task.snapshot_locked(&state, 5 * 1024));
1365            }
1366
1367            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1368                state.metadata =
1369                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1370                task.mark_terminal_now();
1371                state.child = None;
1372                state.detached = true;
1373                state.buffer.enforce_terminal_cap();
1374                self.persist_task(&task.paths, &state.metadata)
1375                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1376                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1377                return Ok(task.snapshot_locked(&state, 5 * 1024));
1378            }
1379
1380            state.metadata.status = BgTaskStatus::Killing;
1381            self.persist_task(&task.paths, &state.metadata)
1382                .map_err(|e| format!("failed to persist killing state: {e}"))?;
1383
1384            #[cfg(unix)]
1385            if let Some(pgid) = state.metadata.pgid {
1386                terminate_pgid(pgid, state.child.as_mut());
1387            }
1388            #[cfg(windows)]
1389            if let Some(child) = state.child.as_mut() {
1390                super::process::terminate_process(child);
1391            } else if let Some(pid) = state.metadata.child_pid {
1392                terminate_pid(pid);
1393            }
1394            if let Some(child) = state.child.as_mut() {
1395                let _ = child.wait();
1396            }
1397            state.child = None;
1398            state.detached = true;
1399
1400            if !task.paths.exit.exists() {
1401                write_kill_marker_if_absent(&task.paths.exit)
1402                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
1403            }
1404
1405            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1406                Some(124)
1407            } else {
1408                None
1409            };
1410            state
1411                .metadata
1412                .mark_terminal(terminal_status, exit_code, None);
1413            task.mark_terminal_now();
1414            self.persist_task(&task.paths, &state.metadata)
1415                .map_err(|e| format!("failed to persist killed state: {e}"))?;
1416            state.buffer.enforce_terminal_cap();
1417            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1418        }
1419
1420        Ok(task.snapshot(5 * 1024))
1421    }
1422
1423    fn finalize_from_marker(
1424        &self,
1425        task: &Arc<BgTask>,
1426        marker: ExitMarker,
1427        reason: Option<String>,
1428    ) -> Result<(), String> {
1429        let mut state = task
1430            .state
1431            .lock()
1432            .map_err(|_| "background task lock poisoned".to_string())?;
1433        if state.metadata.status.is_terminal() {
1434            return Ok(());
1435        }
1436
1437        let updated = self
1438            .update_task_metadata(&task.paths, |metadata| {
1439                let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1440                *metadata = new_metadata;
1441            })
1442            .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1443        state.metadata = updated;
1444        task.mark_terminal_now();
1445        state.child = None;
1446        state.detached = true;
1447        state.buffer.enforce_terminal_cap();
1448        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1449        Ok(())
1450    }
1451
1452    fn enqueue_completion_if_needed(
1453        &self,
1454        metadata: &PersistedTask,
1455        paths: Option<&TaskPaths>,
1456        emit_frame: bool,
1457    ) {
1458        if metadata.status.is_terminal() && !metadata.completion_delivered {
1459            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1460        }
1461    }
1462
1463    fn enqueue_completion_locked(
1464        &self,
1465        metadata: &PersistedTask,
1466        buffer: Option<&BgBuffer>,
1467        emit_frame: bool,
1468    ) {
1469        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1470    }
1471
1472    fn enqueue_completion_from_parts(
1473        &self,
1474        metadata: &PersistedTask,
1475        buffer: Option<&BgBuffer>,
1476        paths: Option<&TaskPaths>,
1477        emit_frame: bool,
1478    ) {
1479        // Only the terminal-state guard prevents double-recording here. The
1480        // `completion_delivered` flag is NOT used to gate compression-event
1481        // recording, because `mark_terminal` flips `completion_delivered=true`
1482        // immediately for tasks with `notify_on_completion=false` (foreground
1483        // bash polled via `bash_status`, which is the common case). Pre-emptive
1484        // delivery flagging is correct for the push-frame queue (suppresses
1485        // duplicate user-visible notifications) but would silently skip the
1486        // database insert below. Compression event recording is idempotent at
1487        // the DB layer (unique on harness+session+task_id), so re-entry is
1488        // safe; the dedupe-by-queue check stays for the push frame side.
1489        if !metadata.status.is_terminal() {
1490            return;
1491        }
1492        // Read tail once at completion time and cache on the BgCompletion so
1493        // both the push-frame consumer (running session) and any later
1494        // `bash_drain_completions` poll (different session, restart) see the
1495        // same preview without racing against rotation.
1496        let (raw_preview, output_truncated) = match buffer {
1497            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1498            None => paths
1499                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1500                .unwrap_or_else(|| (String::new(), false)),
1501        };
1502        // Compress at completion time so push-frame consumers and later
1503        // `bash_drain_completions` poll-callers see the same compressed text.
1504        // Per-task `compressed: false` opts out; otherwise the compressor is
1505        // a no-op when `experimental.bash.compress=false`.
1506        let output_preview = if metadata.compressed {
1507            self.compress_output(&metadata.command, raw_preview)
1508        } else {
1509            raw_preview
1510        };
1511        let token_counts = self.completion_token_counts(metadata, buffer, paths);
1512        let completion = BgCompletion {
1513            task_id: metadata.task_id.clone(),
1514            session_id: metadata.session_id.clone(),
1515            status: metadata.status.clone(),
1516            exit_code: metadata.exit_code,
1517            command: metadata.command.clone(),
1518            output_preview,
1519            output_truncated,
1520            original_tokens: token_counts.original_tokens,
1521            compressed_tokens: token_counts.compressed_tokens,
1522            tokens_skipped: token_counts.tokens_skipped,
1523        };
1524
1525        // Record the compression event BEFORE the push-frame dedupe. Event
1526        // recording has its own idempotency at the DB layer (unique key on
1527        // harness+session+task_id), so it's safe to attempt for every
1528        // terminal-state finalize. Critically, this path runs even when
1529        // `completion_delivered=true` was pre-set by `mark_terminal` for
1530        // foreground bash (`notify_on_completion=false`) — which is the common
1531        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
1532        // after the dedupe guard and never fired for foreground tasks, which
1533        // meant compression accounting was effectively dead for >99% of
1534        // real-world bash usage.
1535        self.record_compression_event_if_applicable(metadata, &token_counts);
1536
1537        // Push-frame queue is gated on `completion_delivered` so foreground
1538        // bash with `notify_on_completion=false` does not leak a user-visible
1539        // completion notification. `mark_terminal` pre-sets
1540        // `completion_delivered=true` for those tasks; honoring it here keeps
1541        // the suppression invariant the test
1542        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
1543        // asserts. The compression-event recording above intentionally runs
1544        // before this gate so foreground bash still contributes to the
1545        // session/project aggregates.
1546        if metadata.completion_delivered {
1547            return;
1548        }
1549
1550        // Push-frame queue dedupe stays per-task to prevent duplicate
1551        // user-visible completion notifications.
1552        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
1553            if completions
1554                .iter()
1555                .any(|existing| existing.task_id == metadata.task_id)
1556            {
1557                false
1558            } else {
1559                completions.push_back(completion.clone());
1560                true
1561            }
1562        } else {
1563            false
1564        };
1565
1566        if pushed && emit_frame {
1567            self.emit_bash_completed(completion);
1568        }
1569    }
1570
1571    fn record_compression_event_if_applicable(
1572        &self,
1573        metadata: &PersistedTask,
1574        token_counts: &CompletionTokenCounts,
1575    ) {
1576        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
1577            token_counts.original_tokens,
1578            token_counts.compressed_tokens,
1579            token_counts.original_bytes,
1580            token_counts.compressed_bytes,
1581        ) {
1582            (
1583                Some(original_tokens),
1584                Some(compressed_tokens),
1585                Some(original_bytes),
1586                Some(compressed_bytes),
1587            ) => (
1588                original_tokens,
1589                compressed_tokens,
1590                original_bytes,
1591                compressed_bytes,
1592            ),
1593            _ => {
1594                crate::slog_warn!(
1595                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
1596                    metadata.task_id
1597                );
1598                return;
1599            }
1600        };
1601
1602        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
1603        let Some(pool) = pool else {
1604            crate::slog_warn!(
1605                "compression event skipped for {}: db_pool not initialized — was configure run?",
1606                metadata.task_id
1607            );
1608            return;
1609        };
1610        let harness = self
1611            .inner
1612            .db_harness
1613            .read()
1614            .ok()
1615            .and_then(|slot| slot.clone());
1616        let Some(harness) = harness else {
1617            crate::slog_warn!(
1618                "compression event insert skipped for {}: harness not configured",
1619                metadata.task_id
1620            );
1621            return;
1622        };
1623
1624        let project_root = metadata
1625            .project_root
1626            .as_deref()
1627            .unwrap_or(&metadata.workdir);
1628        let project_key = crate::search_index::project_cache_key(project_root);
1629        let row = crate::db::compression_events::CompressionEventRow {
1630            harness: &harness,
1631            session_id: Some(&metadata.session_id),
1632            project_key: &project_key,
1633            tool: "bash",
1634            task_id: Some(&metadata.task_id),
1635            command: Some(&metadata.command),
1636            compressor: if metadata.compressed {
1637                "registry"
1638            } else {
1639                "none"
1640            },
1641            original_bytes,
1642            compressed_bytes,
1643            original_tokens,
1644            compressed_tokens,
1645            created_at: unix_millis() as i64,
1646        };
1647
1648        let conn = match pool.lock() {
1649            Ok(conn) => conn,
1650            Err(_) => {
1651                crate::slog_warn!(
1652                    "compression event insert failed for {}: db mutex poisoned",
1653                    metadata.task_id
1654                );
1655                return;
1656            }
1657        };
1658        match crate::db::compression_events::insert_compression_event(&conn, &row) {
1659            Ok(_) => {
1660                // DEBUG-level: each foreground bash call records one of these,
1661                // which clutters info-level logs without adding diagnostic value.
1662                // Aggregate totals are visible via the status RPC / TUI sidebar.
1663                crate::slog_debug!(
1664                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
1665                    metadata.task_id,
1666                    project_key,
1667                    metadata.session_id,
1668                    original_tokens,
1669                    compressed_tokens
1670                );
1671            }
1672            Err(error) => {
1673                crate::slog_warn!(
1674                    "compression event insert failed for {}: {}",
1675                    metadata.task_id,
1676                    error
1677                );
1678            }
1679        }
1680    }
1681
1682    fn emit_bash_completed(&self, completion: BgCompletion) {
1683        let Ok(progress_sender) = self
1684            .inner
1685            .progress_sender
1686            .lock()
1687            .map(|sender| sender.clone())
1688        else {
1689            return;
1690        };
1691        let Some(sender) = progress_sender.as_ref() else {
1692            return;
1693        };
1694        // Clone the callback out of the registry mutex before writing to stdout;
1695        // otherwise a blocked push-frame write could pin the mutex and starve
1696        // unrelated progress-sender updates.
1697        // Bg task transitions are discovered by the watchdog thread, so the
1698        // sender is shared behind a Mutex. It still uses the same stdout writer
1699        // closure as foreground progress frames, preserving the existing lock/
1700        // flush behavior in main.rs.
1701        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1702            completion.task_id,
1703            completion.session_id,
1704            completion.status,
1705            completion.exit_code,
1706            completion.command,
1707            completion.output_preview,
1708            completion.output_truncated,
1709            completion.original_tokens,
1710            completion.compressed_tokens,
1711            completion.tokens_skipped,
1712        )));
1713    }
1714
1715    fn completion_token_counts(
1716        &self,
1717        metadata: &PersistedTask,
1718        buffer: Option<&BgBuffer>,
1719        paths: Option<&TaskPaths>,
1720    ) -> CompletionTokenCounts {
1721        let raw = match buffer {
1722            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
1723            None => paths
1724                .map(|paths| read_for_token_count_from_disk(paths, TOKENIZE_CAP_BYTES_PER_STREAM))
1725                .unwrap_or(TokenCountInput::Skipped),
1726        };
1727
1728        let TokenCountInput::Text(raw_output) = raw else {
1729            return CompletionTokenCounts::skipped();
1730        };
1731
1732        let original_tokens = token_count_u32(&raw_output);
1733        let original_bytes = raw_output.len() as i64;
1734        let compressed_output = if metadata.compressed {
1735            self.compress_output(&metadata.command, raw_output)
1736        } else {
1737            raw_output
1738        };
1739        let compressed_tokens = token_count_u32(&compressed_output);
1740        let compressed_bytes = compressed_output.len() as i64;
1741        CompletionTokenCounts {
1742            original_tokens: Some(original_tokens),
1743            compressed_tokens: Some(compressed_tokens),
1744            original_bytes: Some(original_bytes),
1745            compressed_bytes: Some(compressed_bytes),
1746            tokens_skipped: false,
1747        }
1748    }
1749
1750    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1751        if !self
1752            .inner
1753            .long_running_reminder_enabled
1754            .load(Ordering::SeqCst)
1755        {
1756            return;
1757        }
1758        let interval_ms = self
1759            .inner
1760            .long_running_reminder_interval_ms
1761            .load(Ordering::SeqCst);
1762        if interval_ms == 0 {
1763            return;
1764        }
1765        let interval = Duration::from_millis(interval_ms);
1766        let now = Instant::now();
1767        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1768            return;
1769        };
1770        let since = last_reminder_at.unwrap_or(task.started);
1771        if now.duration_since(since) < interval {
1772            return;
1773        }
1774        let command = task
1775            .state
1776            .lock()
1777            .map(|state| state.metadata.command.clone())
1778            .unwrap_or_default();
1779        *last_reminder_at = Some(now);
1780        self.emit_bash_long_running(BashLongRunningFrame::new(
1781            task.task_id.clone(),
1782            task.session_id.clone(),
1783            command,
1784            task.started.elapsed().as_millis() as u64,
1785        ));
1786    }
1787
1788    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1789        let Ok(progress_sender) = self
1790            .inner
1791            .progress_sender
1792            .lock()
1793            .map(|sender| sender.clone())
1794        else {
1795            return;
1796        };
1797        if let Some(sender) = progress_sender.as_ref() {
1798            sender(PushFrame::BashLongRunning(frame));
1799        }
1800    }
1801
1802    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1803        self.inner
1804            .tasks
1805            .lock()
1806            .ok()
1807            .and_then(|tasks| tasks.get(task_id).cloned())
1808    }
1809
1810    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1811        self.task(task_id)
1812            .filter(|task| task.session_id == session_id)
1813    }
1814
1815    fn running_count(&self) -> usize {
1816        self.inner
1817            .tasks
1818            .lock()
1819            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1820            .unwrap_or(0)
1821    }
1822
1823    fn start_watchdog(&self) {
1824        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1825            super::watchdog::start(self.clone());
1826        }
1827    }
1828
1829    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1830        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1831    }
1832
1833    #[cfg(test)]
1834    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1835        self.task_for_session(task_id, session_id)
1836            .map(|task| task.paths.json.clone())
1837    }
1838
1839    #[cfg(test)]
1840    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1841        self.task_for_session(task_id, session_id)
1842            .map(|task| task.paths.exit.clone())
1843    }
1844
1845    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
1846    fn generate_unique_task_id(&self) -> Result<String, String> {
1847        for _ in 0..32 {
1848            let candidate = random_slug();
1849            let tasks = self
1850                .inner
1851                .tasks
1852                .lock()
1853                .map_err(|_| "background task registry lock poisoned".to_string())?;
1854            if tasks.contains_key(&candidate) {
1855                continue;
1856            }
1857            let completions = self
1858                .inner
1859                .completions
1860                .lock()
1861                .map_err(|_| "background completions lock poisoned".to_string())?;
1862            if completions
1863                .iter()
1864                .any(|completion| completion.task_id == candidate)
1865            {
1866                continue;
1867            }
1868            return Ok(candidate);
1869        }
1870        Err("failed to allocate unique background task id after 32 attempts".to_string())
1871    }
1872}
1873
/// Token and byte measurements taken for a task's output at completion.
///
/// The four `Option` fields are either all populated (with
/// `tokens_skipped == false`) or all `None` via [`Self::skipped`].
struct CompletionTokenCounts {
    /// Token count of the raw output.
    original_tokens: Option<u32>,
    /// Token count after compression (equals the raw count when the task
    /// opted out of compression).
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output.
    original_bytes: Option<i64>,
    /// Byte length after compression.
    compressed_bytes: Option<i64>,
    /// True when no output could be read for tokenization.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// The "nothing was measured" value: every count absent and
    /// `tokens_skipped` set.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
        }
    }
}
1893
1894fn token_count_u32(text: &str) -> u32 {
1895    aft_tokenizer::count_tokens(text)
1896        .try_into()
1897        .unwrap_or(u32::MAX)
1898}
1899
1900impl Default for BgTaskRegistry {
1901    fn default() -> Self {
1902        Self::new(Arc::new(Mutex::new(None)))
1903    }
1904}
1905
/// Returns `true` when `path` was last modified strictly less than `grace`
/// ago.
///
/// Any failure — missing file, unavailable mtime, or an mtime in the future —
/// yields `false`.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|meta| meta.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
1914
/// Canonicalizes `path`, falling back to an owned copy of the input when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1918
/// Converts a wall-clock start time (Unix milliseconds) into an `Instant`
/// that lies the same distance in the past.
///
/// Falls back to "now" when the system clock is before the Unix epoch, when
/// `started_at` is in the future, or when the `Instant` cannot be moved that
/// far back.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let elapsed_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => (since_epoch.as_millis() as u64).saturating_sub(started_at),
        // Clock before the epoch: treat the task as having just started.
        Err(_) => 0,
    };
    let elapsed = Duration::from_millis(elapsed_ms);
    Instant::now()
        .checked_sub(elapsed)
        .unwrap_or_else(Instant::now)
}
1930
1931fn gc_quarantine(storage_dir: &Path) {
1932    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1933    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1934        return;
1935    };
1936    for session_entry in session_dirs.flatten() {
1937        let session_quarantine_dir = session_entry.path();
1938        if !session_quarantine_dir.is_dir() {
1939            continue;
1940        }
1941        let entries = match fs::read_dir(&session_quarantine_dir) {
1942            Ok(entries) => entries,
1943            Err(error) => {
1944                crate::slog_warn!(
1945                    "failed to read background task quarantine dir {}: {error}",
1946                    session_quarantine_dir.display()
1947                );
1948                continue;
1949            }
1950        };
1951        for entry in entries.flatten() {
1952            let path = entry.path();
1953            if modified_within(&path, QUARANTINE_GC_GRACE) {
1954                continue;
1955            }
1956            let result = if path.is_dir() {
1957                fs::remove_dir_all(&path)
1958            } else {
1959                fs::remove_file(&path)
1960            };
1961            match result {
1962                Ok(()) => log::debug!(
1963                    "deleted old background task quarantine entry {}",
1964                    path.display()
1965                ),
1966                Err(error) => crate::slog_warn!(
1967                    "failed to delete old background task quarantine entry {}: {error}",
1968                    path.display()
1969                ),
1970            }
1971        }
1972        let _ = fs::remove_dir(&session_quarantine_dir);
1973    }
1974    let _ = fs::remove_dir(&quarantine_root);
1975}
1976
/// Selects the naming scheme `quarantine_name` applies to a quarantined
/// task file.
enum QuarantineKind {
    /// Renamed to `<file>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<ext>`, or
    /// `<file>.invalid.<unix_ts>` when the name has no stem/extension split.
    Invalid,
}
1981
1982fn quarantine_task_json(
1983    storage_dir: &Path,
1984    session_dir: &Path,
1985    json_path: &Path,
1986    kind: QuarantineKind,
1987) -> Result<(), String> {
1988    let session_hash = session_dir
1989        .file_name()
1990        .and_then(|name| name.to_str())
1991        .ok_or_else(|| {
1992            format!(
1993                "invalid background task session dir: {}",
1994                session_dir.display()
1995            )
1996        })?;
1997    let task_name = json_path
1998        .file_name()
1999        .and_then(|name| name.to_str())
2000        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2001    let unix_ts = SystemTime::now()
2002        .duration_since(UNIX_EPOCH)
2003        .map(|duration| duration.as_secs())
2004        .unwrap_or(0);
2005    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2006    fs::create_dir_all(&quarantine_dir).map_err(|e| {
2007        format!(
2008            "failed to create background task quarantine dir {}: {e}",
2009            quarantine_dir.display()
2010        )
2011    })?;
2012    let target_name = quarantine_name(task_name, unix_ts, &kind);
2013    let target = quarantine_dir.join(target_name);
2014    fs::rename(json_path, &target).map_err(|e| {
2015        format!(
2016            "failed to quarantine background task metadata {} to {}: {e}",
2017            json_path.display(),
2018            target.display()
2019        )
2020    })?;
2021
2022    for sibling in task_sibling_paths(json_path) {
2023        if !sibling.exists() {
2024            continue;
2025        }
2026        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2027            crate::slog_warn!(
2028                "skipping background task sibling with invalid name during quarantine: {}",
2029                sibling.display()
2030            );
2031            continue;
2032        };
2033        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2034        if let Err(error) = fs::rename(&sibling, &sibling_target) {
2035            crate::slog_warn!(
2036                "failed to quarantine background task sibling {} to {}: {error}",
2037                sibling.display(),
2038                sibling_target.display()
2039            );
2040        }
2041    }
2042
2043    let _ = fs::remove_dir(session_dir);
2044    Ok(())
2045}
2046
2047fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2048    match kind {
2049        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2050        QuarantineKind::Invalid => {
2051            let path = Path::new(file_name);
2052            let stem = path.file_stem().and_then(|stem| stem.to_str());
2053            let extension = path.extension().and_then(|extension| extension.to_str());
2054            match (stem, extension) {
2055                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2056                _ => format!("{file_name}.invalid.{unix_ts}"),
2057            }
2058        }
2059    }
2060}
2061
/// Lists the candidate sibling files of a task JSON: same directory and
/// stem, with each of the known capture/wrapper extensions.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem.
/// The returned paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];

    let location = json_path
        .parent()
        .zip(json_path.file_stem().and_then(|stem| stem.to_str()));
    let Some((parent, stem)) = location else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2074
2075fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
2076    let stdout = fs::read(&paths.stdout).unwrap_or_default();
2077    let stderr = fs::read(&paths.stderr).unwrap_or_default();
2078    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2079    bytes.extend_from_slice(&stdout);
2080    bytes.extend_from_slice(&stderr);
2081    if bytes.len() <= max_bytes {
2082        return (String::from_utf8_lossy(&bytes).into_owned(), false);
2083    }
2084    let start = bytes.len().saturating_sub(max_bytes);
2085    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2086}
2087
2088fn read_for_token_count_from_disk(
2089    paths: &TaskPaths,
2090    max_bytes_per_stream: usize,
2091) -> TokenCountInput {
2092    // Read up to `max_bytes_per_stream` bytes per stream rather than
2093    // refusing to tokenize anything when the file exceeds the cap.
2094    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
2095    // (see comment there) — large outputs are exactly the tasks that
2096    // benefit most from compression accounting, so silent-skipping
2097    // them defeats the purpose of token tracking.
2098    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2099    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2100    match (stdout, stderr) {
2101        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2102            String::from_utf8_lossy(&stdout).as_ref(),
2103            String::from_utf8_lossy(&stderr).as_ref(),
2104        )),
2105        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2106            String::from_utf8_lossy(&stdout).as_ref(),
2107            "",
2108        )),
2109        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2110            "",
2111            String::from_utf8_lossy(&stderr).as_ref(),
2112        )),
2113        (Err(_), Err(_)) => TokenCountInput::Skipped,
2114    }
2115}
2116
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The cap is enforced on the read itself, so it holds even when
/// `max_bytes` is `0` (returns an empty buffer) or when the file grows
/// between the length query and the read.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error), or if querying its length, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    if len > cap {
        // Position `cap` bytes before the end as observed by `metadata`.
        // Seeking from the start avoids a lossy `u64 -> i64` negation.
        file.seek(SeekFrom::Start(len - cap))?;
    }
    // `len.min(cap)` never exceeds `max_bytes`, so it fits in `usize`.
    let mut bytes = Vec::with_capacity(len.min(cap) as usize);
    // `take` bounds the read so a concurrently appended file (or a zero
    // cap) can never yield more than `max_bytes` bytes.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2133
2134impl BgTask {
2135    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2136        let state = self
2137            .state
2138            .lock()
2139            .unwrap_or_else(|poison| poison.into_inner());
2140        self.snapshot_locked(&state, preview_bytes)
2141    }
2142
2143    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2144        let metadata = &state.metadata;
2145        let duration_ms = metadata.duration_ms.or_else(|| {
2146            metadata
2147                .status
2148                .is_terminal()
2149                .then(|| self.started.elapsed().as_millis() as u64)
2150        });
2151        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
2152        BgTaskSnapshot {
2153            info: BgTaskInfo {
2154                task_id: self.task_id.clone(),
2155                status: metadata.status.clone(),
2156                command: metadata.command.clone(),
2157                started_at: metadata.started_at,
2158                duration_ms,
2159            },
2160            exit_code: metadata.exit_code,
2161            child_pid: metadata.child_pid,
2162            workdir: metadata.workdir.display().to_string(),
2163            output_preview,
2164            output_truncated,
2165            output_path: state
2166                .buffer
2167                .output_path()
2168                .map(|path| path.display().to_string()),
2169            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
2170        }
2171    }
2172
2173    pub(crate) fn is_running(&self) -> bool {
2174        self.state
2175            .lock()
2176            .map(|state| state.metadata.status == BgTaskStatus::Running)
2177            .unwrap_or(false)
2178    }
2179
2180    fn mark_terminal_now(&self) {
2181        if let Ok(mut terminal_at) = self.terminal_at.lock() {
2182            if terminal_at.is_none() {
2183                *terminal_at = Some(Instant::now());
2184            }
2185        }
2186    }
2187
2188    fn set_completion_delivered(
2189        &self,
2190        delivered: bool,
2191        registry: &BgTaskRegistry,
2192    ) -> Result<(), String> {
2193        let mut state = self
2194            .state
2195            .lock()
2196            .map_err(|_| "background task lock poisoned".to_string())?;
2197        let updated = registry
2198            .update_task_metadata(&self.paths, |metadata| {
2199                metadata.completion_delivered = delivered;
2200            })
2201            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2202        state.metadata = updated;
2203        Ok(())
2204    }
2205}
2206
2207fn terminal_metadata_from_marker(
2208    mut metadata: PersistedTask,
2209    marker: ExitMarker,
2210    reason: Option<String>,
2211) -> PersistedTask {
2212    match marker {
2213        ExitMarker::Code(code) => {
2214            let status = if code == 0 {
2215                BgTaskStatus::Completed
2216            } else {
2217                BgTaskStatus::Failed
2218            };
2219            metadata.mark_terminal(status, Some(code), reason);
2220        }
2221        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2222    }
2223    metadata
2224}
2225
2226#[cfg(unix)]
2227fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2228    let shell = resolve_posix_shell();
2229    let mut cmd = Command::new(&shell);
2230    cmd.arg("-c")
2231        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2232        .arg(&shell)
2233        .arg(command)
2234        .arg(exit_path);
2235    unsafe {
2236        cmd.pre_exec(|| {
2237            if libc::setsid() == -1 {
2238                return Err(std::io::Error::last_os_error());
2239            }
2240            Ok(())
2241        });
2242    }
2243    cmd
2244}
2245
2246#[cfg(unix)]
2247fn resolve_posix_shell() -> PathBuf {
2248    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2249    POSIX_SHELL
2250        .get_or_init(|| {
2251            std::env::var_os("BASH")
2252                .filter(|value| !value.is_empty())
2253                .map(PathBuf::from)
2254                .filter(|path| path.exists())
2255                .or_else(|| which::which("bash").ok())
2256                .or_else(|| which::which("zsh").ok())
2257                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2258        })
2259        .clone()
2260}
2261
/// Builds the Windows command that runs `command` through `shell` via a
/// wrapper script written next to the task's other files.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // The wrapper is written to disk and the shell receives only its path
    // as a single clean argument. That avoids the Windows command-line
    // quoting problem, where Rust's std quoting, the `cmd /C` parser, and
    // the PowerShell `-Command` parser all interact with quotes embedded in
    // the wrapper.
    //
    // The path itself is safe to pass because task IDs contain no spaces
    // (bash-XXXXXXXX), and the wrapper's internal quotes are read from disk
    // under file-syntax rules — they never reach a command-line parser.
    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) run the wrapper as a script, so the
        // extension is cosmetic; `.sh` is what an operator grepping the
        // spill directory would expect.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Host flags that precede the wrapper path for each shell family.
    let host_flags: &[&str] = match shell {
        // `-File` runs the script with no quoting issues; the other flags
        // configure the host before the file runs.
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        // `cmd /D /C "<bat-file-path>"`: the batch processor reads the file
        // line by line instead of re-interpreting it through the /C parser,
        // which avoids the "filename syntax incorrect" errors seen with
        // compound commands on the cmd line.
        WindowsShell::Cmd => &["/D", "/C"],
        // POSIX shells run `<binary> <wrapper-path>` with no extra flags;
        // the `trap` and atomic exit-marker rename in `wrapper_script` are
        // POSIX-standard.
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(host_flags.iter().copied());
    cmd.arg(&wrapper_path);

    // Win32 process creation flags. The caller decides whether to include
    // CREATE_BREAKAWAY_FROM_JOB — see the callers of this function for the
    // breakaway-fallback strategy.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2344
/// Spawn a detached background bash child process.
///
/// The child is started in `workdir` with `env` applied, stdin connected
/// to the null device, and stdout/stderr redirected into the capture
/// files at `paths.stdout` / `paths.stderr`. `paths.exit` is handed to the
/// shell-command builder as the exit-marker location.
///
/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
/// `windows_shell::shell_candidates()` (pwsh → powershell → git-bash →
/// cmd) and retries with the next candidate when the previous one
/// fails to spawn with `NotFound` — the same runtime safety net the
/// foreground bash path has, so issue #27 callers landing on cmd.exe
/// fallback can also use background bash. The wrapper script is
/// regenerated per attempt because PowerShell wrappers embed the shell
/// binary by name; the stdout/stderr capture handles are also reopened
/// per attempt because `Command::spawn()` consumes them.
///
/// Each Windows candidate is first tried with `CREATE_BREAKAWAY_FROM_JOB`.
/// If that attempt fails with Access Denied (os error 5) the same shell is
/// retried once without the flag before any other handling applies.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - a capture file cannot be opened,
/// - the Windows wrapper command cannot be built,
/// - `spawn()` fails with anything other than the two retryable cases
///   above (PermissionDenied without breakaway, OutOfMemory, etc.) — these
///   return immediately because they indicate a problem with the resolved
///   shell that retrying with a different shell won't fix,
/// - every Windows candidate reported `NotFound` (the message then carries
///   the last error and the list of probed candidates).
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        // Single attempt: open both capture files, then spawn the command
        // produced by `detached_shell_command` with stdio fully redirected.
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
        // legacy foreground bash spawn path. v0.20 routes ALL bash through
        // this background spawn helper, including foreground tool calls
        // where the model writes PowerShell-syntax (`$var = ...`,
        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
        // The earlier v0.18-era cmd-first override worked around a
        // PowerShell detached-output bug; that bug is fixed at the
        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
        // see flag block below), so we no longer need to misroute PS
        // commands through cmd.
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
        // first (so the bg child outlives the AFT process when AFT is killed),
        // then fall back without it for environments where the parent is in a
        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
        // runners (GitHub Actions windows-2022) and some MDM-managed corp
        // environments hit this — `CreateProcess` returns Access Denied (5).
        // Without breakaway, the child still runs detached but will be torn
        // down with the parent if the parent process group is signaled.
        //
        // We use CREATE_NO_WINDOW (no visible console window, but the
        // child still has a hidden console) rather than DETACHED_PROCESS
        // (no console at all). PowerShell-based wrappers that perform
        // file I/O via [System.IO.File] need a console handle to flush
        // stdout/stderr correctly even when redirected — under
        // DETACHED_PROCESS, pwsh sometimes silently exits before
        // executing later script statements (the Move-Item that writes
        // the exit marker never runs), leaving the bg task forever
        // marked Failed: process exited without exit marker. cmd.exe
        // wrappers tolerate DETACHED_PROCESS, but switching to
        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Remembered across attempts so the final "nothing could be spawned"
        // error can report the most recent failure.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per-shell, try with breakaway first. If the process is in a
            // restrictive job, the breakaway flag triggers Access Denied
            // (os error 5). Retry once without breakaway.
            for &flags in &[with_breakaway, without_breakaway] {
                // Re-open capture handles per attempt; spawn() consumes them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // idx > 0 means at least one earlier candidate was
                        // skipped after reporting NotFound.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                             the cached PATH probe disagreed with runtime spawn — likely PATH \
                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                            shell.binary(),
                            idx);
                        }
                        // The no-breakaway attempt is only reached after the
                        // breakaway attempt was rejected with Access Denied.
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
                             Spawned without breakaway; the bg task will be torn down if the \
                             AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                        shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        // Skip the without-breakaway retry for NotFound — the
                        // binary itself is missing, breakaway flag is irrelevant.
                        break;
                    }
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        // Access Denied during breakaway — retry without it.
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                         Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    Err(e) => {
                        // Any other failure is not retried with a different
                        // shell or flag set.
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        // Every candidate reported NotFound (or the candidate list was empty).
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
2493
2494fn random_slug() -> String {
2495    let mut bytes = [0u8; 4];
2496    // getrandom is a transitive dependency; use it directly for OS entropy.
2497    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
2498        // Extremely unlikely fallback: time + pid mix.
2499        let t = SystemTime::now()
2500            .duration_since(UNIX_EPOCH)
2501            .map(|d| d.subsec_nanos())
2502            .unwrap_or(0);
2503        let p = std::process::id();
2504        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
2505    });
2506    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
2507    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
2508    format!("bash-{hex}")
2509}
2510
2511#[cfg(test)]
2512mod tests {
2513    use std::collections::HashMap;
2514    #[cfg(windows)]
2515    use std::fs;
2516    use std::sync::{Arc, Mutex};
2517    use std::time::Duration;
2518    #[cfg(windows)]
2519    use std::time::Instant;
2520
2521    use super::*;
2522
    // Command string handed to `registry.spawn` by tests that need a task
    // which finishes right away with a zero exit status. One definition per
    // platform family; exactly one is compiled in.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Command string for tests that need a task which is still executing
    // while the test inspects the registry: a five-second sleep on Unix, a
    // five-second `timeout` with output discarded on Windows.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
2532
2533    /// Spawn a child process that exits immediately and return it after
2534    /// it has terminated. Used by reap_child tests to simulate the
2535    /// "child exists and is dead" state when the watchdog has already
2536    /// nulled out the original child handle.
2537    fn spawn_dead_child() -> std::process::Child {
2538        #[cfg(unix)]
2539        let mut cmd = std::process::Command::new("true");
2540        #[cfg(windows)]
2541        let mut cmd = {
2542            let mut c = std::process::Command::new("cmd");
2543            c.args(["/c", "exit", "0"]);
2544            c
2545        };
2546        cmd.stdin(std::process::Stdio::null());
2547        cmd.stdout(std::process::Stdio::null());
2548        cmd.stderr(std::process::Stdio::null());
2549        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
2550        // Poll try_wait() until the child actually exits, instead of calling
2551        // wait() which closes the OS handle. On Windows, after wait()
2552        // closes the handle, subsequent try_wait() calls (which reap_child
2553        // depends on) return Err — the test was inadvertently giving
2554        // reap_child an unusable child handle. Polling try_wait() keeps the
2555        // handle open and observes natural exit, matching the production
2556        // shape where the watchdog discovers an exited child for the first
2557        // time.
2558        let started = Instant::now();
2559        loop {
2560            match child.try_wait() {
2561                Ok(Some(_)) => break,
2562                Ok(None) => {
2563                    if started.elapsed() > Duration::from_secs(5) {
2564                        panic!("dead-child stand-in did not exit within 5s");
2565                    }
2566                    std::thread::sleep(Duration::from_millis(10));
2567                }
2568                Err(error) => panic!("dead-child try_wait failed: {error}"),
2569            }
2570        }
2571        child
2572    }
2573
2574    #[test]
2575    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
2576        let registry = BgTaskRegistry::default();
2577        let dir = tempfile::tempdir().unwrap();
2578        let task_id = registry
2579            .spawn(
2580                QUICK_SUCCESS_COMMAND,
2581                "session".to_string(),
2582                dir.path().to_path_buf(),
2583                HashMap::new(),
2584                Some(Duration::from_secs(30)),
2585                dir.path().to_path_buf(),
2586                10,
2587                true,
2588                false,
2589                Some(dir.path().to_path_buf()),
2590            )
2591            .unwrap();
2592        registry
2593            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2594            .unwrap();
2595        let completions = registry.drain_completions_for_session(Some("session"));
2596        assert_eq!(completions.len(), 1);
2597        assert_eq!(
2598            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
2599            vec![task_id.clone()]
2600        );
2601
2602        registry.cleanup_finished(Duration::ZERO);
2603
2604        assert!(registry.inner.tasks.lock().unwrap().is_empty());
2605    }
2606
2607    #[test]
2608    fn cleanup_finished_retains_undelivered_terminals() {
2609        let registry = BgTaskRegistry::default();
2610        let dir = tempfile::tempdir().unwrap();
2611        let task_id = registry
2612            .spawn(
2613                QUICK_SUCCESS_COMMAND,
2614                "session".to_string(),
2615                dir.path().to_path_buf(),
2616                HashMap::new(),
2617                Some(Duration::from_secs(30)),
2618                dir.path().to_path_buf(),
2619                10,
2620                true,
2621                false,
2622                Some(dir.path().to_path_buf()),
2623            )
2624            .unwrap();
2625        registry
2626            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2627            .unwrap();
2628
2629        registry.cleanup_finished(Duration::ZERO);
2630
2631        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2632    }
2633
    /// Verify that the live watchdog path (reap_child) gives an exited
    /// child one watchdog pass for its exit marker to land, then marks the
    /// task Failed if the next pass still sees no marker.
    ///
    /// Cross-platform: spawns a quick-exiting command through the normal
    /// `registry.spawn` path, then deletes the exit marker by hand to
    /// simulate the wrapper crashing before it could write one.
    ///
    /// Phases:
    /// 1. Spawn the task and poll until its child has exited.
    /// 2. Set the registry's shutdown flag and sleep past one watchdog tick
    ///    so the manual `reap_child` calls below have no competitor.
    /// 3. Remove the exit marker and force both in-memory and on-disk
    ///    metadata back to `Running` (installing an already-exited stand-in
    ///    child if the original handle is gone).
    /// 4. First `reap_child`: status stays `Running`, the child handle is
    ///    dropped, and the task is flagged detached.
    /// 5. Second `reap_child`: status becomes `Failed` with reason
    ///    "process exited without exit marker".
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation.
        let started = Instant::now();
        loop {
            // The state lock is scoped to this block so it is released
            // before the sleep below. A missing child handle counts as
            // "exited" (something else already took it).
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog ticks (every 500ms) can
        // observe the child exit and reap it before this test's assertion
        // fires, leaving us with state.child = None and an already-terminal
        // status. We specifically want to test reap_child's logic when
        // invoked manually on a Running-but-actually-dead task, so we need
        // exclusive control over the reap path here.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Give the watchdog at most one tick (500ms) to notice shutdown
        // before we touch task state. Without this, an in-flight watchdog
        // iteration could still race with our state setup below.
        std::thread::sleep(Duration::from_millis(550));

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        // The result is ignored: a marker that is already absent is fine.
        let _ = std::fs::remove_file(&task.paths.exit);

        // The watchdog may have already reaped the child handle and
        // marked the task terminal before we got here. Reset both so
        // reap_child has the "Running task whose child just exited"
        // shape it's designed to handle. If the original child handle is
        // gone, install a quick-exited stand-in so the first reap exercises
        // the same try_wait path as production.
        //
        // CRITICAL on Windows: the watchdog ticks fast enough that the
        // JSON on disk may already say `Completed`. `update_task` (called
        // by `reap_child`) reads from disk, applies the closure, but
        // ROLLS BACK if the original on-disk state was already terminal
        // (see persistence.rs::update_task). So we must reset BOTH
        // in-memory metadata AND the JSON on disk to a Running state to
        // give reap_child the fresh shape it expects to operate on.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            // Persist the reset state to disk so update_task's terminal
            // rollback guard sees a non-terminal starting point.
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the watchdog already nulled state.child, we need to
            // simulate "child exists and is dead" so reap_child's
            // try_wait path runs. Spawn a quick-exit child as a stand-in.
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        // Clear the terminal_at marker too so mark_terminal_now() can fire
        // again inside reap_child.
        *task.terminal_at.lock().unwrap() = None;

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First watchdog observation is intentionally insufficient to
        // declare failure. A missing marker may just mean the wrapper is
        // still completing its tmp-file-to-marker rename, so reap_child only
        // drops the child handle and switches to detached PID monitoring.
        registry.reap_child(&task);

        // Scoped so the state lock is released before the second reap.
        {
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                state.child.is_none(),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second watchdog observation sees the detached PID is dead and the
        // marker is still absent. That is strong enough evidence that the
        // wrapper exited without persisting an exit code.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
2812
    /// Companion to the above: when the exit marker DOES exist on disk
    /// at reap_child time, reap_child must NOT mark the task Failed.
    /// Instead it leaves status=Running and lets the next poll_task()
    /// cycle finalize via the marker.
    ///
    /// Phases:
    /// 1. Spawn the task and poll until the child has exited AND the exit
    ///    marker file exists.
    /// 2. Set the registry's shutdown flag and sleep past one watchdog tick.
    /// 3. Force in-memory status back to `Running`, install an
    ///    already-exited stand-in child if the handle is gone, and recreate
    ///    the marker file if it has disappeared.
    /// 4. Call `reap_child` once and assert: child handle dropped, task
    ///    flagged detached, status still `Running`.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for child to exit AND for the marker to land. Both happen
        // shortly after the wrapper finishes — but we want both observed.
        let started = Instant::now();
        loop {
            // The state lock is scoped to this block so it is released
            // before the sleep below. A missing child handle counts as
            // "exited".
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog can call poll_task (which
        // finalizes via marker) before this test asserts the
        // "marker exists, status still Running" invariant. We want
        // exclusive control over the reap path.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Sleep slightly longer than one watchdog tick so an in-flight
        // iteration has finished before task state is touched.
        std::thread::sleep(Duration::from_millis(550));

        // If the watchdog already finalized the task before we stopped it,
        // restore the test setup: reset status to Running and ensure the
        // marker file is still on disk. We're testing reap_child's
        // behavior when called manually with both child-exited AND
        // marker-present, regardless of whether the watchdog beat us.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        // Clear the recorded terminal timestamp as part of restoring the
        // pre-finalization shape.
        *task.terminal_at.lock().unwrap() = None;
        // Make sure the marker is still on disk (poll_task removes it on
        // finalization). Recreate it if needed.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        // reap_child sees: child exited, marker exists. It should:
        //  - drop state.child / set state.detached = true
        //  - NOT change status (poll_task will finalize via marker next tick)
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        // Status remains Running because reap_child defers to poll_task
        // when a marker exists. It would be wrong for reap to record the
        // marker outcome (poll_task does that with proper exit-code
        // parsing).
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }
2915
2916    #[test]
2917    fn cleanup_finished_keeps_running_tasks() {
2918        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2919        let dir = tempfile::tempdir().unwrap();
2920        let task_id = registry
2921            .spawn(
2922                LONG_RUNNING_COMMAND,
2923                "session".to_string(),
2924                dir.path().to_path_buf(),
2925                HashMap::new(),
2926                Some(Duration::from_secs(30)),
2927                dir.path().to_path_buf(),
2928                10,
2929                true,
2930                false,
2931                Some(dir.path().to_path_buf()),
2932            )
2933            .unwrap();
2934
2935        registry.cleanup_finished(Duration::ZERO);
2936
2937        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2938        let _ = registry.kill(&task_id, "session");
2939    }
2940
2941    #[cfg(windows)]
2942    fn wait_for_file(path: &Path) -> String {
2943        let started = Instant::now();
2944        loop {
2945            if path.exists() {
2946                return fs::read_to_string(path).expect("read file");
2947            }
2948            assert!(
2949                started.elapsed() < Duration::from_secs(30),
2950                "timed out waiting for {}",
2951                path.display()
2952            );
2953            std::thread::sleep(Duration::from_millis(100));
2954        }
2955    }
2956
2957    #[cfg(windows)]
2958    fn spawn_windows_registry_command(
2959        command: &str,
2960    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2961        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2962        let dir = tempfile::tempdir().unwrap();
2963        let task_id = registry
2964            .spawn(
2965                command,
2966                "session".to_string(),
2967                dir.path().to_path_buf(),
2968                HashMap::new(),
2969                Some(Duration::from_secs(30)),
2970                dir.path().to_path_buf(),
2971                10,
2972                false,
2973                false,
2974                Some(dir.path().to_path_buf()),
2975            )
2976            .unwrap();
2977        (registry, dir, task_id)
2978    }
2979
2980    #[cfg(windows)]
2981    #[test]
2982    fn windows_spawn_writes_exit_marker_for_zero_exit() {
2983        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2984        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2985
2986        let content = wait_for_file(&exit_path);
2987
2988        assert_eq!(content.trim(), "0");
2989    }
2990
2991    #[cfg(windows)]
2992    #[test]
2993    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2994        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2995        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2996
2997        let content = wait_for_file(&exit_path);
2998
2999        assert_eq!(content.trim(), "42");
3000    }
3001
3002    #[cfg(windows)]
3003    #[test]
3004    fn windows_spawn_captures_stdout_to_disk() {
3005        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
3006        let task = registry.task_for_session(&task_id, "session").unwrap();
3007        let stdout_path = task.paths.stdout.clone();
3008        let exit_path = task.paths.exit.clone();
3009
3010        let _ = wait_for_file(&exit_path);
3011        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
3012
3013        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
3014    }
3015
3016    #[cfg(windows)]
3017    #[test]
3018    fn windows_spawn_uses_pwsh_when_available() {
3019        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
3020        // (We intentionally pass None for shell_env to keep this test
3021        // independent of the runner's actual env.)
3022        let candidates = crate::windows_shell::shell_candidates_with(
3023            |binary| match binary {
3024                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
3025                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
3026                _ => None,
3027            },
3028            || None,
3029        );
3030        let shell = candidates.first().expect("at least one candidate").clone();
3031        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
3032        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
3033    }
3034
3035    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
3036    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
3037    /// evaluation is the default for batch files; parse-time expansion only
3038    /// applies to compound `&`-chained inline commands). Capturing
3039    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
3040    /// command runs records the real run-time exit code.
3041    #[cfg(windows)]
3042    #[test]
3043    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
3044        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
3045        let script =
3046            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
3047
3048        // Batch wrapper: capture exit code into CODE on the line after the
3049        // user command, then write CODE to a temp marker file before
3050        // atomic-renaming it into place.
3051        assert!(
3052            script.contains("set CODE=%ERRORLEVEL%"),
3053            "wrapper must capture exit code into CODE: {script}"
3054        );
3055        assert!(
3056            script.contains("echo %CODE% >"),
3057            "wrapper must echo CODE to a temp marker file: {script}"
3058        );
3059        assert!(
3060            script.contains("move /Y"),
3061            "wrapper must use atomic move to write the marker: {script}"
3062        );
3063        // move output must be redirected to nul to avoid polluting the
3064        // user's captured stdout with "1 file(s) moved." lines.
3065        assert!(
3066            script.contains("> nul"),
3067            "wrapper must redirect move output to nul: {script}"
3068        );
3069        // exit /B %CODE% propagates the real exit code so wait() sees it.
3070        assert!(
3071            script.contains("exit /B %CODE%"),
3072            "wrapper must propagate the captured exit code: {script}"
3073        );
3074        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3075        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3076    }
3077
3078    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
3079    /// written to a `.bat` file where batch-line evaluation captures
3080    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
3081    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
3082    /// internal `"`-quoting from `cmd_quote`).
3083    #[cfg(windows)]
3084    #[test]
3085    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3086        use crate::windows_shell::WindowsShell;
3087        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3088        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3089        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3090        assert_eq!(
3091            args_strs,
3092            vec!["/D", "/S", "/C", "echo wrapped"],
3093            "Cmd::bg_command must prepend /D /S /C"
3094        );
3095    }
3096
3097    /// PowerShell variants don't need `/V:ON`-style flags; their args
3098    /// are the same for foreground (`command()`) and background
3099    /// (`bg_command()`).
3100    #[cfg(windows)]
3101    #[test]
3102    fn windows_shell_pwsh_bg_command_uses_standard_args() {
3103        use crate::windows_shell::WindowsShell;
3104        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3105        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3106        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3107        assert!(
3108            args_strs.contains(&"-Command"),
3109            "Pwsh::bg_command must use -Command: {args_strs:?}"
3110        );
3111        assert!(
3112            args_strs.contains(&"Get-Date"),
3113            "Pwsh::bg_command must include the user command body"
3114        );
3115    }
3116
    /// **Disabled placeholder.** This function is compiled out on every
    /// target (`#[cfg(any())]`) and has an empty body; it exists only to
    /// keep the name and history of a retired test.
    ///
    /// # Original intent (Issue #27 Oracle review P1 + P2 test gap)
    ///
    /// End-to-end proof that the **cmd.exe-specific** wrapper path captures
    /// the user command's run-time exit code correctly. The existing
    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
    ///
    /// The retired test spawned directly via `WindowsShell::Cmd.bg_command()`
    /// to force the cmd-wrapper code path, then wrote the exit marker and
    /// asserted it contained "42" not "0". With the pre-fix `%ERRORLEVEL%`
    /// wrapper, it would fail because `%ERRORLEVEL%` parse-time expansion
    /// would record cmd's startup ERRORLEVEL (typically 0) regardless of
    /// what the user command returned.
    ///
    /// # Why it is disabled
    ///
    /// The test exercised `WindowsShell::Cmd.bg_command()` — the inline
    /// command-line wrapper helper that production code does NOT use
    /// anymore. v0.19.4 switched bg-bash to a file-based wrapper
    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
    /// produced silent failures on Windows 11 (move /Y could not parse
    /// path arguments through cmd's /C parser). The `bg_command` helper
    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
    /// the production spawn path goes through `detached_shell_command_for`
    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
    /// <bat-path>`.
    ///
    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
    /// covered live by the Windows e2e harness scenario 2d
    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
    /// exercises the real file-based wrapper end-to-end via the protocol.
    ///
    /// NOTE(review): the `/V:ON` and `!ERRORLEVEL!` wording above looks
    /// inconsistent with sibling tests in this module, which assert that the
    /// cmd wrapper script contains `set CODE=%ERRORLEVEL%` and that
    /// `bg_command` passes `/D /S /C` without `/V:ON`. Confirm which flags
    /// `detached_shell_command_for` actually uses and update this note.
    // `allow(dead_code)` has no effect while the item is cfg'd out; it only
    // matters if the `cfg` gate below is ever relaxed.
    #[allow(dead_code)]
    #[cfg(any())] // disabled on all targets
    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3150}