Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex, OnceLock};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
22    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
23    PersistedTask, TaskPaths,
24};
25use super::process::is_process_alive;
26#[cfg(unix)]
27use super::process::terminate_pgid;
28#[cfg(windows)]
29use super::process::terminate_pid;
30use super::{BgTaskInfo, BgTaskStatus};
31// Note: `resolve_windows_shell` is no longer imported at module scope —
32// production code in `spawn_detached_child` uses `shell_candidates()`
33// with retry instead, and the function remains in `windows_shell.rs`
34// for tests and as a future helper.
35
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age (24 hours) after which a task still persisted as running/killing is
/// treated as orphaned. NOTE(review): presumably consulted by
/// `running_metadata_is_stale`, whose result makes `replay_session` mark the
/// task `Killed` with reason "orphaned (>24h)" — confirm in that helper.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// Grace period (24 hours): `maybe_gc_persisted` skips any task metadata
/// file modified within this window.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// Retention window (30 days) for quarantined corrupt metadata.
/// NOTE(review): not referenced in this chunk; presumably used by
/// `gc_quarantine` — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
/// of typical command output (git status, test results, exit messages) — short
/// enough that round-tripping multiple completions in one reminder stays well
/// under the model's context budget but long enough that most successful runs
/// don't need a follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
50
/// Completion notice for one background task.
///
/// Instances are queued in `RegistryInner::completions` and handed out by
/// `drain_completions` / `drain_completions_for_session`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Id of the task this completion describes.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status of the task when the completion was queued.
    pub status: BgTaskStatus,
    /// Exit code of the command, or `None` when none was recorded (for
    /// example, tasks marked terminal during replay without an exit marker).
    pub exit_code: Option<i32>,
    /// The command line the task ran.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes, see `BG_COMPLETION_PREVIEW_BYTES`)
    /// at completion time. It is read once and cached, so push-frame consumers
    /// and `bash_drain_completions` callers see the same preview without racing
    /// against later output rotation. Empty when not captured (e.g., a
    /// persisted task seen on startup before buffer reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more. Omitted from the payload when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
75
/// Serde `skip_serializing_if` predicate: holds exactly when the flag is unset,
/// so `false` booleans are left out of serialized payloads.
fn is_false(flag: &bool) -> bool {
    !flag
}
79
/// Point-in-time view of one background task, as returned by `status`,
/// `status_relaxed`, and `list`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Common task info (status, command, …). Flattened, so its fields appear
    /// at the top level of the serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code of the command, when known.
    pub exit_code: Option<i32>,
    /// Process id of the spawned child, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Output tail. For terminal tasks with compression enabled, this holds
    /// the compressor's output (see `maybe_compress_snapshot`).
    pub output_preview: String,
    /// Whether `output_preview` is shorter than the full output.
    /// NOTE(review): presumably mirrors `BgCompletion::output_truncated`.
    pub output_truncated: bool,
    /// Path of the stdout capture file, when exposed (assumed; populated by
    /// `BgTask::snapshot`, which is not visible here).
    pub output_path: Option<String>,
    /// Path of the stderr capture file, when exposed (assumed, as above).
    pub stderr_path: Option<String>,
}
92
/// Registry of background bash tasks.
///
/// This is a cheap handle: cloning copies an `Arc`, so every clone shares the
/// same `RegistryInner` (task map, completion queue, and configuration).
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state, visible to the rest of the crate.
    pub(crate) inner: Arc<RegistryInner>,
}
97
/// State shared by every clone of a `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Tasks known to this registry, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Pending completion notices, drained front to back by
    /// `drain_completions*`.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender handed to `BgTaskRegistry::new`. NOTE(review): presumably used
    /// to emit push frames (e.g. `BashCompletedFrame`); not referenced in this
    /// chunk.
    pub(crate) progress_sender: SharedProgressSender,
    /// Starts `false`. NOTE(review): presumably lets `start_watchdog` start
    /// the watchdog only once — confirm.
    watchdog_started: AtomicBool,
    /// Shutdown flag (starts `false`); its readers are not visible here.
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder toggle. Defaults to `true`; set via
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds. Defaults to 600_000;
    /// set via `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Flipped by the first `replay_session` call, so `maybe_gc_persisted`
    /// runs from there at most once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter of `maybe_gc_persisted` invocations.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
