Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use serde::Serialize;
12
13use crate::context::SharedProgressSender;
14use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
15
16#[cfg(unix)]
17use std::os::unix::process::CommandExt;
18#[cfg(windows)]
19use std::os::windows::process::CommandExt;
20
21use super::buffer::BgBuffer;
22use super::persistence::{
23    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
24    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
25    PersistedTask, TaskPaths,
26};
27use super::process::is_process_alive;
28#[cfg(unix)]
29use super::process::terminate_pgid;
30#[cfg(windows)]
31use super::process::terminate_pid;
32use super::{BgTaskInfo, BgTaskStatus};
33// Note: `resolve_windows_shell` is no longer imported at module scope —
34// production code in `spawn_detached_child` uses `shell_candidates()`
35// with retry instead, and the function remains in `windows_shell.rs`
36// for tests and as a future helper.
37
38/// Default timeout for background bash tasks: 30 minutes.
39/// Agents can override per-call via the `timeout` parameter (in ms).
40const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
41const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
42const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
43const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
44
45/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
46/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
47/// of typical command output (git status, test results, exit messages) — short
48/// enough that round-tripping multiple completions in one reminder stays well
49/// under the model's context budget but long enough that most successful runs
50/// don't need a follow-up `bash_status` call.
51const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
52
53#[derive(Debug, Clone, Serialize)]
54pub struct BgCompletion {
55    pub task_id: String,
56    /// Intentionally omitted from serialized completion payloads: push frames
57    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
58    #[serde(skip_serializing)]
59    pub session_id: String,
60    pub status: BgTaskStatus,
61    pub exit_code: Option<i32>,
62    pub command: String,
63    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
64    /// cached so push-frame consumers and `bash_drain_completions` callers see
65    /// the same preview without racing against later output rotation. Empty
66    /// when not captured (e.g., persisted task seen on startup before buffer
67    /// reattachment).
68    #[serde(default, skip_serializing_if = "String::is_empty")]
69    pub output_preview: String,
70    /// True when the captured tail is shorter than the actual output (because
71    /// rotation occurred or the output exceeds the preview cap). Plugins use
72    /// this to render a `…` prefix and signal that `bash_status` would return
73    /// more.
74    #[serde(default, skip_serializing_if = "is_false")]
75    pub output_truncated: bool,
76}
77
78fn is_false(v: &bool) -> bool {
79    !*v
80}
81
82#[derive(Debug, Clone, Serialize)]
83pub struct BgTaskSnapshot {
84    #[serde(flatten)]
85    pub info: BgTaskInfo,
86    pub exit_code: Option<i32>,
87    pub child_pid: Option<u32>,
88    pub workdir: String,
89    pub output_preview: String,
90    pub output_truncated: bool,
91    pub output_path: Option<String>,
92    pub stderr_path: Option<String>,
93}
94
95#[derive(Clone)]
96pub struct BgTaskRegistry {
97    pub(crate) inner: Arc<RegistryInner>,
98}
99
100pub(crate) struct RegistryInner {
101    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
102    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
103    pub(crate) progress_sender: SharedProgressSender,
104    watchdog_started: AtomicBool,
105    pub(crate) shutdown: AtomicBool,
106    pub(crate) long_running_reminder_enabled: AtomicBool,
107    pub(crate) long_running_reminder_interval_ms: AtomicU64,
108    persisted_gc_started: AtomicBool,
109    #[cfg(test)]
110    persisted_gc_runs: AtomicU64,
111    /// Output compression callback. Set by `AppContext` after construction.
112    /// Takes (command, raw_output) and returns compressed text. Called from
113    /// the watchdog thread when a task reaches a terminal state and from
114    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
115    /// uncompressed.
116    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
117}
118
119pub(crate) struct BgTask {
120    pub(crate) task_id: String,
121    pub(crate) session_id: String,
122    pub(crate) paths: TaskPaths,
123    pub(crate) started: Instant,
124    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
125    pub(crate) terminal_at: Mutex<Option<Instant>>,
126    pub(crate) state: Mutex<BgTaskState>,
127}
128
129pub(crate) struct BgTaskState {
130    pub(crate) metadata: PersistedTask,
131    pub(crate) child: Option<Child>,
132    pub(crate) detached: bool,
133    pub(crate) buffer: BgBuffer,
134}
135
136impl BgTaskRegistry {
137    pub fn new(progress_sender: SharedProgressSender) -> Self {
138        Self {
139            inner: Arc::new(RegistryInner {
140                tasks: Mutex::new(HashMap::new()),
141                completions: Mutex::new(VecDeque::new()),
142                progress_sender,
143                watchdog_started: AtomicBool::new(false),
144                shutdown: AtomicBool::new(false),
145                long_running_reminder_enabled: AtomicBool::new(true),
146                long_running_reminder_interval_ms: AtomicU64::new(600_000),
147                persisted_gc_started: AtomicBool::new(false),
148                #[cfg(test)]
149                persisted_gc_runs: AtomicU64::new(0),
150                compressor: Mutex::new(None),
151            }),
152        }
153    }
154
155    /// Install the output-compression callback. Called by `main.rs` after
156    /// `AppContext` is constructed so that snapshot/completion paths can
157    /// invoke `compress::compress_with_registry` without holding a context
158    /// reference. When called multiple times, the latest installation wins.
159    pub fn set_compressor<F>(&self, compressor: F)
160    where
161        F: Fn(&str, String) -> String + Send + Sync + 'static,
162    {
163        if let Ok(mut slot) = self.inner.compressor.lock() {
164            *slot = Some(Box::new(compressor));
165        }
166    }
167
168    /// Apply the installed compressor (if any) to `output`. Returns `output`
169    /// untouched when no compressor is installed.
170    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
171        let Ok(slot) = self.inner.compressor.lock() else {
172            return output;
173        };
174        match slot.as_ref() {
175            Some(compressor) => compressor(command, output),
176            None => output,
177        }
178    }
179
180    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
181        self.inner
182            .long_running_reminder_enabled
183            .store(enabled, Ordering::SeqCst);
184        self.inner
185            .long_running_reminder_interval_ms
186            .store(interval_ms, Ordering::SeqCst);
187    }
188
189    #[cfg(unix)]
190    #[allow(clippy::too_many_arguments)]
191    pub fn spawn(
192        &self,
193        command: &str,
194        session_id: String,
195        workdir: PathBuf,
196        env: HashMap<String, String>,
197        timeout: Option<Duration>,
198        storage_dir: PathBuf,
199        max_running: usize,
200        notify_on_completion: bool,
201        compressed: bool,
202        project_root: Option<PathBuf>,
203    ) -> Result<String, String> {
204        self.start_watchdog();
205
206        let running = self.running_count();
207        if running >= max_running {
208            return Err(format!(
209                "background bash task limit exceeded: {running} running (max {max_running})"
210            ));
211        }
212
213        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
214        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
215        let task_id = self.generate_unique_task_id()?;
216        let paths = task_paths(&storage_dir, &session_id, &task_id);
217        fs::create_dir_all(&paths.dir)
218            .map_err(|e| format!("failed to create background task dir: {e}"))?;
219
220        let mut metadata = PersistedTask::starting(
221            task_id.clone(),
222            session_id.clone(),
223            command.to_string(),
224            workdir.clone(),
225            project_root,
226            timeout_ms,
227            notify_on_completion,
228            compressed,
229        );
230        write_task(&paths.json, &metadata)
231            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
232
233        // Pre-create capture files so the watchdog/buffer can always
234        // open them for reading. The spawn helper opens its own handles
235        // per attempt because each `Command::spawn()` consumes them.
236        create_capture_file(&paths.stdout)
237            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
238        create_capture_file(&paths.stderr)
239            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
240
241        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
242            Ok(child) => child,
243            Err(error) => {
244                log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
245                let _ = delete_task_bundle(&paths);
246                return Err(error);
247            }
248        };
249
250        let child_pid = child.id();
251        metadata.mark_running(child_pid, child_pid as i32);
252        write_task(&paths.json, &metadata)
253            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
254
255        let task = Arc::new(BgTask {
256            task_id: task_id.clone(),
257            session_id,
258            paths: paths.clone(),
259            started: Instant::now(),
260            last_reminder_at: Mutex::new(None),
261            terminal_at: Mutex::new(None),
262            state: Mutex::new(BgTaskState {
263                metadata,
264                child: Some(child),
265                detached: false,
266                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
267            }),
268        });
269
270        self.inner
271            .tasks
272            .lock()
273            .map_err(|_| "background task registry lock poisoned".to_string())?
274            .insert(task_id.clone(), task);
275
276        Ok(task_id)
277    }
278
279    #[cfg(windows)]
280    #[allow(clippy::too_many_arguments)]
281    pub fn spawn(
282        &self,
283        command: &str,
284        session_id: String,
285        workdir: PathBuf,
286        env: HashMap<String, String>,
287        timeout: Option<Duration>,
288        storage_dir: PathBuf,
289        max_running: usize,
290        notify_on_completion: bool,
291        compressed: bool,
292        project_root: Option<PathBuf>,
293    ) -> Result<String, String> {
294        self.start_watchdog();
295
296        let running = self.running_count();
297        if running >= max_running {
298            return Err(format!(
299                "background bash task limit exceeded: {running} running (max {max_running})"
300            ));
301        }
302
303        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
304        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
305        let task_id = self.generate_unique_task_id()?;
306        let paths = task_paths(&storage_dir, &session_id, &task_id);
307        fs::create_dir_all(&paths.dir)
308            .map_err(|e| format!("failed to create background task dir: {e}"))?;
309
310        let mut metadata = PersistedTask::starting(
311            task_id.clone(),
312            session_id.clone(),
313            command.to_string(),
314            workdir.clone(),
315            project_root,
316            timeout_ms,
317            notify_on_completion,
318            compressed,
319        );
320        write_task(&paths.json, &metadata)
321            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
322
323        // Capture files are pre-created so the watchdog/buffer can always
324        // open them for reading even if the child hasn't written anything
325        // yet. The spawn helper opens its own handles per attempt because
326        // each `Command::spawn()` consumes them, and on Windows we may
327        // retry across multiple shell candidates if the first one fails.
328        create_capture_file(&paths.stdout)
329            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
330        create_capture_file(&paths.stderr)
331            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
332
333        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
334            Ok(child) => child,
335            Err(error) => {
336                log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
337                let _ = delete_task_bundle(&paths);
338                return Err(error);
339            }
340        };
341
342        let child_pid = child.id();
343        metadata.status = BgTaskStatus::Running;
344        metadata.child_pid = Some(child_pid);
345        metadata.pgid = None;
346        write_task(&paths.json, &metadata)
347            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
348
349        let task = Arc::new(BgTask {
350            task_id: task_id.clone(),
351            session_id,
352            paths: paths.clone(),
353            started: Instant::now(),
354            last_reminder_at: Mutex::new(None),
355            terminal_at: Mutex::new(None),
356            state: Mutex::new(BgTaskState {
357                metadata,
358                child: Some(child),
359                detached: false,
360                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
361            }),
362        });
363
364        self.inner
365            .tasks
366            .lock()
367            .map_err(|_| "background task registry lock poisoned".to_string())?
368            .insert(task_id.clone(), task);
369
370        Ok(task_id)
371    }
372
373    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
374        self.start_watchdog();
375        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
376            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
377                log::warn!("failed to GC persisted background bash tasks: {error}");
378            }
379        }
380        let dir = session_tasks_dir(storage_dir, session_id);
381        if !dir.exists() {
382            return Ok(());
383        }
384
385        let entries = fs::read_dir(&dir)
386            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
387        for entry in entries.flatten() {
388            let path = entry.path();
389            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
390                continue;
391            }
392            let Ok(mut metadata) = read_task(&path) else {
393                continue;
394            };
395            if metadata.session_id != session_id {
396                continue;
397            }
398
399            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
400            match metadata.status {
401                BgTaskStatus::Starting => {
402                    metadata.mark_terminal(
403                        BgTaskStatus::Failed,
404                        None,
405                        Some("spawn aborted".to_string()),
406                    );
407                    let _ = write_task(&paths.json, &metadata);
408                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
409                }
410                BgTaskStatus::Running | BgTaskStatus::Killing => {
411                    if self.running_metadata_is_stale(&metadata) {
412                        metadata.mark_terminal(
413                            BgTaskStatus::Killed,
414                            None,
415                            Some("orphaned (>24h)".to_string()),
416                        );
417                        if !paths.exit.exists() {
418                            let _ = write_kill_marker_if_absent(&paths.exit);
419                        }
420                        let _ = write_task(&paths.json, &metadata);
421                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
422                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
423                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
424                            "recovered from inconsistent killing state on replay".to_string()
425                        });
426                        if reason.is_some() {
427                            log::warn!(
428                                "background task {} had killing state with exit marker; preferring marker",
429                                metadata.task_id
430                            );
431                        }
432                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
433                        let _ = write_task(&paths.json, &metadata);
434                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
435                    } else if metadata.status == BgTaskStatus::Killing {
436                        if !paths.exit.exists() {
437                            let _ = write_kill_marker_if_absent(&paths.exit);
438                        }
439                        metadata.mark_terminal(
440                            BgTaskStatus::Killed,
441                            None,
442                            Some("recovered from inconsistent killing state on replay".to_string()),
443                        );
444                        let _ = write_task(&paths.json, &metadata);
445                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
446                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
447                        metadata.mark_terminal(
448                            BgTaskStatus::Failed,
449                            None,
450                            Some("process exited without exit marker".to_string()),
451                        );
452                        let _ = write_task(&paths.json, &metadata);
453                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
454                    } else {
455                        self.insert_rehydrated_task(metadata, paths, true)?;
456                    }
457                }
458                _ if metadata.status.is_terminal() => {
459                    // Borrow `paths` for the completion enqueue BEFORE
460                    // `insert_rehydrated_task` consumes it. The completion
461                    // helper only reads from `paths` (stdout/stderr/exit) to
462                    // reconstruct a tail preview, so it must see the same
463                    // paths the rehydrated task will own.
464                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
465                    self.insert_rehydrated_task(metadata, paths, true)?;
466                }
467                _ => {}
468            }
469        }
470
471        Ok(())
472    }
473
474    pub fn status(
475        &self,
476        task_id: &str,
477        session_id: &str,
478        project_root: Option<&Path>,
479        storage_dir: Option<&Path>,
480        preview_bytes: usize,
481    ) -> Option<BgTaskSnapshot> {
482        let mut task = self.task_for_session(task_id, session_id);
483        if task.is_none() {
484            if let Some(storage_dir) = storage_dir {
485                let _ = self.replay_session(storage_dir, session_id);
486                task = self.task_for_session(task_id, session_id);
487            }
488        }
489        let Some(task) = task else {
490            return self.status_relaxed(
491                task_id,
492                session_id,
493                project_root?,
494                storage_dir?,
495                preview_bytes,
496            );
497        };
498        let _ = self.poll_task(&task);
499        let mut snapshot = task.snapshot(preview_bytes);
500        self.maybe_compress_snapshot(&task, &mut snapshot);
501        Some(snapshot)
502    }
503
504    fn status_relaxed_task(
505        &self,
506        task_id: &str,
507        project_root: &Path,
508        storage_dir: &Path,
509    ) -> Option<Arc<BgTask>> {
510        let canonical_project = canonicalized_path(project_root);
511        let root = storage_dir.join("bash-tasks");
512        let entries = fs::read_dir(&root).ok()?;
513        for entry in entries.flatten() {
514            let dir = entry.path();
515            if !dir.is_dir() {
516                continue;
517            }
518            let path = dir.join(format!("{task_id}.json"));
519            if !path.exists() {
520                continue;
521            }
522            let Ok(metadata) = read_task(&path) else {
523                continue;
524            };
525            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
526            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
527                continue;
528            }
529            if let Some(task) = self.task(task_id) {
530                let matches_project = task
531                    .state
532                    .lock()
533                    .map(|state| {
534                        state
535                            .metadata
536                            .project_root
537                            .as_deref()
538                            .map(canonicalized_path)
539                            .as_deref()
540                            == Some(canonical_project.as_path())
541                    })
542                    .unwrap_or(false);
543                return matches_project.then_some(task);
544            }
545            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
546            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
547                return None;
548            }
549            return self.task(task_id);
550        }
551        None
552    }
553
554    pub(super) fn status_relaxed(
555        &self,
556        task_id: &str,
557        _session_id: &str,
558        project_root: &Path,
559        storage_dir: &Path,
560        preview_bytes: usize,
561    ) -> Option<BgTaskSnapshot> {
562        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
563        let _ = self.poll_task(&task);
564        let mut snapshot = task.snapshot(preview_bytes);
565        self.maybe_compress_snapshot(&task, &mut snapshot);
566        Some(snapshot)
567    }
568
569    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
570        #[cfg(test)]
571        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
572
573        let mut deleted = 0usize;
574
575        let root = storage_dir.join("bash-tasks");
576        if root.exists() {
577            let session_dirs = fs::read_dir(&root).map_err(|e| {
578                format!(
579                    "failed to read background task root {}: {e}",
580                    root.display()
581                )
582            })?;
583            for session_entry in session_dirs.flatten() {
584                let session_dir = session_entry.path();
585                if !session_dir.is_dir() {
586                    continue;
587                }
588                let task_entries = match fs::read_dir(&session_dir) {
589                    Ok(entries) => entries,
590                    Err(error) => {
591                        log::warn!(
592                            "failed to read background task session dir {}: {error}",
593                            session_dir.display()
594                        );
595                        continue;
596                    }
597                };
598                for task_entry in task_entries.flatten() {
599                    let json_path = task_entry.path();
600                    if json_path
601                        .extension()
602                        .and_then(|extension| extension.to_str())
603                        != Some("json")
604                    {
605                        continue;
606                    }
607                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
608                        continue;
609                    }
610                    let metadata = match read_task(&json_path) {
611                        Ok(metadata) => metadata,
612                        Err(error) => {
613                            log::warn!(
614                                "quarantining corrupt background task metadata {}: {error}",
615                                json_path.display()
616                            );
617                            quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)?;
618                            continue;
619                        }
620                    };
621                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
622                        continue;
623                    }
624                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
625                    match delete_task_bundle(&paths) {
626                        Ok(()) => {
627                            deleted += 1;
628                            log::debug!(
629                                "deleted persisted background task bundle {}",
630                                metadata.task_id
631                            );
632                        }
633                        Err(error) => {
634                            log::warn!(
635                                "failed to delete background task bundle {}: {error}",
636                                metadata.task_id
637                            );
638                            continue;
639                        }
640                    }
641                }
642            }
643        }
644        gc_quarantine(storage_dir);
645        Ok(deleted)
646    }
647
648    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
649        let tasks = self
650            .inner
651            .tasks
652            .lock()
653            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
654            .unwrap_or_default();
655        tasks
656            .into_iter()
657            .map(|task| {
658                let _ = self.poll_task(&task);
659                let mut snapshot = task.snapshot(preview_bytes);
660                self.maybe_compress_snapshot(&task, &mut snapshot);
661                snapshot
662            })
663            .collect()
664    }
665
666    /// Compress `output_preview` in place when the task is in a terminal
667    /// state. Live tail of running tasks stays raw so agents debugging
668    /// long-running bash see exactly what the process emitted, not a
669    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
670    /// field on `PersistedTask` short-circuits before the compress pipeline.
671    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
672        if !snapshot.info.status.is_terminal() {
673            return;
674        }
675        let compressed_flag = task
676            .state
677            .lock()
678            .map(|state| state.metadata.compressed)
679            .unwrap_or(true);
680        if !compressed_flag {
681            return;
682        }
683        let raw = std::mem::take(&mut snapshot.output_preview);
684        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
685    }
686
687    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
688        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
689    }
690
691    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
692        let task = self
693            .task_for_session(task_id, session_id)
694            .ok_or_else(|| format!("background task not found: {task_id}"))?;
695        let mut state = task
696            .state
697            .lock()
698            .map_err(|_| "background task lock poisoned".to_string())?;
699        let updated = update_task(&task.paths.json, |metadata| {
700            metadata.notify_on_completion = true;
701            metadata.completion_delivered = false;
702        })
703        .map_err(|e| format!("failed to promote background task: {e}"))?;
704        state.metadata = updated;
705        if state.metadata.status.is_terminal() {
706            state.buffer.enforce_terminal_cap();
707            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
708        }
709        Ok(true)
710    }
711
712    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
713        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
714            .map(|_| ())
715    }
716
717    pub fn cleanup_finished(&self, older_than: Duration) {
718        let cutoff = Instant::now().checked_sub(older_than);
719        let removable_paths: Vec<(String, TaskPaths)> =
720            if let Ok(mut tasks) = self.inner.tasks.lock() {
721                let removable = tasks
722                    .iter()
723                    .filter_map(|(task_id, task)| {
724                        let delivered_terminal = task
725                            .state
726                            .lock()
727                            .map(|state| {
728                                state.metadata.status.is_terminal()
729                                    && state.metadata.completion_delivered
730                            })
731                            .unwrap_or(false);
732                        if !delivered_terminal {
733                            return None;
734                        }
735
736                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
737                        let expired = match (terminal_at, cutoff) {
738                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
739                            (Some(_), None) => true,
740                            (None, _) => false,
741                        };
742                        expired.then(|| task_id.clone())
743                    })
744                    .collect::<Vec<_>>();
745
746                removable
747                    .into_iter()
748                    .filter_map(|task_id| {
749                        tasks
750                            .remove(&task_id)
751                            .map(|task| (task_id, task.paths.clone()))
752                    })
753                    .collect()
754            } else {
755                Vec::new()
756            };
757
758        for (task_id, paths) in removable_paths {
759            match delete_task_bundle(&paths) {
760                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
761                Err(error) => log::warn!(
762                    "failed to delete persisted background task bundle {task_id}: {error}"
763                ),
764            }
765        }
766    }
767
768    pub fn drain_completions(&self) -> Vec<BgCompletion> {
769        self.drain_completions_for_session(None)
770    }
771
772    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
773        let mut completions = match self.inner.completions.lock() {
774            Ok(completions) => completions,
775            Err(_) => return Vec::new(),
776        };
777
778        let drained = if let Some(session_id) = session_id {
779            let mut matched = Vec::new();
780            let mut retained = VecDeque::new();
781            while let Some(completion) = completions.pop_front() {
782                if completion.session_id == session_id {
783                    matched.push(completion);
784                } else {
785                    retained.push_back(completion);
786                }
787            }
788            *completions = retained;
789            matched
790        } else {
791            completions.drain(..).collect()
792        };
793        drop(completions);
794
795        for completion in &drained {
796            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
797                let _ = task.set_completion_delivered(true);
798            }
799        }
800
801        drained
802    }
803
804    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
805        self.inner
806            .completions
807            .lock()
808            .map(|completions| {
809                completions
810                    .iter()
811                    .filter(|completion| completion.session_id == session_id)
812                    .cloned()
813                    .collect()
814            })
815            .unwrap_or_default()
816    }
817
818    pub fn detach(&self) {
819        self.inner.shutdown.store(true, Ordering::SeqCst);
820        if let Ok(mut tasks) = self.inner.tasks.lock() {
821            for task in tasks.values() {
822                if let Ok(mut state) = task.state.lock() {
823                    state.child = None;
824                    state.detached = true;
825                }
826            }
827            tasks.clear();
828        }
829    }
830
831    pub fn shutdown(&self) {
832        let tasks = self
833            .inner
834            .tasks
835            .lock()
836            .map(|tasks| {
837                tasks
838                    .values()
839                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
840                    .collect::<Vec<_>>()
841            })
842            .unwrap_or_default();
843        for (task_id, session_id) in tasks {
844            let _ = self.kill(&task_id, &session_id);
845        }
846    }
847
848    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
849        let marker = match read_exit_marker(&task.paths.exit) {
850            Ok(Some(marker)) => marker,
851            Ok(None) => return Ok(()),
852            Err(error) => return Err(format!("failed to read exit marker: {error}")),
853        };
854        self.finalize_from_marker(task, marker, None)
855    }
856
857    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
858        let Ok(mut state) = task.state.lock() else {
859            return;
860        };
861        if let Some(child) = state.child.as_mut() {
862            if matches!(child.try_wait(), Ok(Some(_))) {
863                // Child has exited. If the wrapper successfully wrote an
864                // exit marker, the next `poll_task()` cycle will pick it up
865                // and finalize via `finalize_from_marker`. But if the
866                // wrapper crashed before writing the marker (e.g. SIGKILL,
867                // power loss, wrapper bug), the task would forever appear
868                // Running until `timeout_ms` expired — and if no timeout
869                // was set, until the 24h `running_metadata_is_stale` cutoff
870                // hit at the next aft restart. Same condition as the replay
871                // path's "PID dead but no marker" branch (see line 338).
872                //
873                // To avoid that hidden hang, mark the task Failed
874                // immediately with the same reason string used by replay,
875                // but only if the marker is genuinely absent. If a marker
876                // appeared on disk between try_wait() returning and now
877                // (race window), prefer the marker — let the next poll
878                // cycle finalize from it.
879                state.child = None;
880                state.detached = true;
881                if state.metadata.status.is_terminal() {
882                    return;
883                }
884                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
885                    return;
886                }
887                let updated = update_task(&task.paths.json, |metadata| {
888                    metadata.mark_terminal(
889                        BgTaskStatus::Failed,
890                        None,
891                        Some("process exited without exit marker".to_string()),
892                    );
893                });
894                if let Ok(metadata) = updated {
895                    state.metadata = metadata;
896                    task.mark_terminal_now();
897                    state.buffer.enforce_terminal_cap();
898                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
899                }
900            }
901        }
902    }
903
904    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
905        self.inner
906            .tasks
907            .lock()
908            .map(|tasks| {
909                tasks
910                    .values()
911                    .filter(|task| task.is_running())
912                    .cloned()
913                    .collect()
914            })
915            .unwrap_or_default()
916    }
917
918    fn insert_rehydrated_task(
919        &self,
920        metadata: PersistedTask,
921        paths: TaskPaths,
922        detached: bool,
923    ) -> Result<(), String> {
924        let task_id = metadata.task_id.clone();
925        let session_id = metadata.session_id.clone();
926        let started = started_instant_from_unix_millis(metadata.started_at);
927        let task = Arc::new(BgTask {
928            task_id: task_id.clone(),
929            session_id,
930            paths: paths.clone(),
931            started,
932            last_reminder_at: Mutex::new(None),
933            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
934            state: Mutex::new(BgTaskState {
935                metadata,
936                child: None,
937                detached,
938                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
939            }),
940        });
941        self.inner
942            .tasks
943            .lock()
944            .map_err(|_| "background task registry lock poisoned".to_string())?
945            .insert(task_id, task);
946        Ok(())
947    }
948
949    fn kill_with_status(
950        &self,
951        task_id: &str,
952        session_id: &str,
953        terminal_status: BgTaskStatus,
954    ) -> Result<BgTaskSnapshot, String> {
955        let task = self
956            .task_for_session(task_id, session_id)
957            .ok_or_else(|| format!("background task not found: {task_id}"))?;
958
959        {
960            let mut state = task
961                .state
962                .lock()
963                .map_err(|_| "background task lock poisoned".to_string())?;
964            if state.metadata.status.is_terminal() {
965                return Ok(task.snapshot_locked(&state, 5 * 1024));
966            }
967
968            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
969                state.metadata =
970                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
971                task.mark_terminal_now();
972                state.child = None;
973                state.detached = true;
974                state.buffer.enforce_terminal_cap();
975                write_task(&task.paths.json, &state.metadata)
976                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
977                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
978                return Ok(task.snapshot_locked(&state, 5 * 1024));
979            }
980
981            state.metadata.status = BgTaskStatus::Killing;
982            write_task(&task.paths.json, &state.metadata)
983                .map_err(|e| format!("failed to persist killing state: {e}"))?;
984
985            #[cfg(unix)]
986            if let Some(pgid) = state.metadata.pgid {
987                terminate_pgid(pgid, state.child.as_mut());
988            }
989            #[cfg(windows)]
990            if let Some(child) = state.child.as_mut() {
991                super::process::terminate_process(child);
992            } else if let Some(pid) = state.metadata.child_pid {
993                terminate_pid(pid);
994            }
995            if let Some(child) = state.child.as_mut() {
996                let _ = child.wait();
997            }
998            state.child = None;
999            state.detached = true;
1000
1001            if !task.paths.exit.exists() {
1002                write_kill_marker_if_absent(&task.paths.exit)
1003                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
1004            }
1005
1006            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1007                Some(124)
1008            } else {
1009                None
1010            };
1011            state
1012                .metadata
1013                .mark_terminal(terminal_status, exit_code, None);
1014            task.mark_terminal_now();
1015            write_task(&task.paths.json, &state.metadata)
1016                .map_err(|e| format!("failed to persist killed state: {e}"))?;
1017            state.buffer.enforce_terminal_cap();
1018            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1019        }
1020
1021        Ok(task.snapshot(5 * 1024))
1022    }
1023
1024    fn finalize_from_marker(
1025        &self,
1026        task: &Arc<BgTask>,
1027        marker: ExitMarker,
1028        reason: Option<String>,
1029    ) -> Result<(), String> {
1030        let mut state = task
1031            .state
1032            .lock()
1033            .map_err(|_| "background task lock poisoned".to_string())?;
1034        if state.metadata.status.is_terminal() {
1035            return Ok(());
1036        }
1037
1038        let updated = update_task(&task.paths.json, |metadata| {
1039            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1040            *metadata = new_metadata;
1041        })
1042        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1043        state.metadata = updated;
1044        task.mark_terminal_now();
1045        state.child = None;
1046        state.detached = true;
1047        state.buffer.enforce_terminal_cap();
1048        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1049        Ok(())
1050    }
1051
1052    fn enqueue_completion_if_needed(
1053        &self,
1054        metadata: &PersistedTask,
1055        paths: Option<&TaskPaths>,
1056        emit_frame: bool,
1057    ) {
1058        if metadata.status.is_terminal() && !metadata.completion_delivered {
1059            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1060        }
1061    }
1062
1063    fn enqueue_completion_locked(
1064        &self,
1065        metadata: &PersistedTask,
1066        buffer: Option<&BgBuffer>,
1067        emit_frame: bool,
1068    ) {
1069        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1070    }
1071
1072    fn enqueue_completion_from_parts(
1073        &self,
1074        metadata: &PersistedTask,
1075        buffer: Option<&BgBuffer>,
1076        paths: Option<&TaskPaths>,
1077        emit_frame: bool,
1078    ) {
1079        if !metadata.status.is_terminal() || metadata.completion_delivered {
1080            return;
1081        }
1082        // Read tail once at completion time and cache on the BgCompletion so
1083        // both the push-frame consumer (running session) and any later
1084        // `bash_drain_completions` poll (different session, restart) see the
1085        // same preview without racing against rotation.
1086        let (raw_preview, output_truncated) = match buffer {
1087            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1088            None => paths
1089                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1090                .unwrap_or_else(|| (String::new(), false)),
1091        };
1092        // Compress at completion time so push-frame consumers and later
1093        // `bash_drain_completions` poll-callers see the same compressed text.
1094        // Per-task `compressed: false` opts out; otherwise the compressor is
1095        // a no-op when `experimental.bash.compress=false`.
1096        let output_preview = if metadata.compressed {
1097            self.compress_output(&metadata.command, raw_preview)
1098        } else {
1099            raw_preview
1100        };
1101        let completion = BgCompletion {
1102            task_id: metadata.task_id.clone(),
1103            session_id: metadata.session_id.clone(),
1104            status: metadata.status.clone(),
1105            exit_code: metadata.exit_code,
1106            command: metadata.command.clone(),
1107            output_preview,
1108            output_truncated,
1109        };
1110        if let Ok(mut completions) = self.inner.completions.lock() {
1111            if completions
1112                .iter()
1113                .any(|completion| completion.task_id == metadata.task_id)
1114            {
1115                return;
1116            }
1117            completions.push_back(completion.clone());
1118        } else {
1119            return;
1120        }
1121
1122        if emit_frame {
1123            self.emit_bash_completed(completion);
1124        }
1125    }
1126
1127    fn emit_bash_completed(&self, completion: BgCompletion) {
1128        let Ok(progress_sender) = self
1129            .inner
1130            .progress_sender
1131            .lock()
1132            .map(|sender| sender.clone())
1133        else {
1134            return;
1135        };
1136        let Some(sender) = progress_sender.as_ref() else {
1137            return;
1138        };
1139        // Clone the callback out of the registry mutex before writing to stdout;
1140        // otherwise a blocked push-frame write could pin the mutex and starve
1141        // unrelated progress-sender updates.
1142        // Bg task transitions are discovered by the watchdog thread, so the
1143        // sender is shared behind a Mutex. It still uses the same stdout writer
1144        // closure as foreground progress frames, preserving the existing lock/
1145        // flush behavior in main.rs.
1146        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1147            completion.task_id,
1148            completion.session_id,
1149            completion.status,
1150            completion.exit_code,
1151            completion.command,
1152            completion.output_preview,
1153            completion.output_truncated,
1154        )));
1155    }
1156
1157    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1158        if !self
1159            .inner
1160            .long_running_reminder_enabled
1161            .load(Ordering::SeqCst)
1162        {
1163            return;
1164        }
1165        let interval_ms = self
1166            .inner
1167            .long_running_reminder_interval_ms
1168            .load(Ordering::SeqCst);
1169        if interval_ms == 0 {
1170            return;
1171        }
1172        let interval = Duration::from_millis(interval_ms);
1173        let now = Instant::now();
1174        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1175            return;
1176        };
1177        let since = last_reminder_at.unwrap_or(task.started);
1178        if now.duration_since(since) < interval {
1179            return;
1180        }
1181        let command = task
1182            .state
1183            .lock()
1184            .map(|state| state.metadata.command.clone())
1185            .unwrap_or_default();
1186        *last_reminder_at = Some(now);
1187        self.emit_bash_long_running(BashLongRunningFrame::new(
1188            task.task_id.clone(),
1189            task.session_id.clone(),
1190            command,
1191            task.started.elapsed().as_millis() as u64,
1192        ));
1193    }
1194
1195    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1196        let Ok(progress_sender) = self
1197            .inner
1198            .progress_sender
1199            .lock()
1200            .map(|sender| sender.clone())
1201        else {
1202            return;
1203        };
1204        if let Some(sender) = progress_sender.as_ref() {
1205            sender(PushFrame::BashLongRunning(frame));
1206        }
1207    }
1208
1209    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1210        self.inner
1211            .tasks
1212            .lock()
1213            .ok()
1214            .and_then(|tasks| tasks.get(task_id).cloned())
1215    }
1216
1217    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1218        self.task(task_id)
1219            .filter(|task| task.session_id == session_id)
1220    }
1221
1222    fn running_count(&self) -> usize {
1223        self.inner
1224            .tasks
1225            .lock()
1226            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1227            .unwrap_or(0)
1228    }
1229
1230    fn start_watchdog(&self) {
1231        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1232            super::watchdog::start(self.clone());
1233        }
1234    }
1235
1236    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1237        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1238    }
1239
1240    #[cfg(test)]
1241    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1242        self.task_for_session(task_id, session_id)
1243            .map(|task| task.paths.json.clone())
1244    }
1245
1246    #[cfg(test)]
1247    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1248        self.task_for_session(task_id, session_id)
1249            .map(|task| task.paths.exit.clone())
1250    }
1251
1252    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
1253    fn generate_unique_task_id(&self) -> Result<String, String> {
1254        for _ in 0..32 {
1255            let candidate = random_slug();
1256            let tasks = self
1257                .inner
1258                .tasks
1259                .lock()
1260                .map_err(|_| "background task registry lock poisoned".to_string())?;
1261            if tasks.contains_key(&candidate) {
1262                continue;
1263            }
1264            let completions = self
1265                .inner
1266                .completions
1267                .lock()
1268                .map_err(|_| "background completions lock poisoned".to_string())?;
1269            if completions
1270                .iter()
1271                .any(|completion| completion.task_id == candidate)
1272            {
1273                continue;
1274            }
1275            return Ok(candidate);
1276        }
1277        Err("failed to allocate unique background task id after 32 attempts".to_string())
1278    }
1279}
1280
1281impl Default for BgTaskRegistry {
1282    fn default() -> Self {
1283        Self::new(Arc::new(Mutex::new(None)))
1284    }
1285}
1286
1287fn modified_within(path: &Path, grace: Duration) -> bool {
1288    fs::metadata(path)
1289        .and_then(|metadata| metadata.modified())
1290        .ok()
1291        .and_then(|modified| SystemTime::now().duration_since(modified).ok())
1292        .map(|age| age < grace)
1293        .unwrap_or(false)
1294}
1295
1296fn canonicalized_path(path: &Path) -> PathBuf {
1297    fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
1298}
1299
1300fn started_instant_from_unix_millis(started_at: u64) -> Instant {
1301    let now_ms = SystemTime::now()
1302        .duration_since(UNIX_EPOCH)
1303        .ok()
1304        .map(|duration| duration.as_millis() as u64)
1305        .unwrap_or(started_at);
1306    let elapsed_ms = now_ms.saturating_sub(started_at);
1307    Instant::now()
1308        .checked_sub(Duration::from_millis(elapsed_ms))
1309        .unwrap_or_else(Instant::now)
1310}
1311
1312fn gc_quarantine(storage_dir: &Path) {
1313    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1314    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1315        return;
1316    };
1317    for session_entry in session_dirs.flatten() {
1318        let session_quarantine_dir = session_entry.path();
1319        if !session_quarantine_dir.is_dir() {
1320            continue;
1321        }
1322        let entries = match fs::read_dir(&session_quarantine_dir) {
1323            Ok(entries) => entries,
1324            Err(error) => {
1325                log::warn!(
1326                    "failed to read background task quarantine dir {}: {error}",
1327                    session_quarantine_dir.display()
1328                );
1329                continue;
1330            }
1331        };
1332        for entry in entries.flatten() {
1333            let path = entry.path();
1334            if modified_within(&path, QUARANTINE_GC_GRACE) {
1335                continue;
1336            }
1337            let result = if path.is_dir() {
1338                fs::remove_dir_all(&path)
1339            } else {
1340                fs::remove_file(&path)
1341            };
1342            match result {
1343                Ok(()) => log::debug!(
1344                    "deleted old background task quarantine entry {}",
1345                    path.display()
1346                ),
1347                Err(error) => log::warn!(
1348                    "failed to delete old background task quarantine entry {}: {error}",
1349                    path.display()
1350                ),
1351            }
1352        }
1353        let _ = fs::remove_dir(&session_quarantine_dir);
1354    }
1355    let _ = fs::remove_dir(&quarantine_root);
1356}
1357
1358fn quarantine_corrupt_task_json(
1359    storage_dir: &Path,
1360    session_dir: &Path,
1361    json_path: &Path,
1362) -> Result<(), String> {
1363    let session_hash = session_dir
1364        .file_name()
1365        .and_then(|name| name.to_str())
1366        .ok_or_else(|| {
1367            format!(
1368                "invalid background task session dir: {}",
1369                session_dir.display()
1370            )
1371        })?;
1372    let task_name = json_path
1373        .file_name()
1374        .and_then(|name| name.to_str())
1375        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1376    let unix_ts = SystemTime::now()
1377        .duration_since(UNIX_EPOCH)
1378        .map(|duration| duration.as_secs())
1379        .unwrap_or(0);
1380    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1381    fs::create_dir_all(&quarantine_dir).map_err(|e| {
1382        format!(
1383            "failed to create background task quarantine dir {}: {e}",
1384            quarantine_dir.display()
1385        )
1386    })?;
1387    let target = quarantine_dir.join(format!("{task_name}.corrupt-{unix_ts}"));
1388    fs::rename(json_path, &target).map_err(|e| {
1389        format!(
1390            "failed to quarantine corrupt background task metadata {} to {}: {e}",
1391            json_path.display(),
1392            target.display()
1393        )
1394    })?;
1395
1396    for sibling in task_sibling_paths(json_path) {
1397        if !sibling.exists() {
1398            continue;
1399        }
1400        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1401            log::warn!(
1402                "skipping background task sibling with invalid name during quarantine: {}",
1403                sibling.display()
1404            );
1405            continue;
1406        };
1407        let sibling_target = quarantine_dir.join(format!("{sibling_name}.corrupt-{unix_ts}"));
1408        if let Err(error) = fs::rename(&sibling, &sibling_target) {
1409            log::warn!(
1410                "failed to quarantine background task sibling {} to {}: {error}",
1411                sibling.display(),
1412                sibling_target.display()
1413            );
1414        }
1415    }
1416
1417    let _ = fs::remove_dir(session_dir);
1418    Ok(())
1419}
1420
1421fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
1422    let Some(parent) = json_path.parent() else {
1423        return Vec::new();
1424    };
1425    let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
1426        return Vec::new();
1427    };
1428    ["stdout", "stderr", "exit", "ps1", "bat", "sh"]
1429        .into_iter()
1430        .map(|extension| parent.join(format!("{stem}.{extension}")))
1431        .collect()
1432}
1433
1434fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1435    let stdout = fs::read(&paths.stdout).unwrap_or_default();
1436    let stderr = fs::read(&paths.stderr).unwrap_or_default();
1437    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1438    bytes.extend_from_slice(&stdout);
1439    bytes.extend_from_slice(&stderr);
1440    if bytes.len() <= max_bytes {
1441        return (String::from_utf8_lossy(&bytes).into_owned(), false);
1442    }
1443    let start = bytes.len().saturating_sub(max_bytes);
1444    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1445}
1446
1447impl BgTask {
1448    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1449        let state = self
1450            .state
1451            .lock()
1452            .unwrap_or_else(|poison| poison.into_inner());
1453        self.snapshot_locked(&state, preview_bytes)
1454    }
1455
1456    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1457        let metadata = &state.metadata;
1458        let duration_ms = metadata.duration_ms.or_else(|| {
1459            metadata
1460                .status
1461                .is_terminal()
1462                .then(|| self.started.elapsed().as_millis() as u64)
1463        });
1464        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1465        BgTaskSnapshot {
1466            info: BgTaskInfo {
1467                task_id: self.task_id.clone(),
1468                status: metadata.status.clone(),
1469                command: metadata.command.clone(),
1470                started_at: metadata.started_at,
1471                duration_ms,
1472            },
1473            exit_code: metadata.exit_code,
1474            child_pid: metadata.child_pid,
1475            workdir: metadata.workdir.display().to_string(),
1476            output_preview,
1477            output_truncated,
1478            output_path: state
1479                .buffer
1480                .output_path()
1481                .map(|path| path.display().to_string()),
1482            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1483        }
1484    }
1485
1486    pub(crate) fn is_running(&self) -> bool {
1487        self.state
1488            .lock()
1489            .map(|state| state.metadata.status == BgTaskStatus::Running)
1490            .unwrap_or(false)
1491    }
1492
1493    fn mark_terminal_now(&self) {
1494        if let Ok(mut terminal_at) = self.terminal_at.lock() {
1495            if terminal_at.is_none() {
1496                *terminal_at = Some(Instant::now());
1497            }
1498        }
1499    }
1500
1501    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1502        let mut state = self
1503            .state
1504            .lock()
1505            .map_err(|_| "background task lock poisoned".to_string())?;
1506        let updated = update_task(&self.paths.json, |metadata| {
1507            metadata.completion_delivered = delivered;
1508        })
1509        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1510        state.metadata = updated;
1511        Ok(())
1512    }
1513}
1514
1515fn terminal_metadata_from_marker(
1516    mut metadata: PersistedTask,
1517    marker: ExitMarker,
1518    reason: Option<String>,
1519) -> PersistedTask {
1520    match marker {
1521        ExitMarker::Code(code) => {
1522            let status = if code == 0 {
1523                BgTaskStatus::Completed
1524            } else {
1525                BgTaskStatus::Failed
1526            };
1527            metadata.mark_terminal(status, Some(code), reason);
1528        }
1529        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1530    }
1531    metadata
1532}
1533
1534#[cfg(unix)]
1535fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1536    let shell = resolve_posix_shell();
1537    let mut cmd = Command::new(&shell);
1538    cmd.arg("-c")
1539        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1540        .arg(&shell)
1541        .arg(command)
1542        .arg(exit_path);
1543    unsafe {
1544        cmd.pre_exec(|| {
1545            if libc::setsid() == -1 {
1546                return Err(std::io::Error::last_os_error());
1547            }
1548            Ok(())
1549        });
1550    }
1551    cmd
1552}
1553
1554#[cfg(unix)]
1555fn resolve_posix_shell() -> PathBuf {
1556    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1557    POSIX_SHELL
1558        .get_or_init(|| {
1559            std::env::var_os("BASH")
1560                .filter(|value| !value.is_empty())
1561                .map(PathBuf::from)
1562                .filter(|path| path.exists())
1563                .or_else(|| which::which("bash").ok())
1564                .or_else(|| which::which("zsh").ok())
1565                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1566        })
1567        .clone()
1568}
1569
1570#[cfg(windows)]
1571fn detached_shell_command_for(
1572    shell: crate::windows_shell::WindowsShell,
1573    command: &str,
1574    exit_path: &Path,
1575    paths: &TaskPaths,
1576    creation_flags: u32,
1577) -> Result<Command, String> {
1578    use crate::windows_shell::WindowsShell;
1579    // Write the wrapper to a temp file alongside the other task files,
1580    // then invoke the shell with the file path as a single clean
1581    // argument. This sidesteps the entire Windows command-line quoting
1582    // mess (Rust std-lib quoting + cmd /C parser + PowerShell -Command
1583    // parser all interacting with embedded quotes in the wrapper).
1584    //
1585    // Path arguments don't need quoting in the same problematic way
1586    // because: (1) we use no-space task IDs (bash-XXXXXXXX) so the path
1587    // contains no characters that need shell escaping; (2) the wrapper
1588    // body's internal quotes never reach the shell command line — the
1589    // shell reads them from disk by file syntax rules, not command-line
1590    // parser rules.
1591    let wrapper_body = shell.wrapper_script(command, exit_path);
1592    let wrapper_ext = match shell {
1593        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
1594        WindowsShell::Cmd => "bat",
1595        // POSIX shells (git-bash etc.) execute the wrapper through `-c`,
1596        // so the file extension is purely cosmetic; `.sh` matches what an
1597        // operator would expect when grepping the spill directory.
1598        WindowsShell::Posix(_) => "sh",
1599    };
1600    let wrapper_path = paths.dir.join(format!(
1601        "{}.{}",
1602        paths
1603            .json
1604            .file_stem()
1605            .and_then(|s| s.to_str())
1606            .unwrap_or("wrapper"),
1607        wrapper_ext
1608    ));
1609    fs::write(&wrapper_path, wrapper_body)
1610        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
1611
1612    let mut cmd = Command::new(shell.binary().as_ref());
1613    match shell {
1614        WindowsShell::Pwsh | WindowsShell::Powershell => {
1615            // -File runs the script with no quoting issues. `-NoLogo`,
1616            // `-NoProfile`, etc. apply to the host before the file runs.
1617            cmd.args([
1618                "-NoLogo",
1619                "-NoProfile",
1620                "-NonInteractive",
1621                "-ExecutionPolicy",
1622                "Bypass",
1623                "-File",
1624            ]);
1625            cmd.arg(&wrapper_path);
1626        }
1627        WindowsShell::Cmd => {
1628            // `cmd /D /C "<bat-file-path>"` — invoking a .bat
1629            // file via /C is well-defined; the file's contents are
1630            // read line-by-line by cmd's batch processor, NOT
1631            // re-interpreted by the /C parser. This avoids the
1632            // "filename syntax incorrect" errors that came from
1633            // having complex compound commands on the cmd line.
1634            cmd.args(["/D", "/C"]);
1635            cmd.arg(&wrapper_path);
1636        }
1637        WindowsShell::Posix(_) => {
1638            // git-bash and other POSIX shells run the wrapper script with
1639            // `<binary> <wrapper-path>` (the wrapper is just a shell
1640            // script). No special flags needed — the `trap` and atomic
1641            // exit-marker rename in `wrapper_script` are POSIX-standard.
1642            cmd.arg(&wrapper_path);
1643        }
1644    }
1645
1646    // Win32 process creation flags. Caller selects whether to include
1647    // CREATE_BREAKAWAY_FROM_JOB — see `detached_shell_command_for` callers
1648    // for the breakaway-fallback strategy.
1649    cmd.creation_flags(creation_flags);
1650    Ok(cmd)
1651}
1652
1653/// Spawn a detached background bash child process.
1654///
1655/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
1656/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
1657/// cmd.exe) and retries with the next candidate when the previous one
1658/// fails to spawn with `NotFound` — the same runtime safety net the
1659/// foreground bash path has, so issue #27 callers landing on cmd.exe
1660/// fallback can also use background bash. The wrapper script is
1661/// regenerated per attempt because PowerShell wrappers embed the shell
1662/// binary by name; the stdout/stderr capture handles are also reopened
1663/// per attempt because `Command::spawn()` consumes them.
1664///
1665/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
1666/// return immediately without retry — they indicate a problem with the
1667/// resolved shell that retrying with a different shell won't fix.
1668fn spawn_detached_child(
1669    command: &str,
1670    paths: &TaskPaths,
1671    workdir: &Path,
1672    env: &HashMap<String, String>,
1673) -> Result<std::process::Child, String> {
1674    #[cfg(not(windows))]
1675    {
1676        let stdout = create_capture_file(&paths.stdout)
1677            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1678        let stderr = create_capture_file(&paths.stderr)
1679            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1680        detached_shell_command(command, &paths.exit)
1681            .current_dir(workdir)
1682            .envs(env)
1683            .stdin(Stdio::null())
1684            .stdout(Stdio::from(stdout))
1685            .stderr(Stdio::from(stderr))
1686            .spawn()
1687            .map_err(|e| format!("failed to spawn background bash command: {e}"))
1688    }
1689    #[cfg(windows)]
1690    {
1691        use crate::windows_shell::shell_candidates;
1692        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
1693        // legacy foreground bash spawn path. v0.20 routes ALL bash through
1694        // this background spawn helper, including foreground tool calls
1695        // where the model writes PowerShell-syntax (`$var = ...`,
1696        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
1697        // The earlier v0.18-era cmd-first override worked around a
1698        // PowerShell detached-output bug; that bug is fixed at the
1699        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
1700        // see flag block below), so we no longer need to misroute PS
1701        // commands through cmd.
1702        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
1703        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
1704        // first (so the bg child outlives the AFT process when AFT is killed),
1705        // then fall back without it for environments where the parent is in a
1706        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
1707        // runners (GitHub Actions windows-2022) and some MDM-managed corp
1708        // environments hit this — `CreateProcess` returns Access Denied (5).
1709        // Without breakaway, the child still runs detached but will be torn
1710        // down with the parent if the parent process group is signaled.
1711        //
1712        // We use CREATE_NO_WINDOW (no visible console window, but the
1713        // child still has a hidden console) rather than DETACHED_PROCESS
1714        // (no console at all). PowerShell-based wrappers that perform
1715        // file I/O via [System.IO.File] need a console handle to flush
1716        // stdout/stderr correctly even when redirected — under
1717        // DETACHED_PROCESS, pwsh sometimes silently exits before
1718        // executing later script statements (the Move-Item that writes
1719        // the exit marker never runs), leaving the bg task forever
1720        // marked Failed: process exited without exit marker. cmd.exe
1721        // wrappers tolerate DETACHED_PROCESS, but switching to
1722        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
1723        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
1724        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
1725        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
1726        let with_breakaway =
1727            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
1728        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
1729        let mut last_error: Option<String> = None;
1730        for (idx, shell) in candidates.iter().enumerate() {
1731            // Per-shell, try with breakaway first. If the process is in a
1732            // restrictive job, the breakaway flag triggers Access Denied
1733            // (os error 5). Retry once without breakaway.
1734            for &flags in &[with_breakaway, without_breakaway] {
1735                // Re-open capture handles per attempt; spawn() consumes them.
1736                let stdout = create_capture_file(&paths.stdout)
1737                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1738                let stderr = create_capture_file(&paths.stderr)
1739                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1740                let mut cmd =
1741                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
1742                cmd.current_dir(workdir)
1743                    .envs(env)
1744                    .stdin(Stdio::null())
1745                    .stdout(Stdio::from(stdout))
1746                    .stderr(Stdio::from(stderr));
1747                match cmd.spawn() {
1748                    Ok(child) => {
1749                        if idx > 0 {
1750                            log::warn!(
1751                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1752                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
1753                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1754                                shell.binary(),
1755                                idx
1756                            );
1757                        }
1758                        if flags == without_breakaway {
1759                            log::warn!(
1760                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
1761                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
1762                                 Spawned without breakaway; the bg task will be torn down if the \
1763                                 AFT process group is killed."
1764                            );
1765                        }
1766                        return Ok(child);
1767                    }
1768                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1769                        log::warn!(
1770                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
1771                            shell.binary()
1772                        );
1773                        last_error = Some(format!("{}: {e}", shell.binary()));
1774                        // Skip the without-breakaway retry for NotFound — the
1775                        // binary itself is missing, breakaway flag is irrelevant.
1776                        break;
1777                    }
1778                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
1779                        // Access Denied during breakaway — retry without it.
1780                        log::warn!(
1781                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
1782                             Access Denied — retrying {} without breakaway",
1783                            shell.binary()
1784                        );
1785                        last_error = Some(format!("{}: {e}", shell.binary()));
1786                        continue;
1787                    }
1788                    Err(e) => {
1789                        return Err(format!(
1790                            "failed to spawn background bash command via {}: {e}",
1791                            shell.binary()
1792                        ));
1793                    }
1794                }
1795            }
1796        }
1797        Err(format!(
1798            "failed to spawn background bash command: no Windows shell could be spawned. \
1799             Last error: {}. PATH-probed candidates: {:?}",
1800            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1801            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1802        ))
1803    }
1804}
1805
1806fn random_slug() -> String {
1807    let mut bytes = [0u8; 4];
1808    // getrandom is a transitive dependency; use it directly for OS entropy.
1809    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1810        // Extremely unlikely fallback: time + pid mix.
1811        let t = SystemTime::now()
1812            .duration_since(UNIX_EPOCH)
1813            .map(|d| d.subsec_nanos())
1814            .unwrap_or(0);
1815        let p = std::process::id();
1816        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1817    });
1818    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
1819    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1820    format!("bash-{hex}")
1821}
1822
1823#[cfg(test)]
1824mod tests {
1825    use std::collections::HashMap;
1826    #[cfg(windows)]
1827    use std::fs;
1828    use std::sync::{Arc, Mutex};
1829    use std::time::Duration;
1830    #[cfg(windows)]
1831    use std::time::Instant;
1832
1833    use super::*;
1834
1835    #[cfg(unix)]
1836    const QUICK_SUCCESS_COMMAND: &str = "true";
1837    #[cfg(windows)]
1838    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1839
1840    #[cfg(unix)]
1841    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1842    #[cfg(windows)]
1843    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1844
1845    #[test]
1846    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1847        let registry = BgTaskRegistry::default();
1848        let dir = tempfile::tempdir().unwrap();
1849        let task_id = registry
1850            .spawn(
1851                QUICK_SUCCESS_COMMAND,
1852                "session".to_string(),
1853                dir.path().to_path_buf(),
1854                HashMap::new(),
1855                Some(Duration::from_secs(30)),
1856                dir.path().to_path_buf(),
1857                10,
1858                true,
1859                false,
1860                Some(dir.path().to_path_buf()),
1861            )
1862            .unwrap();
1863        registry
1864            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1865            .unwrap();
1866        let completions = registry.drain_completions_for_session(Some("session"));
1867        assert_eq!(completions.len(), 1);
1868
1869        registry.cleanup_finished(Duration::ZERO);
1870
1871        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1872    }
1873
1874    #[test]
1875    fn cleanup_finished_retains_undelivered_terminals() {
1876        let registry = BgTaskRegistry::default();
1877        let dir = tempfile::tempdir().unwrap();
1878        let task_id = registry
1879            .spawn(
1880                QUICK_SUCCESS_COMMAND,
1881                "session".to_string(),
1882                dir.path().to_path_buf(),
1883                HashMap::new(),
1884                Some(Duration::from_secs(30)),
1885                dir.path().to_path_buf(),
1886                10,
1887                true,
1888                false,
1889                Some(dir.path().to_path_buf()),
1890            )
1891            .unwrap();
1892        registry
1893            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1894            .unwrap();
1895
1896        registry.cleanup_finished(Duration::ZERO);
1897
1898        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1899    }
1900
1901    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1902    /// watchdog path (reap_child) marks a task Failed when the child
1903    /// has exited but no exit marker was written. Before this fix the
1904    /// task would remain `Running` until timeout, even though the
1905    /// process was definitely dead.
1906    ///
1907    /// Cross-platform: uses a quick-exiting command that does NOT go
1908    /// through the wrapper script (we manually clear the exit marker
1909    /// after spawn to simulate the wrapper crashing before write).
1910    #[test]
1911    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1912        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1913        let dir = tempfile::tempdir().unwrap();
1914        let task_id = registry
1915            .spawn(
1916                QUICK_SUCCESS_COMMAND,
1917                "session".to_string(),
1918                dir.path().to_path_buf(),
1919                HashMap::new(),
1920                Some(Duration::from_secs(30)),
1921                dir.path().to_path_buf(),
1922                10,
1923                true,
1924                false,
1925                Some(dir.path().to_path_buf()),
1926            )
1927            .unwrap();
1928
1929        let task = registry.task_for_session(&task_id, "session").unwrap();
1930
1931        // Wait for the child to actually exit and the wrapper to either
1932        // write the marker or fail. Then nuke the marker to simulate
1933        // wrapper crash before write. Poll up to 5s; this is plenty for a
1934        // `true`/`cmd /c exit 0` invocation.
1935        let started = Instant::now();
1936        loop {
1937            let exited = {
1938                let mut state = task.state.lock().unwrap();
1939                if let Some(child) = state.child.as_mut() {
1940                    matches!(child.try_wait(), Ok(Some(_)))
1941                } else {
1942                    true
1943                }
1944            };
1945            if exited {
1946                break;
1947            }
1948            assert!(
1949                started.elapsed() < Duration::from_secs(5),
1950                "child should exit quickly"
1951            );
1952            std::thread::sleep(Duration::from_millis(20));
1953        }
1954
1955        // Wrapper likely wrote the marker by now; remove it to simulate
1956        // a wrapper crash that exited before persisting the exit code.
1957        let _ = std::fs::remove_file(&task.paths.exit);
1958
1959        // Sanity: task is still Running per metadata (replay/poll hasn't
1960        // observed the missing marker yet).
1961        assert!(
1962            task.is_running(),
1963            "precondition: metadata.status == Running"
1964        );
1965        assert!(
1966            !task.paths.exit.exists(),
1967            "precondition: exit marker absent"
1968        );
1969
1970        // Invoke the watchdog's reap_child directly. The fix should mark
1971        // the task Failed with the documented reason string, instead of
1972        // just dropping the child handle and leaving status=Running.
1973        registry.reap_child(&task);
1974
1975        let state = task.state.lock().unwrap();
1976        assert!(
1977            state.metadata.status.is_terminal(),
1978            "reap_child must transition to terminal when PID dead and no marker. \
1979             Got status={:?}",
1980            state.metadata.status
1981        );
1982        assert_eq!(
1983            state.metadata.status,
1984            BgTaskStatus::Failed,
1985            "must specifically be Failed (not Killed): status={:?}",
1986            state.metadata.status
1987        );
1988        assert_eq!(
1989            state.metadata.status_reason.as_deref(),
1990            Some("process exited without exit marker"),
1991            "reason must match replay path's wording: {:?}",
1992            state.metadata.status_reason
1993        );
1994        assert!(
1995            state.child.is_none(),
1996            "child handle must be released after reap"
1997        );
1998        assert!(state.detached, "task must be marked detached after reap");
1999    }
2000
2001    /// Companion to the above: when the exit marker DOES exist on disk
2002    /// at reap_child time (race window — wrapper finished writing
2003    /// between try_wait and the marker check), reap_child must NOT mark
2004    /// the task Failed. Instead it leaves status=Running and lets the
2005    /// next poll_task() cycle finalize via the marker.
2006    #[test]
2007    fn reap_child_preserves_running_when_exit_marker_exists() {
2008        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2009        let dir = tempfile::tempdir().unwrap();
2010        let task_id = registry
2011            .spawn(
2012                QUICK_SUCCESS_COMMAND,
2013                "session".to_string(),
2014                dir.path().to_path_buf(),
2015                HashMap::new(),
2016                Some(Duration::from_secs(30)),
2017                dir.path().to_path_buf(),
2018                10,
2019                true,
2020                false,
2021                Some(dir.path().to_path_buf()),
2022            )
2023            .unwrap();
2024
2025        let task = registry.task_for_session(&task_id, "session").unwrap();
2026
2027        // Wait for child to exit AND for the marker to land. Both happen
2028        // shortly after the wrapper finishes — but we want both observed.
2029        let started = Instant::now();
2030        loop {
2031            let exited = {
2032                let mut state = task.state.lock().unwrap();
2033                if let Some(child) = state.child.as_mut() {
2034                    matches!(child.try_wait(), Ok(Some(_)))
2035                } else {
2036                    true
2037                }
2038            };
2039            if exited && task.paths.exit.exists() {
2040                break;
2041            }
2042            assert!(
2043                started.elapsed() < Duration::from_secs(5),
2044                "child should exit and write marker quickly"
2045            );
2046            std::thread::sleep(Duration::from_millis(20));
2047        }
2048
2049        // reap_child sees: child exited, marker exists. It should:
2050        //  - drop state.child / set state.detached = true
2051        //  - NOT change status (poll_task will finalize via marker next tick)
2052        registry.reap_child(&task);
2053
2054        let state = task.state.lock().unwrap();
2055        assert!(
2056            state.child.is_none(),
2057            "child handle still released even when marker exists"
2058        );
2059        assert!(
2060            state.detached,
2061            "task still marked detached even when marker exists"
2062        );
2063        // Status remains Running because reap_child defers to poll_task
2064        // when a marker exists. It would be wrong for reap to record the
2065        // marker outcome (poll_task does that with proper exit-code
2066        // parsing).
2067        assert_eq!(
2068            state.metadata.status,
2069            BgTaskStatus::Running,
2070            "reap_child must defer to poll_task when marker exists"
2071        );
2072    }
2073
2074    #[test]
2075    fn cleanup_finished_keeps_running_tasks() {
2076        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2077        let dir = tempfile::tempdir().unwrap();
2078        let task_id = registry
2079            .spawn(
2080                LONG_RUNNING_COMMAND,
2081                "session".to_string(),
2082                dir.path().to_path_buf(),
2083                HashMap::new(),
2084                Some(Duration::from_secs(30)),
2085                dir.path().to_path_buf(),
2086                10,
2087                true,
2088                false,
2089                Some(dir.path().to_path_buf()),
2090            )
2091            .unwrap();
2092
2093        registry.cleanup_finished(Duration::ZERO);
2094
2095        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2096        let _ = registry.kill(&task_id, "session");
2097    }
2098
2099    #[cfg(windows)]
2100    fn wait_for_file(path: &Path) -> String {
2101        let started = Instant::now();
2102        loop {
2103            if path.exists() {
2104                return fs::read_to_string(path).expect("read file");
2105            }
2106            assert!(
2107                started.elapsed() < Duration::from_secs(30),
2108                "timed out waiting for {}",
2109                path.display()
2110            );
2111            std::thread::sleep(Duration::from_millis(100));
2112        }
2113    }
2114
2115    #[cfg(windows)]
2116    fn spawn_windows_registry_command(
2117        command: &str,
2118    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2119        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2120        let dir = tempfile::tempdir().unwrap();
2121        let task_id = registry
2122            .spawn(
2123                command,
2124                "session".to_string(),
2125                dir.path().to_path_buf(),
2126                HashMap::new(),
2127                Some(Duration::from_secs(30)),
2128                dir.path().to_path_buf(),
2129                10,
2130                false,
2131                false,
2132                Some(dir.path().to_path_buf()),
2133            )
2134            .unwrap();
2135        (registry, dir, task_id)
2136    }
2137
2138    #[cfg(windows)]
2139    #[test]
2140    fn windows_spawn_writes_exit_marker_for_zero_exit() {
2141        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2142        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2143
2144        let content = wait_for_file(&exit_path);
2145
2146        assert_eq!(content.trim(), "0");
2147    }
2148
2149    #[cfg(windows)]
2150    #[test]
2151    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2152        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2153        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2154
2155        let content = wait_for_file(&exit_path);
2156
2157        assert_eq!(content.trim(), "42");
2158    }
2159
2160    #[cfg(windows)]
2161    #[test]
2162    fn windows_spawn_captures_stdout_to_disk() {
2163        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
2164        let task = registry.task_for_session(&task_id, "session").unwrap();
2165        let stdout_path = task.paths.stdout.clone();
2166        let exit_path = task.paths.exit.clone();
2167
2168        let _ = wait_for_file(&exit_path);
2169        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
2170
2171        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
2172    }
2173
2174    #[cfg(windows)]
2175    #[test]
2176    fn windows_spawn_uses_pwsh_when_available() {
2177        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
2178        // (We intentionally pass None for shell_env to keep this test
2179        // independent of the runner's actual env.)
2180        let candidates = crate::windows_shell::shell_candidates_with(
2181            |binary| match binary {
2182                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
2183                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
2184                _ => None,
2185            },
2186            || None,
2187        );
2188        let shell = candidates.first().expect("at least one candidate").clone();
2189        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
2190        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
2191    }
2192
2193    /// Issue #27 Oracle review P1: cmd wrapper MUST use `!ERRORLEVEL!` (not
2194    /// `%ERRORLEVEL%`) to capture the user command's run-time exit code.
2195    /// `%VAR%` is parse-time-expanded by cmd, so a wrapper using
2196    /// `%ERRORLEVEL%` would record a stale value (typically 0 from cmd's
2197    /// startup) regardless of what the user command returned. `!VAR!`
2198    /// requires delayed expansion, which `WindowsShell::bg_command` enables
2199    /// via `/V:ON`.
2200    #[cfg(windows)]
2201    #[test]
2202    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
2203        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
2204        let script =
2205            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
2206
2207        // MUST use !ERRORLEVEL! (delayed expansion), NOT %ERRORLEVEL%
2208        // (parse-time expansion). The latter would record a stale exit
2209        // code regardless of what the user command actually returned.
2210        assert!(
2211            script.contains("& echo !ERRORLEVEL! >"),
2212            "wrapper must use delayed expansion: {script}"
2213        );
2214        assert!(
2215            !script.contains("%ERRORLEVEL%"),
2216            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
2217        );
2218        assert!(script.contains("& move /Y"));
2219        // move output should be redirected to nul to avoid polluting the
2220        // user's captured stdout with "1 file(s) moved." lines.
2221        assert!(
2222            script.contains("> nul"),
2223            "wrapper must redirect move output to nul: {script}"
2224        );
2225        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
2226        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
2227    }
2228
2229    /// Issue #27 Oracle review P1: `bg_command()` for Cmd MUST prepend
2230    /// `/V:ON` to enable delayed expansion AND `/S` to use simple-quote
2231    /// parsing (so cmd's /C parser doesn't mangle the wrapper's internal
2232    /// quotes from `cmd_quote`).
2233    #[cfg(windows)]
2234    #[test]
2235    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
2236        use crate::windows_shell::WindowsShell;
2237        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
2238        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2239        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2240        assert_eq!(
2241            args_strs,
2242            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
2243            "Cmd::bg_command must prepend /V:ON /D /S /C"
2244        );
2245    }
2246
2247    /// PowerShell variants don't need `/V:ON`-style flags; their args
2248    /// are the same for foreground (`command()`) and background
2249    /// (`bg_command()`).
2250    #[cfg(windows)]
2251    #[test]
2252    fn windows_shell_pwsh_bg_command_uses_standard_args() {
2253        use crate::windows_shell::WindowsShell;
2254        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
2255        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2256        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2257        assert!(
2258            args_strs.contains(&"-Command"),
2259            "Pwsh::bg_command must use -Command: {args_strs:?}"
2260        );
2261        assert!(
2262            args_strs.contains(&"Get-Date"),
2263            "Pwsh::bg_command must include the user command body"
2264        );
2265    }
2266
2267    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
2268    /// **cmd.exe-specific** wrapper path captures the user command's
2269    /// run-time exit code correctly. The existing
2270    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
2271    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
2272    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
2273    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
2274    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
2275    ///
2276    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
2277    /// force the cmd-wrapper code path, then writes the exit marker and
2278    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
2279    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
2280    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
2281    /// regardless of what the user command returned.
2282    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
2283    /// the inline command-line wrapper helper that production code does
2284    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
2285    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
2286    /// produced silent failures on Windows 11 (move /Y could not parse
2287    /// path arguments through cmd's /C parser). The `bg_command` helper
2288    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
2289    /// the production spawn path goes through `detached_shell_command_for`
2290    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
2291    /// <bat-path>`.
2292    ///
2293    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
2294    /// covered live by the Windows e2e harness scenario 2d
2295    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
2296    /// exercises the real file-based wrapper end-to-end via the protocol.
2297    #[allow(dead_code)]
2298    #[cfg(any())] // disabled on all targets
2299    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
2300}