116
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Id under which the task is stored in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task (presumably what `task_for_session` checks).
    pub(crate) session_id: String,
    /// Locations of the task's on-disk bundle: directory, metadata JSON,
    /// stdout/stderr capture files, and exit marker.
    pub(crate) paths: TaskPaths,
    /// `Instant` captured when this `BgTask` was built (right after the child
    /// was spawned, in `spawn`).
    pub(crate) started: Instant,
    /// When the last long-running reminder was sent, if ever. NOTE(review):
    /// presumably maintained by the watchdog — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task was observed terminal (`None` until then).
    /// `cleanup_finished` ages tasks from this instant.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable per-task state.
    pub(crate) state: Mutex<BgTaskState>,
}
126
/// Mutable state of a `BgTask`, guarded by its `state` mutex.
pub(crate) struct BgTaskState {
    /// In-memory copy of the task's persisted metadata.
    pub(crate) metadata: PersistedTask,
    /// Process handle; `Some` for tasks spawned by this process via `spawn`.
    pub(crate) child: Option<Child>,
    /// `false` for tasks created by `spawn`. NOTE(review): presumably `true`
    /// for tasks rehydrated from disk without a `Child` handle — confirm in
    /// `insert_rehydrated_task`.
    pub(crate) detached: bool,
    /// Output buffer built over the stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
133
134impl BgTaskRegistry {
135    pub fn new(progress_sender: SharedProgressSender) -> Self {
136        Self {
137            inner: Arc::new(RegistryInner {
138                tasks: Mutex::new(HashMap::new()),
139                completions: Mutex::new(VecDeque::new()),
140                progress_sender,
141                watchdog_started: AtomicBool::new(false),
142                shutdown: AtomicBool::new(false),
143                long_running_reminder_enabled: AtomicBool::new(true),
144                long_running_reminder_interval_ms: AtomicU64::new(600_000),
145                persisted_gc_started: AtomicBool::new(false),
146                #[cfg(test)]
147                persisted_gc_runs: AtomicU64::new(0),
148                compressor: Mutex::new(None),
149            }),
150        }
151    }
152
153    /// Install the output-compression callback. Called by `main.rs` after
154    /// `AppContext` is constructed so that snapshot/completion paths can
155    /// invoke `compress::compress_with_registry` without holding a context
156    /// reference. When called multiple times, the latest installation wins.
157    pub fn set_compressor<F>(&self, compressor: F)
158    where
159        F: Fn(&str, String) -> String + Send + Sync + 'static,
160    {
161        if let Ok(mut slot) = self.inner.compressor.lock() {
162            *slot = Some(Box::new(compressor));
163        }
164    }
165
166    /// Apply the installed compressor (if any) to `output`. Returns `output`
167    /// untouched when no compressor is installed.
168    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
169        let Ok(slot) = self.inner.compressor.lock() else {
170            return output;
171        };
172        match slot.as_ref() {
173            Some(compressor) => compressor(command, output),
174            None => output,
175        }
176    }
177
178    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
179        self.inner
180            .long_running_reminder_enabled
181            .store(enabled, Ordering::SeqCst);
182        self.inner
183            .long_running_reminder_interval_ms
184            .store(interval_ms, Ordering::SeqCst);
185    }
186
187    #[cfg(unix)]
188    #[allow(clippy::too_many_arguments)]
189    pub fn spawn(
190        &self,
191        command: &str,
192        session_id: String,
193        workdir: PathBuf,
194        env: HashMap<String, String>,
195        timeout: Option<Duration>,
196        storage_dir: PathBuf,
197        max_running: usize,
198        notify_on_completion: bool,
199        compressed: bool,
200        project_root: Option<PathBuf>,
201    ) -> Result<String, String> {
202        self.start_watchdog();
203
204        let running = self.running_count();
205        if running >= max_running {
206            return Err(format!(
207                "background bash task limit exceeded: {running} running (max {max_running})"
208            ));
209        }
210
211        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
212        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
213        let task_id = self.generate_unique_task_id()?;
214        let paths = task_paths(&storage_dir, &session_id, &task_id);
215        fs::create_dir_all(&paths.dir)
216            .map_err(|e| format!("failed to create background task dir: {e}"))?;
217
218        let mut metadata = PersistedTask::starting(
219            task_id.clone(),
220            session_id.clone(),
221            command.to_string(),
222            workdir.clone(),
223            project_root,
224            timeout_ms,
225            notify_on_completion,
226            compressed,
227        );
228        write_task(&paths.json, &metadata)
229            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
230
231        // Pre-create capture files so the watchdog/buffer can always
232        // open them for reading. The spawn helper opens its own handles
233        // per attempt because each `Command::spawn()` consumes them.
234        create_capture_file(&paths.stdout)
235            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
236        create_capture_file(&paths.stderr)
237            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
238
239        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
240            Ok(child) => child,
241            Err(error) => {
242                log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
243                let _ = delete_task_bundle(&paths);
244                return Err(error);
245            }
246        };
247
248        let child_pid = child.id();
249        metadata.mark_running(child_pid, child_pid as i32);
250        write_task(&paths.json, &metadata)
251            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
252
253        let task = Arc::new(BgTask {
254            task_id: task_id.clone(),
255            session_id,
256            paths: paths.clone(),
257            started: Instant::now(),
258            last_reminder_at: Mutex::new(None),
259            terminal_at: Mutex::new(None),
260            state: Mutex::new(BgTaskState {
261                metadata,
262                child: Some(child),
263                detached: false,
264                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
265            }),
266        });
267
268        self.inner
269            .tasks
270            .lock()
271            .map_err(|_| "background task registry lock poisoned".to_string())?
272            .insert(task_id.clone(), task);
273
274        Ok(task_id)
275    }
276
277    #[cfg(windows)]
278    #[allow(clippy::too_many_arguments)]
279    pub fn spawn(
280        &self,
281        command: &str,
282        session_id: String,
283        workdir: PathBuf,
284        env: HashMap<String, String>,
285        timeout: Option<Duration>,
286        storage_dir: PathBuf,
287        max_running: usize,
288        notify_on_completion: bool,
289        compressed: bool,
290        project_root: Option<PathBuf>,
291    ) -> Result<String, String> {
292        self.start_watchdog();
293
294        let running = self.running_count();
295        if running >= max_running {
296            return Err(format!(
297                "background bash task limit exceeded: {running} running (max {max_running})"
298            ));
299        }
300
301        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
302        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
303        let task_id = self.generate_unique_task_id()?;
304        let paths = task_paths(&storage_dir, &session_id, &task_id);
305        fs::create_dir_all(&paths.dir)
306            .map_err(|e| format!("failed to create background task dir: {e}"))?;
307
308        let mut metadata = PersistedTask::starting(
309            task_id.clone(),
310            session_id.clone(),
311            command.to_string(),
312            workdir.clone(),
313            project_root,
314            timeout_ms,
315            notify_on_completion,
316            compressed,
317        );
318        write_task(&paths.json, &metadata)
319            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
320
321        // Capture files are pre-created so the watchdog/buffer can always
322        // open them for reading even if the child hasn't written anything
323        // yet. The spawn helper opens its own handles per attempt because
324        // each `Command::spawn()` consumes them, and on Windows we may
325        // retry across multiple shell candidates if the first one fails.
326        create_capture_file(&paths.stdout)
327            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
328        create_capture_file(&paths.stderr)
329            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
330
331        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
332            Ok(child) => child,
333            Err(error) => {
334                log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
335                let _ = delete_task_bundle(&paths);
336                return Err(error);
337            }
338        };
339
340        let child_pid = child.id();
341        metadata.status = BgTaskStatus::Running;
342        metadata.child_pid = Some(child_pid);
343        metadata.pgid = None;
344        write_task(&paths.json, &metadata)
345            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
346
347        let task = Arc::new(BgTask {
348            task_id: task_id.clone(),
349            session_id,
350            paths: paths.clone(),
351            started: Instant::now(),
352            last_reminder_at: Mutex::new(None),
353            terminal_at: Mutex::new(None),
354            state: Mutex::new(BgTaskState {
355                metadata,
356                child: Some(child),
357                detached: false,
358                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
359            }),
360        });
361
362        self.inner
363            .tasks
364            .lock()
365            .map_err(|_| "background task registry lock poisoned".to_string())?
366            .insert(task_id.clone(), task);
367
368        Ok(task_id)
369    }
370
    /// Reconcile the persisted tasks of `session_id` with this registry.
    ///
    /// First starts the watchdog. On the first call for this registry only,
    /// it also runs `maybe_gc_persisted`; GC errors are logged, not returned.
    ///
    /// Every `*.json` metadata file in the session's task directory whose
    /// `session_id` matches is then handled by status:
    /// - `Starting`: the spawn never finished. The task is marked `Failed`
    ///   ("spawn aborted").
    /// - `Running` / `Killing`: resolved in this order:
    ///   1. Stale per `running_metadata_is_stale`: marked `Killed` ("orphaned (>24h)").
    ///   2. Exit marker present: finished from the marker.
    ///   3. Still `Killing`: marked `Killed`.
    ///   4. Recorded pid no longer alive: marked `Failed`.
    ///   5. Otherwise: re-registered via `insert_rehydrated_task`.
    /// - Any other terminal status: a completion is enqueued if needed and the
    ///   task is re-registered.
    ///
    /// Unreadable directory entries and unparsable metadata are skipped
    /// silently. Metadata writes are best-effort; their errors are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if the session directory cannot be listed, or if
    /// `insert_rehydrated_task` fails. An insert failure also stops
    /// processing of the remaining entries.
    ///
    /// NOTE(review): `status` calls this on a lookup miss, so it can run while
    /// this process already holds tasks of the same session in memory.
    /// Whether that is safe depends on `insert_rehydrated_task` and
    /// `enqueue_completion_if_needed` (not visible here) tolerating existing
    /// entries — confirm.
    ///
    /// Also, a task whose `spawn` is still in flight is `Starting` on disk, and
    /// would be marked failed by a concurrent replay — confirm this race is
    /// acceptable.
    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
        self.start_watchdog();
        // The first caller flips the flag, so persisted-bundle GC runs once per registry.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                log::warn!("failed to GC persisted background bash tasks: {error}");
            }
        }
        let dir = session_tasks_dir(storage_dir, session_id);
        if !dir.exists() {
            return Ok(());
        }

        let entries = fs::read_dir(&dir)
            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
        for entry in entries.flatten() {
            let path = entry.path();
            // Only `.json` files are task metadata.
            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
                continue;
            }
            // Unparsable metadata is skipped here; `maybe_gc_persisted` may quarantine it later.
            let Ok(mut metadata) = read_task(&path) else {
                continue;
            };
            if metadata.session_id != session_id {
                continue;
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // The spawn never reached `Running`; record it as failed.
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if self.running_metadata_is_stale(&metadata) {
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        // Only add a kill marker when no exit marker exists yet.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // The exit marker wins over a leftover `Killing` status.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            log::warn!(
                                "background task {} had killing state with exit marker; preferring marker",
                                metadata.task_id
                            );
                        }
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if metadata.status == BgTaskStatus::Killing {
                        // A kill was in progress but left no exit marker; finish it as `Killed`.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else {
                        // Still running (or no pid recorded): track it again.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                _ if metadata.status.is_terminal() => {
                    // Borrow `paths` for the completion enqueue BEFORE
                    // `insert_rehydrated_task` consumes it. The completion
                    // helper only reads from `paths` (stdout/stderr/exit) to
                    // reconstruct a tail preview, so it must see the same
                    // paths the rehydrated task will own.
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                // Remaining non-terminal statuses are left untouched.
                _ => {}
            }
        }

        Ok(())
    }
471
472    pub fn status(
473        &self,
474        task_id: &str,
475        session_id: &str,
476        project_root: Option<&Path>,
477        storage_dir: Option<&Path>,
478        preview_bytes: usize,
479    ) -> Option<BgTaskSnapshot> {
480        let mut task = self.task_for_session(task_id, session_id);
481        if task.is_none() {
482            if let Some(storage_dir) = storage_dir {
483                let _ = self.replay_session(storage_dir, session_id);
484                task = self.task_for_session(task_id, session_id);
485            }
486        }
487        let Some(task) = task else {
488            return self.status_relaxed(
489                task_id,
490                session_id,
491                project_root?,
492                storage_dir?,
493                preview_bytes,
494            );
495        };
496        let _ = self.poll_task(&task);
497        let mut snapshot = task.snapshot(preview_bytes);
498        self.maybe_compress_snapshot(&task, &mut snapshot);
499        Some(snapshot)
500    }
501
502    fn status_relaxed_task(
503        &self,
504        task_id: &str,
505        project_root: &Path,
506        storage_dir: &Path,
507    ) -> Option<Arc<BgTask>> {
508        let canonical_project = canonicalized_path(project_root);
509        let root = storage_dir.join("bash-tasks");
510        let entries = fs::read_dir(&root).ok()?;
511        for entry in entries.flatten() {
512            let dir = entry.path();
513            if !dir.is_dir() {
514                continue;
515            }
516            let path = dir.join(format!("{task_id}.json"));
517            if !path.exists() {
518                continue;
519            }
520            let Ok(metadata) = read_task(&path) else {
521                continue;
522            };
523            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
524            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
525                continue;
526            }
527            if let Some(task) = self.task(task_id) {
528                let matches_project = task
529                    .state
530                    .lock()
531                    .map(|state| {
532                        state
533                            .metadata
534                            .project_root
535                            .as_deref()
536                            .map(canonicalized_path)
537                            .as_deref()
538                            == Some(canonical_project.as_path())
539                    })
540                    .unwrap_or(false);
541                return matches_project.then_some(task);
542            }
543            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
544            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
545                return None;
546            }
547            return self.task(task_id);
548        }
549        None
550    }
551
552    pub(super) fn status_relaxed(
553        &self,
554        task_id: &str,
555        _session_id: &str,
556        project_root: &Path,
557        storage_dir: &Path,
558        preview_bytes: usize,
559    ) -> Option<BgTaskSnapshot> {
560        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
561        let _ = self.poll_task(&task);
562        let mut snapshot = task.snapshot(preview_bytes);
563        self.maybe_compress_snapshot(&task, &mut snapshot);
564        Some(snapshot)
565    }
566
567    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
568        #[cfg(test)]
569        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
570
571        let mut deleted = 0usize;
572
573        let root = storage_dir.join("bash-tasks");
574        if root.exists() {
575            let session_dirs = fs::read_dir(&root).map_err(|e| {
576                format!(
577                    "failed to read background task root {}: {e}",
578                    root.display()
579                )
580            })?;
581            for session_entry in session_dirs.flatten() {
582                let session_dir = session_entry.path();
583                if !session_dir.is_dir() {
584                    continue;
585                }
586                let task_entries = match fs::read_dir(&session_dir) {
587                    Ok(entries) => entries,
588                    Err(error) => {
589                        log::warn!(
590                            "failed to read background task session dir {}: {error}",
591                            session_dir.display()
592                        );
593                        continue;
594                    }
595                };
596                for task_entry in task_entries.flatten() {
597                    let json_path = task_entry.path();
598                    if json_path
599                        .extension()
600                        .and_then(|extension| extension.to_str())
601                        != Some("json")
602                    {
603                        continue;
604                    }
605                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
606                        continue;
607                    }
608                    let metadata = match read_task(&json_path) {
609                        Ok(metadata) => metadata,
610                        Err(error) => {
611                            log::warn!(
612                                "quarantining corrupt background task metadata {}: {error}",
613                                json_path.display()
614                            );
615                            quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)?;
616                            continue;
617                        }
618                    };
619                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
620                        continue;
621                    }
622                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
623                    match delete_task_bundle(&paths) {
624                        Ok(()) => {
625                            deleted += 1;
626                            log::debug!(
627                                "deleted persisted background task bundle {}",
628                                metadata.task_id
629                            );
630                        }
631                        Err(error) => {
632                            log::warn!(
633                                "failed to delete background task bundle {}: {error}",
634                                metadata.task_id
635                            );
636                            continue;
637                        }
638                    }
639                }
640            }
641        }
642        gc_quarantine(storage_dir);
643        Ok(deleted)
644    }
645
646    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
647        let tasks = self
648            .inner
649            .tasks
650            .lock()
651            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
652            .unwrap_or_default();
653        tasks
654            .into_iter()
655            .map(|task| {
656                let _ = self.poll_task(&task);
657                let mut snapshot = task.snapshot(preview_bytes);
658                self.maybe_compress_snapshot(&task, &mut snapshot);
659                snapshot
660            })
661            .collect()
662    }
663
664    /// Compress `output_preview` in place when the task is in a terminal
665    /// state. Live tail of running tasks stays raw so agents debugging
666    /// long-running bash see exactly what the process emitted, not a
667    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
668    /// field on `PersistedTask` short-circuits before the compress pipeline.
669    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
670        if !snapshot.info.status.is_terminal() {
671            return;
672        }
673        let compressed_flag = task
674            .state
675            .lock()
676            .map(|state| state.metadata.compressed)
677            .unwrap_or(true);
678        if !compressed_flag {
679            return;
680        }
681        let raw = std::mem::take(&mut snapshot.output_preview);
682        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
683    }
684
685    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
686        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
687    }
688
689    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
690        let task = self
691            .task_for_session(task_id, session_id)
692            .ok_or_else(|| format!("background task not found: {task_id}"))?;
693        let mut state = task
694            .state
695            .lock()
696            .map_err(|_| "background task lock poisoned".to_string())?;
697        let updated = update_task(&task.paths.json, |metadata| {
698            metadata.notify_on_completion = true;
699            metadata.completion_delivered = false;
700        })
701        .map_err(|e| format!("failed to promote background task: {e}"))?;
702        state.metadata = updated;
703        if state.metadata.status.is_terminal() {
704            state.buffer.enforce_terminal_cap();
705            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
706        }
707        Ok(true)
708    }
709
710    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
711        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
712            .map(|_| ())
713    }
714
715    pub fn cleanup_finished(&self, older_than: Duration) {
716        let cutoff = Instant::now().checked_sub(older_than);
717        let removable_paths: Vec<(String, TaskPaths)> =
718            if let Ok(mut tasks) = self.inner.tasks.lock() {
719                let removable = tasks
720                    .iter()
721                    .filter_map(|(task_id, task)| {
722                        let delivered_terminal = task
723                            .state
724                            .lock()
725                            .map(|state| {
726                                state.metadata.status.is_terminal()
727                                    && state.metadata.completion_delivered
728                            })
729                            .unwrap_or(false);
730                        if !delivered_terminal {
731                            return None;
732                        }
733
734                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
735                        let expired = match (terminal_at, cutoff) {
736                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
737                            (Some(_), None) => true,
738                            (None, _) => false,
739                        };
740                        expired.then(|| task_id.clone())
741                    })
742                    .collect::<Vec<_>>();
743
744                removable
745                    .into_iter()
746                    .filter_map(|task_id| {
747                        tasks
748                            .remove(&task_id)
749                            .map(|task| (task_id, task.paths.clone()))
750                    })
751                    .collect()
752            } else {
753                Vec::new()
754            };
755
756        for (task_id, paths) in removable_paths {
757            match delete_task_bundle(&paths) {
758                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
759                Err(error) => log::warn!(
760                    "failed to delete persisted background task bundle {task_id}: {error}"
761                ),
762            }
763        }
764    }
765
766    pub fn drain_completions(&self) -> Vec<BgCompletion> {
767        self.drain_completions_for_session(None)
768    }
769
770    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
771        let mut completions = match self.inner.completions.lock() {
772            Ok(completions) => completions,
773            Err(_) => return Vec::new(),
774        };
775
776        let drained = if let Some(session_id) = session_id {
777            let mut matched = Vec::new();
778            let mut retained = VecDeque::new();
779            while let Some(completion) = completions.pop_front() {
780                if completion.session_id == session_id {
781                    matched.push(completion);
782                } else {
783                    retained.push_back(completion);
784                }
785            }
786            *completions = retained;
787            matched
788        } else {
789            completions.drain(..).collect()
790        };
791        drop(completions);
792
793        for completion in &drained {
794            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
795                let _ = task.set_completion_delivered(true);
796            }
797        }
798
799        drained
800    }
801
802    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
803        self.inner
804            .completions
805            .lock()
806            .map(|completions| {
807                completions
808                    .iter()
809                    .filter(|completion| completion.session_id == session_id)
810                    .cloned()
811                    .collect()
812            })
813            .unwrap_or_default()
814    }
815
816    pub fn detach(&self) {
817        self.inner.shutdown.store(true, Ordering::SeqCst);
818        if let Ok(mut tasks) = self.inner.tasks.lock() {
819            for task in tasks.values() {
820                if let Ok(mut state) = task.state.lock() {
821                    state.child = None;
822                    state.detached = true;
823                }
824            }
825            tasks.clear();
826        }
827    }
828
829    pub fn shutdown(&self) {
830        let tasks = self
831            .inner
832            .tasks
833            .lock()
834            .map(|tasks| {
835                tasks
836                    .values()
837                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
838                    .collect::<Vec<_>>()
839            })
840            .unwrap_or_default();
841        for (task_id, session_id) in tasks {
842            let _ = self.kill(&task_id, &session_id);
843        }
844    }
845
846    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
847        let marker = match read_exit_marker(&task.paths.exit) {
848            Ok(Some(marker)) => marker,
849            Ok(None) => return Ok(()),
850            Err(error) => return Err(format!("failed to read exit marker: {error}")),
851        };
852        self.finalize_from_marker(task, marker, None)
853    }
854
855    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
856        let Ok(mut state) = task.state.lock() else {
857            return;
858        };
859        if let Some(child) = state.child.as_mut() {
860            if matches!(child.try_wait(), Ok(Some(_))) {
861                // Child has exited. If the wrapper successfully wrote an
862                // exit marker, the next `poll_task()` cycle will pick it up
863                // and finalize via `finalize_from_marker`. But if the
864                // wrapper crashed before writing the marker (e.g. SIGKILL,
865                // power loss, wrapper bug), the task would forever appear
866                // Running until `timeout_ms` expired — and if no timeout
867                // was set, until the 24h `running_metadata_is_stale` cutoff
868                // hit at the next aft restart. Same condition as the replay
869                // path's "PID dead but no marker" branch (see line 338).
870                //
871                // To avoid that hidden hang, mark the task Failed
872                // immediately with the same reason string used by replay,
873                // but only if the marker is genuinely absent. If a marker
874                // appeared on disk between try_wait() returning and now
875                // (race window), prefer the marker — let the next poll
876                // cycle finalize from it.
877                state.child = None;
878                state.detached = true;
879                if state.metadata.status.is_terminal() {
880                    return;
881                }
882                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
883                    return;
884                }
885                let updated = update_task(&task.paths.json, |metadata| {
886                    metadata.mark_terminal(
887                        BgTaskStatus::Failed,
888                        None,
889                        Some("process exited without exit marker".to_string()),
890                    );
891                });
892                if let Ok(metadata) = updated {
893                    state.metadata = metadata;
894                    task.mark_terminal_now();
895                    state.buffer.enforce_terminal_cap();
896                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
897                }
898            }
899        }
900    }
901
902    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
903        self.inner
904            .tasks
905            .lock()
906            .map(|tasks| {
907                tasks
908                    .values()
909                    .filter(|task| task.is_running())
910                    .cloned()
911                    .collect()
912            })
913            .unwrap_or_default()
914    }
915
916    fn insert_rehydrated_task(
917        &self,
918        metadata: PersistedTask,
919        paths: TaskPaths,
920        detached: bool,
921    ) -> Result<(), String> {
922        let task_id = metadata.task_id.clone();
923        let session_id = metadata.session_id.clone();
924        let started = started_instant_from_unix_millis(metadata.started_at);
925        let task = Arc::new(BgTask {
926            task_id: task_id.clone(),
927            session_id,
928            paths: paths.clone(),
929            started,
930            last_reminder_at: Mutex::new(None),
931            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
932            state: Mutex::new(BgTaskState {
933                metadata,
934                child: None,
935                detached,
936                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
937            }),
938        });
939        self.inner
940            .tasks
941            .lock()
942            .map_err(|_| "background task registry lock poisoned".to_string())?
943            .insert(task_id, task);
944        Ok(())
945    }
946
    /// Terminates a non-terminal background task and records `terminal_status`.
    ///
    /// Everything except the final snapshot runs under the task's state lock:
    /// 1. If the task is already terminal, its current snapshot is returned unchanged.
    /// 2. If an exit marker already exists on disk, the process finished on its
    ///    own. The task is finalized from that marker instead of being killed.
    /// 3. Otherwise the following steps run in order:
    ///    - persist `Killing`;
    ///    - signal the process (process group on unix; child handle or pid on
    ///      Windows);
    ///    - wait on the child handle if one is held;
    ///    - write a kill marker if no exit marker exists;
    ///    - persist `terminal_status` (exit code 124 for `TimedOut`, the same
    ///      code coreutils `timeout` reports; `None` otherwise);
    ///    - enqueue a completion.
    ///
    /// Returns a snapshot with up to 5 KiB of output preview.
    ///
    /// # Errors
    /// Returns `Err` in these cases:
    /// - the task is unknown for this session;
    /// - the state lock is poisoned;
    /// - persisting metadata or the kill marker fails.
    ///
    /// NOTE(review): a failure after the `Killing` write leaves the in-memory
    /// status at `Killing` (or at the terminal status if only the final
    /// `write_task` fails) with no completion enqueued.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already exited and its wrapper wrote a marker: report
            // the real outcome rather than overwriting it with a kill.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                task.mark_terminal_now();
                state.child = None;
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                write_task(&task.paths.json, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate `Killing` state before signalling.
            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            // Unix: signal the recorded process group. NOTE(review): if `pgid`
            // is `None` nothing is signalled here, and the `child.wait()` below
            // would block until the child exits on its own. Confirm `pgid` is
            // always recorded when `child` is `Some`.
            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Windows: prefer the live child handle; fall back to the recorded
            // pid (e.g. when no handle is held).
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            // Wait on the direct child (if we still hold its handle) so it is
            // reaped; the result is ignored.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            // Leave a kill marker unless the wrapper managed to write an exit
            // marker first.
            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        // The state lock was released at the end of the block above; the
        // snapshot re-acquires it.
        Ok(task.snapshot(5 * 1024))
    }
1021
1022    fn finalize_from_marker(
1023        &self,
1024        task: &Arc<BgTask>,
1025        marker: ExitMarker,
1026        reason: Option<String>,
1027    ) -> Result<(), String> {
1028        let mut state = task
1029            .state
1030            .lock()
1031            .map_err(|_| "background task lock poisoned".to_string())?;
1032        if state.metadata.status.is_terminal() {
1033            return Ok(());
1034        }
1035
1036        let updated = update_task(&task.paths.json, |metadata| {
1037            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1038            *metadata = new_metadata;
1039        })
1040        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1041        state.metadata = updated;
1042        task.mark_terminal_now();
1043        state.child = None;
1044        state.detached = true;
1045        state.buffer.enforce_terminal_cap();
1046        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1047        Ok(())
1048    }
1049
1050    fn enqueue_completion_if_needed(
1051        &self,
1052        metadata: &PersistedTask,
1053        paths: Option<&TaskPaths>,
1054        emit_frame: bool,
1055    ) {
1056        if metadata.status.is_terminal() && !metadata.completion_delivered {
1057            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1058        }
1059    }
1060
1061    fn enqueue_completion_locked(
1062        &self,
1063        metadata: &PersistedTask,
1064        buffer: Option<&BgBuffer>,
1065        emit_frame: bool,
1066    ) {
1067        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1068    }
1069
1070    fn enqueue_completion_from_parts(
1071        &self,
1072        metadata: &PersistedTask,
1073        buffer: Option<&BgBuffer>,
1074        paths: Option<&TaskPaths>,
1075        emit_frame: bool,
1076    ) {
1077        if !metadata.status.is_terminal() || metadata.completion_delivered {
1078            return;
1079        }
1080        // Read tail once at completion time and cache on the BgCompletion so
1081        // both the push-frame consumer (running session) and any later
1082        // `bash_drain_completions` poll (different session, restart) see the
1083        // same preview without racing against rotation.
1084        let (raw_preview, output_truncated) = match buffer {
1085            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1086            None => paths
1087                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1088                .unwrap_or_else(|| (String::new(), false)),
1089        };
1090        // Compress at completion time so push-frame consumers and later
1091        // `bash_drain_completions` poll-callers see the same compressed text.
1092        // Per-task `compressed: false` opts out; otherwise the compressor is
1093        // a no-op when `experimental.bash.compress=false`.
1094        let output_preview = if metadata.compressed {
1095            self.compress_output(&metadata.command, raw_preview)
1096        } else {
1097            raw_preview
1098        };
1099        let completion = BgCompletion {
1100            task_id: metadata.task_id.clone(),
1101            session_id: metadata.session_id.clone(),
1102            status: metadata.status.clone(),
1103            exit_code: metadata.exit_code,
1104            command: metadata.command.clone(),
1105            output_preview,
1106            output_truncated,
1107        };
1108        if let Ok(mut completions) = self.inner.completions.lock() {
1109            if completions
1110                .iter()
1111                .any(|completion| completion.task_id == metadata.task_id)
1112            {
1113                return;
1114            }
1115            completions.push_back(completion.clone());
1116        } else {
1117            return;
1118        }
1119
1120        if emit_frame {
1121            self.emit_bash_completed(completion);
1122        }
1123    }
1124
1125    fn emit_bash_completed(&self, completion: BgCompletion) {
1126        let Ok(progress_sender) = self
1127            .inner
1128            .progress_sender
1129            .lock()
1130            .map(|sender| sender.clone())
1131        else {
1132            return;
1133        };
1134        let Some(sender) = progress_sender.as_ref() else {
1135            return;
1136        };
1137        // Clone the callback out of the registry mutex before writing to stdout;
1138        // otherwise a blocked push-frame write could pin the mutex and starve
1139        // unrelated progress-sender updates.
1140        // Bg task transitions are discovered by the watchdog thread, so the
1141        // sender is shared behind a Mutex. It still uses the same stdout writer
1142        // closure as foreground progress frames, preserving the existing lock/
1143        // flush behavior in main.rs.
1144        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1145            completion.task_id,
1146            completion.session_id,
1147            completion.status,
1148            completion.exit_code,
1149            completion.command,
1150            completion.output_preview,
1151            completion.output_truncated,
1152        )));
1153    }
1154
1155    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1156        if !self
1157            .inner
1158            .long_running_reminder_enabled
1159            .load(Ordering::SeqCst)
1160        {
1161            return;
1162        }
1163        let interval_ms = self
1164            .inner
1165            .long_running_reminder_interval_ms
1166            .load(Ordering::SeqCst);
1167        if interval_ms == 0 {
1168            return;
1169        }
1170        let interval = Duration::from_millis(interval_ms);
1171        let now = Instant::now();
1172        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1173            return;
1174        };
1175        let since = last_reminder_at.unwrap_or(task.started);
1176        if now.duration_since(since) < interval {
1177            return;
1178        }
1179        let command = task
1180            .state
1181            .lock()
1182            .map(|state| state.metadata.command.clone())
1183            .unwrap_or_default();
1184        *last_reminder_at = Some(now);
1185        self.emit_bash_long_running(BashLongRunningFrame::new(
1186            task.task_id.clone(),
1187            task.session_id.clone(),
1188            command,
1189            task.started.elapsed().as_millis() as u64,
1190        ));
1191    }
1192
1193    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1194        let Ok(progress_sender) = self
1195            .inner
1196            .progress_sender
1197            .lock()
1198            .map(|sender| sender.clone())
1199        else {
1200            return;
1201        };
1202        if let Some(sender) = progress_sender.as_ref() {
1203            sender(PushFrame::BashLongRunning(frame));
1204        }
1205    }
1206
1207    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1208        self.inner
1209            .tasks
1210            .lock()
1211            .ok()
1212            .and_then(|tasks| tasks.get(task_id).cloned())
1213    }
1214
1215    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1216        self.task(task_id)
1217            .filter(|task| task.session_id == session_id)
1218    }
1219
1220    fn running_count(&self) -> usize {
1221        self.inner
1222            .tasks
1223            .lock()
1224            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1225            .unwrap_or(0)
1226    }
1227
1228    fn start_watchdog(&self) {
1229        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1230            super::watchdog::start(self.clone());
1231        }
1232    }
1233
1234    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1235        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1236    }
1237
1238    #[cfg(test)]
1239    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1240        self.task_for_session(task_id, session_id)
1241            .map(|task| task.paths.json.clone())
1242    }
1243
1244    #[cfg(test)]
1245    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1246        self.task_for_session(task_id, session_id)
1247            .map(|task| task.paths.exit.clone())
1248    }
1249
1250    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
1251    fn generate_unique_task_id(&self) -> Result<String, String> {
1252        for _ in 0..32 {
1253            let candidate = random_slug();
1254            let tasks = self
1255                .inner
1256                .tasks
1257                .lock()
1258                .map_err(|_| "background task registry lock poisoned".to_string())?;
1259            if tasks.contains_key(&candidate) {
1260                continue;
1261            }
1262            let completions = self
1263                .inner
1264                .completions
1265                .lock()
1266                .map_err(|_| "background completions lock poisoned".to_string())?;
1267            if completions
1268                .iter()
1269                .any(|completion| completion.task_id == candidate)
1270            {
1271                continue;
1272            }
1273            return Ok(candidate);
1274        }
1275        Err("failed to allocate unique background task id after 32 attempts".to_string())
1276    }
1277}
1278
1279impl Default for BgTaskRegistry {
1280    fn default() -> Self {
1281        Self::new(Arc::new(Mutex::new(None)))
1282    }
1283}
1284
/// Returns `true` if `path` was modified less than `grace` ago.
///
/// A modification time in the future (clock skew, files copied from another
/// machine) counts as age zero, i.e. "just modified". This keeps GC callers
/// from deleting such files as if they were ancient.
/// `SystemTime::duration_since` returns `Err` in that case, which previously
/// made this function report `false`. Returns `false` when the metadata or
/// modification time cannot be read (e.g. the path does not exist).
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    let age = SystemTime::now()
        .duration_since(modified)
        .unwrap_or(Duration::ZERO);
    age < grace
}
1293
/// Canonicalizes `path` (resolving symlinks and `..`). If that fails, e.g.
/// because the path does not exist, the input is returned unchanged.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1297
/// Converts a persisted unix-epoch start time (milliseconds) into an `Instant`
/// back-dated by the wall-clock time elapsed since then.
///
/// Start times in the future, or a system clock before the epoch, count as
/// zero elapsed. If the back-dated instant cannot be represented, the current
/// instant is used instead.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(backdated) => backdated,
        None => Instant::now(),
    }
}
1309
1310fn gc_quarantine(storage_dir: &Path) {
1311    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1312    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1313        return;
1314    };
1315    for session_entry in session_dirs.flatten() {
1316        let session_quarantine_dir = session_entry.path();
1317        if !session_quarantine_dir.is_dir() {
1318            continue;
1319        }
1320        let entries = match fs::read_dir(&session_quarantine_dir) {
1321            Ok(entries) => entries,
1322            Err(error) => {
1323                log::warn!(
1324                    "failed to read background task quarantine dir {}: {error}",
1325                    session_quarantine_dir.display()
1326                );
1327                continue;
1328            }
1329        };
1330        for entry in entries.flatten() {
1331            let path = entry.path();
1332            if modified_within(&path, QUARANTINE_GC_GRACE) {
1333                continue;
1334            }
1335            let result = if path.is_dir() {
1336                fs::remove_dir_all(&path)
1337            } else {
1338                fs::remove_file(&path)
1339            };
1340            match result {
1341                Ok(()) => log::debug!(
1342                    "deleted old background task quarantine entry {}",
1343                    path.display()
1344                ),
1345                Err(error) => log::warn!(
1346                    "failed to delete old background task quarantine entry {}: {error}",
1347                    path.display()
1348                ),
1349            }
1350        }
1351        let _ = fs::remove_dir(&session_quarantine_dir);
1352    }
1353    let _ = fs::remove_dir(&quarantine_root);
1354}
1355
1356fn quarantine_corrupt_task_json(
1357    storage_dir: &Path,
1358    session_dir: &Path,
1359    json_path: &Path,
1360) -> Result<(), String> {
1361    let session_hash = session_dir
1362        .file_name()
1363        .and_then(|name| name.to_str())
1364        .ok_or_else(|| {
1365            format!(
1366                "invalid background task session dir: {}",
1367                session_dir.display()
1368            )
1369        })?;
1370    let task_name = json_path
1371        .file_name()
1372        .and_then(|name| name.to_str())
1373        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1374    let unix_ts = SystemTime::now()
1375        .duration_since(UNIX_EPOCH)
1376        .map(|duration| duration.as_secs())
1377        .unwrap_or(0);
1378    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1379    fs::create_dir_all(&quarantine_dir).map_err(|e| {
1380        format!(
1381            "failed to create background task quarantine dir {}: {e}",
1382            quarantine_dir.display()
1383        )
1384    })?;
1385    let target = quarantine_dir.join(format!("{task_name}.corrupt-{unix_ts}"));
1386    fs::rename(json_path, &target).map_err(|e| {
1387        format!(
1388            "failed to quarantine corrupt background task metadata {} to {}: {e}",
1389            json_path.display(),
1390            target.display()
1391        )
1392    })?;
1393
1394    for sibling in task_sibling_paths(json_path) {
1395        if !sibling.exists() {
1396            continue;
1397        }
1398        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1399            log::warn!(
1400                "skipping background task sibling with invalid name during quarantine: {}",
1401                sibling.display()
1402            );
1403            continue;
1404        };
1405        let sibling_target = quarantine_dir.join(format!("{sibling_name}.corrupt-{unix_ts}"));
1406        if let Err(error) = fs::rename(&sibling, &sibling_target) {
1407            log::warn!(
1408                "failed to quarantine background task sibling {} to {}: {error}",
1409                sibling.display(),
1410                sibling_target.display()
1411            );
1412        }
1413    }
1414
1415    let _ = fs::remove_dir(session_dir);
1416    Ok(())
1417}
1418
/// Lists the per-task sibling files that may live next to `json_path`. They
/// share its stem and differ only by extension: capture files, exit marker,
/// and platform wrapper scripts.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem. The
/// paths are candidates only; existence is not checked.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];

    let (Some(dir), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for ext in SIBLING_EXTENSIONS {
        siblings.push(dir.join(format!("{stem}.{ext}")));
    }
    siblings
}
1431
1432fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1433    let stdout = fs::read(&paths.stdout).unwrap_or_default();
1434    let stderr = fs::read(&paths.stderr).unwrap_or_default();
1435    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1436    bytes.extend_from_slice(&stdout);
1437    bytes.extend_from_slice(&stderr);
1438    if bytes.len() <= max_bytes {
1439        return (String::from_utf8_lossy(&bytes).into_owned(), false);
1440    }
1441    let start = bytes.len().saturating_sub(max_bytes);
1442    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1443}
1444
1445impl BgTask {
1446    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1447        let state = self
1448            .state
1449            .lock()
1450            .unwrap_or_else(|poison| poison.into_inner());
1451        self.snapshot_locked(&state, preview_bytes)
1452    }
1453
1454    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1455        let metadata = &state.metadata;
1456        let duration_ms = metadata.duration_ms.or_else(|| {
1457            metadata
1458                .status
1459                .is_terminal()
1460                .then(|| self.started.elapsed().as_millis() as u64)
1461        });
1462        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1463        BgTaskSnapshot {
1464            info: BgTaskInfo {
1465                task_id: self.task_id.clone(),
1466                status: metadata.status.clone(),
1467                command: metadata.command.clone(),
1468                started_at: metadata.started_at,
1469                duration_ms,
1470            },
1471            exit_code: metadata.exit_code,
1472            child_pid: metadata.child_pid,
1473            workdir: metadata.workdir.display().to_string(),
1474            output_preview,
1475            output_truncated,
1476            output_path: state
1477                .buffer
1478                .output_path()
1479                .map(|path| path.display().to_string()),
1480            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1481        }
1482    }
1483
1484    pub(crate) fn is_running(&self) -> bool {
1485        self.state
1486            .lock()
1487            .map(|state| state.metadata.status == BgTaskStatus::Running)
1488            .unwrap_or(false)
1489    }
1490
1491    fn mark_terminal_now(&self) {
1492        if let Ok(mut terminal_at) = self.terminal_at.lock() {
1493            if terminal_at.is_none() {
1494                *terminal_at = Some(Instant::now());
1495            }
1496        }
1497    }
1498
1499    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1500        let mut state = self
1501            .state
1502            .lock()
1503            .map_err(|_| "background task lock poisoned".to_string())?;
1504        let updated = update_task(&self.paths.json, |metadata| {
1505            metadata.completion_delivered = delivered;
1506        })
1507        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1508        state.metadata = updated;
1509        Ok(())
1510    }
1511}
1512
1513fn terminal_metadata_from_marker(
1514    mut metadata: PersistedTask,
1515    marker: ExitMarker,
1516    reason: Option<String>,
1517) -> PersistedTask {
1518    match marker {
1519        ExitMarker::Code(code) => {
1520            let status = if code == 0 {
1521                BgTaskStatus::Completed
1522            } else {
1523                BgTaskStatus::Failed
1524            };
1525            metadata.mark_terminal(status, Some(code), reason);
1526        }
1527        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1528    }
1529    metadata
1530}
1531
/// Builds the unix `Command` that runs `command` in the background and
/// records its exit status at `exit_path`.
///
/// The resolved shell runs a small wrapper script with positional arguments:
/// `$0` is the shell itself, `$1` is the user command, and `$2` is
/// `exit_path`. The wrapper runs `"$0" -c "$1"` and then writes its exit code
/// to `$2.tmp.$$`. It then `mv -f`s that file onto `$2`, so the marker appears
/// under its final name in a single step. The temp file sits in the same
/// directory, so this is presumably an atomic rename — confirm for network
/// filesystems.
///
/// The child calls `setsid()` before exec, so it starts a new session and
/// process group. NOTE(review): presumably this is what lets the kill path
/// signal the whole group via the recorded pgid — confirm at the spawn site.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: the `pre_exec` closure runs in the forked child before `exec`,
    // where only async-signal-safe operations are allowed. It calls only
    // `setsid()`, which POSIX lists as async-signal-safe. On failure it reads
    // `errno` via `last_os_error()`, which builds an OS-code error without
    // allocating. It captures nothing and touches no locks or shared state.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
1551
1552#[cfg(unix)]
1553fn resolve_posix_shell() -> PathBuf {
1554    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1555    POSIX_SHELL
1556        .get_or_init(|| {
1557            std::env::var_os("BASH")
1558                .filter(|value| !value.is_empty())
1559                .map(PathBuf::from)
1560                .filter(|path| path.exists())
1561                .or_else(|| which::which("bash").ok())
1562                .or_else(|| which::which("zsh").ok())
1563                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1564        })
1565        .clone()
1566}
1567
/// Build the Windows `Command` that runs `command` as a background task under `shell`.
///
/// The user command is wrapped by `WindowsShell::wrapper_script`, which records the exit code
/// at `exit_path`. The wrapper is written into `paths.dir`, named after the task's JSON file
/// stem with a shell-specific extension. The shell is then launched with that file path as its
/// script argument.
///
/// `creation_flags` is applied verbatim via `CommandExt::creation_flags`. Callers decide whether
/// to include `CREATE_BREAKAWAY_FROM_JOB` (see `spawn_detached_child`).
///
/// # Errors
///
/// Returns a message if the wrapper script cannot be written to disk.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    // Write the wrapper to a file alongside the other task files, then
    // invoke the shell with the file path as a single argument. This
    // sidesteps the Windows command-line quoting mess (Rust std-lib
    // quoting + cmd /C parser + PowerShell -Command parser all
    // interacting with embedded quotes in the wrapper).
    //
    // The wrapper body's internal quotes never reach the shell command
    // line: the shell reads them from disk by file syntax rules, not by
    // command-line parser rules. The file name comes from the task's JSON
    // file stem (task IDs are no-space `bash-XXXXXXXX`, see `random_slug`).
    // NOTE(review): `paths.dir` itself may contain spaces (e.g. a user
    // profile path). That case relies on `Command`'s standard argument
    // quoting; confirm it for the cmd `/C` invocation below.
    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) are launched as
        // `<binary> <wrapper-path>` (see below), so they do not need the
        // extension to run the file. `.sh` matches what an operator would
        // expect when grepping the spill directory.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // -File runs the script with no quoting issues. `-NoLogo`,
            // `-NoProfile`, etc. apply to the host before the file runs.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // `cmd /V:ON /D /C "<bat-file-path>"`:
            //  - `/V:ON` enables delayed variable expansion for the batch
            //    file. The cmd wrapper records the exit code with
            //    `!ERRORLEVEL!` on the same `&`-joined line as the user
            //    command. Without delayed expansion, `!ERRORLEVEL!` is left
            //    unexpanded. The parse-time `%ERRORLEVEL%` alternative would
            //    expand before the user command runs and record a stale
            //    code. As a side effect, `!` in the user command is also
            //    subject to delayed expansion.
            //  - `/D` skips registry AutoRun commands.
            //  - `/C` runs the batch file and exits. cmd's batch processor
            //    reads the file's contents line by line; the /C parser does
            //    not re-interpret them. This avoids the "filename syntax
            //    incorrect" errors caused by putting complex compound
            //    commands on the cmd line.
            cmd.args(["/V:ON", "/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // git-bash and other POSIX shells run the wrapper script as
            // `<binary> <wrapper-path>` (the wrapper is just a shell
            // script). No special flags are needed; the `trap` and atomic
            // exit-marker rename in `wrapper_script` are POSIX-standard.
            cmd.arg(&wrapper_path);
        }
    }

    // Win32 process creation flags. The caller selects whether to include
    // CREATE_BREAKAWAY_FROM_JOB; see `spawn_detached_child` for the
    // breakaway-fallback strategy.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1650
/// Spawn a detached background bash child process.
///
/// Stdin is null. Stdout and stderr go to the task's capture files (`paths.stdout` /
/// `paths.stderr`). The child runs in `workdir` with `env` added to the inherited
/// environment. The exit code is recorded by a wrapper at `paths.exit`.
///
/// On Unix this is a single spawn through the shell chosen by `resolve_posix_shell`:
/// `$BASH`, then `bash` or `zsh` on PATH, falling back to `/bin/sh`.
///
/// On Windows it walks `WindowsShell::shell_candidates()` in order. The inline comment below
/// gives the intended priority as pwsh → powershell → git-bash → cmd; the exact list comes
/// from `shell_candidates()`. This is the same runtime safety net the foreground bash path
/// has, so issue #27 callers landing on the cmd.exe fallback can also use background bash.
/// For each candidate:
/// - The first attempt includes `CREATE_BREAKAWAY_FROM_JOB`.
/// - If that attempt fails with Access Denied (os error 5), the same shell is retried once
///   without breakaway.
/// - A `NotFound` failure moves on to the next candidate.
///
/// The wrapper script is rewritten per attempt by `detached_shell_command_for`, because its
/// contents and file extension depend on the shell. The stdout/stderr capture files are
/// reopened per attempt, because each attempt builds a fresh `Command` and the `Stdio`
/// handles are moved into it.
///
/// Other spawn errors (PermissionDenied, OutOfMemory, etc.) return immediately without
/// trying another shell. They indicate a problem with the resolved shell that retrying with a
/// different shell won't fix.
///
/// # Errors
///
/// Returns a message if a capture file or wrapper script cannot be created, if a spawn fails
/// with a non-retryable error, or (Windows) if every candidate was exhausted.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        // Spawn priority: pwsh → powershell → git-bash → cmd, the same as
        // the legacy foreground bash spawn path. The actual ordering is
        // whatever `shell_candidates()` returns. v0.20 routes ALL bash
        // through this background spawn helper, including foreground tool
        // calls where the model writes PowerShell syntax (`$var = ...`,
        // `Start-Sleep`, `Add-Content`); those fail outright under cmd.
        // The earlier v0.18-era cmd-first override worked around a
        // PowerShell detached-output bug. That bug is fixed at the
        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
        // see the flag block below), so PS commands are no longer misrouted
        // through cmd.
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
        // first, so the bg child outlives the AFT process when AFT is killed.
        // We then fall back without it for environments where the parent is
        // in a Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`.
        // CI runners (GitHub Actions windows-2022) and some MDM-managed corp
        // environments hit this: `CreateProcess` returns Access Denied (5).
        // Without breakaway, the child is still detached from AFT's console
        // and process group, but it remains inside the parent's Job Object.
        // It can therefore be torn down together with that job.
        //
        // We use CREATE_NO_WINDOW (no visible console window, but the
        // child still has a hidden console) rather than DETACHED_PROCESS
        // (no console at all). PowerShell-based wrappers that perform
        // file I/O via [System.IO.File] need a console handle to flush
        // stdout/stderr correctly even when redirected. Under
        // DETACHED_PROCESS, pwsh sometimes silently exits before executing
        // later script statements: the Move-Item that writes the exit
        // marker never runs, leaving the bg task permanently marked
        // Failed: process exited without exit marker. cmd.exe wrappers
        // tolerate DETACHED_PROCESS, but switching to CREATE_NO_WINDOW costs
        // nothing for cmd and unblocks pwsh.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent retryable failure, reported if every candidate fails.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per shell, try with breakaway first. If the process is in a
            // restrictive job, the breakaway flag triggers Access Denied
            // (os error 5). Retry once without breakaway.
            for &flags in &[with_breakaway, without_breakaway] {
                // Re-open capture handles per attempt: each attempt builds a
                // fresh Command, and the Stdio handles are moved into it.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        // Skip the without-breakaway retry for NotFound — the
                        // binary itself is missing, breakaway flag is irrelevant.
                        break;
                    }
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        // Access Denied during breakaway — retry without it.
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    Err(e) => {
                        // Any other failure (including Access Denied on the
                        // without-breakaway attempt) is not retried.
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1803
1804fn random_slug() -> String {
1805    let mut bytes = [0u8; 4];
1806    // getrandom is a transitive dependency; use it directly for OS entropy.
1807    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1808        // Extremely unlikely fallback: time + pid mix.
1809        let t = SystemTime::now()
1810            .duration_since(UNIX_EPOCH)
1811            .map(|d| d.subsec_nanos())
1812            .unwrap_or(0);
1813        let p = std::process::id();
1814        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1815    });
1816    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
1817    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1818    format!("bash-{hex}")
1819}
1820
// Unit tests for the background task registry. They cover cleanup retention
// rules, the watchdog's `reap_child` handling of a missing exit marker and,
// on Windows only, the spawn path's wrapper and exit-marker behavior.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    // `fs` and `Instant` are imported explicitly only on Windows. On other
    // targets, the cross-platform reap_child tests get `Instant` through
    // `use super::*`, since the parent module imports it.
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    // A command that exits 0 immediately on each platform.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // A command that keeps running for about 5 seconds on each platform.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// A killed task whose completion has been drained (i.e. delivered)
    /// is removed by `cleanup_finished` once it is older than the
    /// threshold. The threshold is zero here, so removal is immediate.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        let completions = registry.drain_completions_for_session(Some("session"));
        assert_eq!(completions.len(), 1);

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// Same setup as above, but the completion is never drained. The
    /// terminal task's completion is still undelivered, so
    /// `cleanup_finished` must keep the task even with a zero threshold.
    #[test]
    fn cleanup_finished_retains_undelivered_terminals() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
    }

    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
    /// watchdog path (reap_child) marks a task Failed when the child
    /// has exited but no exit marker was written. Before this fix the
    /// task would remain `Running` until timeout, even though the
    /// process was definitely dead.
    ///
    /// Cross-platform: spawns a quick-exiting command through the normal
    /// wrapper and waits for the child to exit. It then deletes whatever
    /// exit marker the wrapper wrote, simulating a wrapper that crashed
    /// before persisting the exit code.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        let _ = std::fs::remove_file(&task.paths.exit);

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // Invoke the watchdog's reap_child directly. The fix should mark
        // the task Failed with the documented reason string, instead of
        // just dropping the child handle and leaving status=Running.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// Companion to the above: when the exit marker DOES exist on disk
    /// at reap_child time (race window — wrapper finished writing
    /// between try_wait and the marker check), reap_child must NOT mark
    /// the task Failed. Instead it leaves status=Running and lets the
    /// next poll_task() cycle finalize via the marker.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for child to exit AND for the marker to land. Both happen
        // shortly after the wrapper finishes — but we want both observed.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // reap_child sees: child exited, marker exists. It should:
        //  - drop state.child / set state.detached = true
        //  - NOT change status (poll_task will finalize via marker next tick)
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        // Status remains Running because reap_child defers to poll_task
        // when a marker exists. It would be wrong for reap to record the
        // marker outcome (poll_task does that with proper exit-code
        // parsing).
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// A still-running task is never removed by `cleanup_finished`, even
    /// with a zero threshold. The task is killed afterwards so the ~5 s
    /// command does not outlive the test.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        let _ = registry.kill(&task_id, "session");
    }

    /// Poll every 100 ms until `path` exists, then return its contents.
    /// Panics after 30 s or if the file cannot be read.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Spawn `command` in a fresh registry under session "session",
    /// passing a new temp dir for every path parameter. The `TempDir` is
    /// returned so the caller keeps it alive for the rest of the test.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                command,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                false,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        (registry, dir, task_id)
    }

    /// A zero exit code is written to the task's exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    /// A non-zero exit code (42) is written to the task's exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    /// The command's stdout lands in the task's stdout capture file by the
    /// time the exit marker appears.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    /// Pure candidate-ordering check; no process is spawned. When both
    /// pwsh.exe and powershell.exe resolve, pwsh is the first candidate.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
        // (We intentionally pass None for shell_env to keep this test
        // independent of the runner's actual env.)
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    /// Issue #27 Oracle review P1: cmd wrapper MUST use `!ERRORLEVEL!` (not
    /// `%ERRORLEVEL%`) to capture the user command's run-time exit code.
    /// `%VAR%` is parse-time-expanded by cmd, so a wrapper using
    /// `%ERRORLEVEL%` would record a stale value (typically 0 from cmd's
    /// startup) regardless of what the user command returned. `!VAR!`
    /// requires delayed expansion. `WindowsShell::bg_command` enables it
    /// via `/V:ON`; the file-based production path sets its own cmd flags
    /// in `detached_shell_command_for`. This test inspects the generated
    /// script text only; it does not run cmd.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        // MUST use !ERRORLEVEL! (delayed expansion), NOT %ERRORLEVEL%
        // (parse-time expansion). The latter would record a stale exit
        // code regardless of what the user command actually returned.
        assert!(
            script.contains("& echo !ERRORLEVEL! >"),
            "wrapper must use delayed expansion: {script}"
        );
        assert!(
            !script.contains("%ERRORLEVEL%"),
            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
        );
        assert!(script.contains("& move /Y"));
        // move output should be redirected to nul to avoid polluting the
        // user's captured stdout with "1 file(s) moved." lines.
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
    }

    /// Issue #27 Oracle review P1: `bg_command()` for Cmd MUST prepend
    /// `/V:ON` to enable delayed expansion AND `/S` to use simple-quote
    /// parsing (so cmd's /C parser doesn't mangle the wrapper's internal
    /// quotes from `cmd_quote`).
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /V:ON /D /S /C"
        );
    }

    /// PowerShell variants don't need `/V:ON`-style flags; their args
    /// are the same for foreground (`command()`) and background
    /// (`bg_command()`).
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
    /// **cmd.exe-specific** wrapper path captures the user command's
    /// run-time exit code correctly. The existing
    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
    /// had pwsh.exe or powershell.exe on PATH (which is typical). In that
    /// case the outer wrapper would be PowerShell, not cmd, and
    /// PowerShell's `$LASTEXITCODE` captures the inner `cmd /c exit 42`
    /// correctly.
    ///
    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
    /// force the cmd-wrapper code path, then writes the exit marker and
    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
    /// regardless of what the user command returned.
    ///
    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()`,
    /// the inline command-line wrapper helper that production code no
    /// longer uses. v0.19.4 switched bg-bash to a file-based wrapper
    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
    /// produced silent failures on Windows 11 (move /Y could not parse
    /// path arguments through cmd's /C parser). The `bg_command` helper
    /// is kept only for parity with `WindowsShell::Cmd.command()` shape.
    /// The production spawn path goes through `detached_shell_command_for`,
    /// which writes the wrapper to disk and invokes `cmd ... /C <bat-path>`;
    /// see that function for the exact flags.
    ///
    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
    /// covered live by the Windows e2e harness scenario 2d
    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
    /// exercises the real file-based wrapper end-to-end via the protocol.
    /// As noted above, that scenario only hits the cmd wrapper when no
    /// PowerShell is on PATH.
    #[allow(dead_code)]
    #[cfg(any())] // disabled on all targets
    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}