Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
30// Note: `resolve_windows_shell` is no longer imported at module scope —
31// production code in `spawn_detached_child` uses `shell_candidates()`
32// with retry instead, and the function remains in `windows_shell.rs`
33// for tests and as a future helper.
34
/// Timeout applied to a background bash task when the caller gives none:
/// 30 minutes. Agents can override it per call through the `timeout`
/// parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// Age (24 hours) after which a task still persisted as `Running` is treated
/// as orphaned when a session is replayed.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(60 * 60 * 24);

/// Number of tail bytes captured into `BashCompletedFrame` and `BgCompletion`
/// records so the plugin can inline a preview into the system-reminder.
/// Roughly 3-4 lines of typical command output (git status, test results,
/// exit messages): small enough that several completions in one reminder stay
/// well under the model's context budget, large enough that most successful
/// runs need no follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Notification that a background task reached a terminal status, queued
/// for delivery to the owning session.
///
/// Built by `BgTaskRegistry::enqueue_completion_locked`, returned by
/// `drain_completions*` / `pending_completions_for_session`, and mirrored
/// field-for-field into a `BashCompletedFrame` push frame.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Id of the task that finished.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status the task ended in.
    pub status: BgTaskStatus,
    /// Exit code, when known (the registry's kill path records `None` for a
    /// plain kill and `124` for a timeout).
    pub exit_code: Option<i32>,
    /// Command line the task ran.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes, i.e. `BG_COMPLETION_PREVIEW_BYTES`)
    /// at completion time, read once and cached so push-frame consumers and
    /// `bash_drain_completions` callers see the same preview without racing
    /// against later output rotation. Empty when not captured (e.g., persisted
    /// task seen on startup before buffer reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more. Omitted from the serialized payload when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// Serde `skip_serializing_if` predicate: true when the flag is unset, so
/// `false` booleans are left out of serialized payloads.
fn is_false(flag: &bool) -> bool {
    !flag
}
76
/// Point-in-time view of one background task, as returned by `status`,
/// `list`, and `kill`.
///
/// `info` is `#[serde(flatten)]`ed, so its fields serialize at the top level
/// next to the fields below.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Child process id, when one was recorded.
    pub child_pid: Option<u32>,
    /// Working directory of the task (presumably the `workdir` it was
    /// spawned with — confirm in `snapshot_locked`).
    pub workdir: String,
    /// Tail of the captured output, sized by the caller's `preview_bytes`.
    pub output_preview: String,
    /// True when `output_preview` does not hold the complete output.
    pub output_truncated: bool,
    /// NOTE(review): presumably the stdout capture file path — confirm in
    /// `snapshot_locked`.
    pub output_path: Option<String>,
    /// NOTE(review): presumably the stderr capture file path — confirm in
    /// `snapshot_locked`.
    pub stderr_path: Option<String>,
}
89
/// Registry of background bash tasks and their pending completions.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an
/// `Arc` (the watchdog is started with such a clone).
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// Shared state behind every `BgTaskRegistry` clone.
pub(crate) struct RegistryInner {
    /// Registered tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// FIFO queue of completions not yet drained (at most one per task id).
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Optional sink for `BashCompleted` push frames; frames are skipped when
    /// no sender is installed.
    pub(crate) progress_sender: SharedProgressSender,
    /// Flipped on the first `start_watchdog` call so the watchdog starts once.
    watchdog_started: AtomicBool,
    /// Set by `detach`. NOTE(review): presumably polled by the watchdog to
    /// stop — confirm in `watchdog.rs`.
    pub(crate) shutdown: AtomicBool,
}
102
/// One registered background task.
pub(crate) struct BgTask {
    /// Task id (the registry key).
    pub(crate) task_id: String,
    /// Session that owns the task; lookups by other sessions are refused.
    pub(crate) session_id: String,
    /// On-disk locations: task dir, JSON metadata, stdout/stderr captures,
    /// and exit marker.
    pub(crate) paths: TaskPaths,
    /// Instant the task was registered in this process; used as the
    /// elapsed-time fallback for `duration_ms` in snapshots.
    pub(crate) started: Instant,
    /// When the task became terminal (set via `mark_terminal_now`, or at
    /// rehydration for already-terminal records); `cleanup_finished` ages
    /// tasks out from this instant.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state.
    pub(crate) state: Mutex<BgTaskState>,
}
111
/// Mutable part of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task record.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned child while this process owns it; `None` for
    /// rehydrated tasks and after kill/finalize/reap/detach.
    pub(crate) child: Option<Child>,
    /// True once the registry no longer holds (or never held) a child handle.
    pub(crate) detached: bool,
    /// Reader over the stdout/stderr capture files, used for output previews.
    pub(crate) buffer: BgBuffer,
}
118
119impl BgTaskRegistry {
120    pub fn new(progress_sender: SharedProgressSender) -> Self {
121        Self {
122            inner: Arc::new(RegistryInner {
123                tasks: Mutex::new(HashMap::new()),
124                completions: Mutex::new(VecDeque::new()),
125                progress_sender,
126                watchdog_started: AtomicBool::new(false),
127                shutdown: AtomicBool::new(false),
128            }),
129        }
130    }
131
132    #[cfg(unix)]
133    pub fn spawn(
134        &self,
135        command: &str,
136        session_id: String,
137        workdir: PathBuf,
138        env: HashMap<String, String>,
139        timeout: Option<Duration>,
140        storage_dir: PathBuf,
141        max_running: usize,
142    ) -> Result<String, String> {
143        self.start_watchdog();
144
145        let running = self.running_count();
146        if running >= max_running {
147            return Err(format!(
148                "background bash task limit exceeded: {running} running (max {max_running})"
149            ));
150        }
151
152        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
153        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
154        let task_id = self.generate_unique_task_id()?;
155        let paths = task_paths(&storage_dir, &session_id, &task_id);
156        fs::create_dir_all(&paths.dir)
157            .map_err(|e| format!("failed to create background task dir: {e}"))?;
158
159        let mut metadata = PersistedTask::starting(
160            task_id.clone(),
161            session_id.clone(),
162            command.to_string(),
163            workdir.clone(),
164            timeout_ms,
165        );
166        write_task(&paths.json, &metadata)
167            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
168
169        // Pre-create capture files so the watchdog/buffer can always
170        // open them for reading. The spawn helper opens its own handles
171        // per attempt because each `Command::spawn()` consumes them.
172        create_capture_file(&paths.stdout)
173            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
174        create_capture_file(&paths.stderr)
175            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
176
177        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
178
179        let child_pid = child.id();
180        metadata.mark_running(child_pid, child_pid as i32);
181        write_task(&paths.json, &metadata)
182            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
183
184        let task = Arc::new(BgTask {
185            task_id: task_id.clone(),
186            session_id,
187            paths: paths.clone(),
188            started: Instant::now(),
189            terminal_at: Mutex::new(None),
190            state: Mutex::new(BgTaskState {
191                metadata,
192                child: Some(child),
193                detached: false,
194                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
195            }),
196        });
197
198        self.inner
199            .tasks
200            .lock()
201            .map_err(|_| "background task registry lock poisoned".to_string())?
202            .insert(task_id.clone(), task);
203
204        Ok(task_id)
205    }
206
207    #[cfg(windows)]
208    pub fn spawn(
209        &self,
210        command: &str,
211        session_id: String,
212        workdir: PathBuf,
213        env: HashMap<String, String>,
214        timeout: Option<Duration>,
215        storage_dir: PathBuf,
216        max_running: usize,
217    ) -> Result<String, String> {
218        self.start_watchdog();
219
220        let running = self.running_count();
221        if running >= max_running {
222            return Err(format!(
223                "background bash task limit exceeded: {running} running (max {max_running})"
224            ));
225        }
226
227        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
228        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
229        let task_id = self.generate_unique_task_id()?;
230        let paths = task_paths(&storage_dir, &session_id, &task_id);
231        fs::create_dir_all(&paths.dir)
232            .map_err(|e| format!("failed to create background task dir: {e}"))?;
233
234        let mut metadata = PersistedTask::starting(
235            task_id.clone(),
236            session_id.clone(),
237            command.to_string(),
238            workdir.clone(),
239            timeout_ms,
240        );
241        write_task(&paths.json, &metadata)
242            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
243
244        // Capture files are pre-created so the watchdog/buffer can always
245        // open them for reading even if the child hasn't written anything
246        // yet. The spawn helper opens its own handles per attempt because
247        // each `Command::spawn()` consumes them, and on Windows we may
248        // retry across multiple shell candidates if the first one fails.
249        create_capture_file(&paths.stdout)
250            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
251        create_capture_file(&paths.stderr)
252            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
253
254        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
255
256        let child_pid = child.id();
257        metadata.status = BgTaskStatus::Running;
258        metadata.child_pid = Some(child_pid);
259        metadata.pgid = None;
260        write_task(&paths.json, &metadata)
261            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
262
263        let task = Arc::new(BgTask {
264            task_id: task_id.clone(),
265            session_id,
266            paths: paths.clone(),
267            started: Instant::now(),
268            terminal_at: Mutex::new(None),
269            state: Mutex::new(BgTaskState {
270                metadata,
271                child: Some(child),
272                detached: false,
273                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
274            }),
275        });
276
277        self.inner
278            .tasks
279            .lock()
280            .map_err(|_| "background task registry lock poisoned".to_string())?
281            .insert(task_id.clone(), task);
282
283        Ok(task_id)
284    }
285
286    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
287        self.start_watchdog();
288        let dir = session_tasks_dir(storage_dir, session_id);
289        if !dir.exists() {
290            return Ok(());
291        }
292
293        let entries = fs::read_dir(&dir)
294            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
295        for entry in entries.flatten() {
296            let path = entry.path();
297            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
298                continue;
299            }
300            let Ok(mut metadata) = read_task(&path) else {
301                continue;
302            };
303            if metadata.session_id != session_id {
304                continue;
305            }
306
307            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
308            match metadata.status {
309                BgTaskStatus::Starting => {
310                    metadata.mark_terminal(
311                        BgTaskStatus::Failed,
312                        None,
313                        Some("spawn aborted".to_string()),
314                    );
315                    let _ = write_task(&paths.json, &metadata);
316                    self.enqueue_completion_if_needed(&metadata, false);
317                }
318                BgTaskStatus::Running => {
319                    if self.running_metadata_is_stale(&metadata) {
320                        metadata.mark_terminal(
321                            BgTaskStatus::Killed,
322                            None,
323                            Some("orphaned (>24h)".to_string()),
324                        );
325                        if !paths.exit.exists() {
326                            let _ = write_kill_marker_if_absent(&paths.exit);
327                        }
328                        let _ = write_task(&paths.json, &metadata);
329                        self.enqueue_completion_if_needed(&metadata, false);
330                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
331                        metadata = terminal_metadata_from_marker(metadata, marker, None);
332                        let _ = write_task(&paths.json, &metadata);
333                        self.enqueue_completion_if_needed(&metadata, false);
334                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
335                        metadata.mark_terminal(
336                            BgTaskStatus::Failed,
337                            None,
338                            Some("process exited without exit marker".to_string()),
339                        );
340                        let _ = write_task(&paths.json, &metadata);
341                        self.enqueue_completion_if_needed(&metadata, false);
342                    } else {
343                        self.insert_rehydrated_task(metadata, paths, true)?;
344                    }
345                }
346                _ if metadata.status.is_terminal() => {
347                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
348                    self.enqueue_completion_if_needed(&metadata, false);
349                }
350                _ => {}
351            }
352        }
353
354        Ok(())
355    }
356
357    pub fn status(
358        &self,
359        task_id: &str,
360        session_id: &str,
361        preview_bytes: usize,
362    ) -> Option<BgTaskSnapshot> {
363        let task = self.task_for_session(task_id, session_id)?;
364        let _ = self.poll_task(&task);
365        Some(task.snapshot(preview_bytes))
366    }
367
368    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
369        let tasks = self
370            .inner
371            .tasks
372            .lock()
373            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
374            .unwrap_or_default();
375        tasks
376            .into_iter()
377            .map(|task| {
378                let _ = self.poll_task(&task);
379                task.snapshot(preview_bytes)
380            })
381            .collect()
382    }
383
384    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
385        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
386    }
387
388    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
389        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
390            .map(|_| ())
391    }
392
393    pub fn cleanup_finished(&self, older_than: Duration) {
394        let cutoff = Instant::now().checked_sub(older_than);
395        if let Ok(mut tasks) = self.inner.tasks.lock() {
396            tasks.retain(|_, task| {
397                let is_terminal = task
398                    .state
399                    .lock()
400                    .map(|state| state.metadata.status.is_terminal())
401                    .unwrap_or(false);
402                if !is_terminal {
403                    return true;
404                }
405
406                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
407                match (terminal_at, cutoff) {
408                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
409                    (Some(_), None) => false,
410                    (None, _) => true,
411                }
412            });
413        }
414    }
415
416    pub fn drain_completions(&self) -> Vec<BgCompletion> {
417        self.drain_completions_for_session(None)
418    }
419
420    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
421        let mut completions = match self.inner.completions.lock() {
422            Ok(completions) => completions,
423            Err(_) => return Vec::new(),
424        };
425
426        let drained = if let Some(session_id) = session_id {
427            let mut matched = Vec::new();
428            let mut retained = VecDeque::new();
429            while let Some(completion) = completions.pop_front() {
430                if completion.session_id == session_id {
431                    matched.push(completion);
432                } else {
433                    retained.push_back(completion);
434                }
435            }
436            *completions = retained;
437            matched
438        } else {
439            completions.drain(..).collect()
440        };
441        drop(completions);
442
443        for completion in &drained {
444            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
445                let _ = task.set_completion_delivered(true);
446            }
447        }
448
449        drained
450    }
451
452    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
453        self.inner
454            .completions
455            .lock()
456            .map(|completions| {
457                completions
458                    .iter()
459                    .filter(|completion| completion.session_id == session_id)
460                    .cloned()
461                    .collect()
462            })
463            .unwrap_or_default()
464    }
465
466    pub fn detach(&self) {
467        self.inner.shutdown.store(true, Ordering::SeqCst);
468        if let Ok(mut tasks) = self.inner.tasks.lock() {
469            for task in tasks.values() {
470                if let Ok(mut state) = task.state.lock() {
471                    state.child = None;
472                    state.detached = true;
473                }
474            }
475            tasks.clear();
476        }
477    }
478
479    pub fn shutdown(&self) {
480        let tasks = self
481            .inner
482            .tasks
483            .lock()
484            .map(|tasks| {
485                tasks
486                    .values()
487                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
488                    .collect::<Vec<_>>()
489            })
490            .unwrap_or_default();
491        for (task_id, session_id) in tasks {
492            let _ = self.kill(&task_id, &session_id);
493        }
494    }
495
496    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
497        let marker = match read_exit_marker(&task.paths.exit) {
498            Ok(Some(marker)) => marker,
499            Ok(None) => return Ok(()),
500            Err(error) => return Err(format!("failed to read exit marker: {error}")),
501        };
502        self.finalize_from_marker(task, marker, None)
503    }
504
    /// Reap `task`'s child handle once the process has exited, and mark the
    /// task `Failed` if it exited without leaving an exit marker.
    ///
    /// Does nothing in these cases:
    /// - the state lock is poisoned;
    /// - there is no child handle;
    /// - the child is still running, or `try_wait` errors.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        if let Some(child) = state.child.as_mut() {
            if matches!(child.try_wait(), Ok(Some(_))) {
                // Child has exited. If the wrapper successfully wrote an
                // exit marker, the next `poll_task()` cycle will pick it up
                // and finalize via `finalize_from_marker`. But if the
                // wrapper crashed before writing the marker (e.g. SIGKILL,
                // power loss, wrapper bug), the task would forever appear
                // Running until `timeout_ms` expired — and if no timeout
                // was set, until the 24h `running_metadata_is_stale` cutoff
                // hit at the next aft restart. Same condition as the
                // "PID dead but no marker" branch in `replay_session`.
                //
                // To avoid that hidden hang, mark the task Failed
                // immediately with the same reason string used by replay,
                // but only if the marker is genuinely absent. If a marker
                // appeared on disk between try_wait() returning and now
                // (race window), prefer the marker — let the next poll
                // cycle finalize from it.
                state.child = None;
                state.detached = true;
                if state.metadata.status.is_terminal() {
                    return;
                }
                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
                    return;
                }
                let updated = update_task(&task.paths.json, |metadata| {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                });
                // NOTE(review): if `update_task` fails, the in-memory status
                // stays non-terminal even though the child handle was already
                // dropped above; nothing here retries.
                if let Ok(metadata) = updated {
                    state.metadata = metadata;
                    task.mark_terminal_now();
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
            }
        }
    }
551
552    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
553        self.inner
554            .tasks
555            .lock()
556            .map(|tasks| {
557                tasks
558                    .values()
559                    .filter(|task| task.is_running())
560                    .cloned()
561                    .collect()
562            })
563            .unwrap_or_default()
564    }
565
566    fn insert_rehydrated_task(
567        &self,
568        metadata: PersistedTask,
569        paths: TaskPaths,
570        detached: bool,
571    ) -> Result<(), String> {
572        let task_id = metadata.task_id.clone();
573        let session_id = metadata.session_id.clone();
574        let task = Arc::new(BgTask {
575            task_id: task_id.clone(),
576            session_id,
577            paths: paths.clone(),
578            started: Instant::now(),
579            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
580            state: Mutex::new(BgTaskState {
581                metadata,
582                child: None,
583                detached,
584                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
585            }),
586        });
587        self.inner
588            .tasks
589            .lock()
590            .map_err(|_| "background task registry lock poisoned".to_string())?
591            .insert(task_id, task);
592        Ok(())
593    }
594
595    fn kill_with_status(
596        &self,
597        task_id: &str,
598        session_id: &str,
599        terminal_status: BgTaskStatus,
600    ) -> Result<BgTaskSnapshot, String> {
601        let task = self
602            .task_for_session(task_id, session_id)
603            .ok_or_else(|| format!("background task not found: {task_id}"))?;
604
605        {
606            let mut state = task
607                .state
608                .lock()
609                .map_err(|_| "background task lock poisoned".to_string())?;
610            if state.metadata.status.is_terminal() {
611                return Ok(task.snapshot_locked(&state, 5 * 1024));
612            }
613
614            state.metadata.status = BgTaskStatus::Killing;
615            write_task(&task.paths.json, &state.metadata)
616                .map_err(|e| format!("failed to persist killing state: {e}"))?;
617
618            #[cfg(unix)]
619            if let Some(pgid) = state.metadata.pgid {
620                terminate_pgid(pgid, state.child.as_mut());
621            }
622            #[cfg(windows)]
623            if let Some(child) = state.child.as_mut() {
624                super::process::terminate_process(child);
625            } else if let Some(pid) = state.metadata.child_pid {
626                terminate_pid(pid);
627            }
628            if let Some(child) = state.child.as_mut() {
629                let _ = child.wait();
630            }
631            state.child = None;
632            state.detached = true;
633
634            if !task.paths.exit.exists() {
635                write_kill_marker_if_absent(&task.paths.exit)
636                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
637            }
638
639            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
640                Some(124)
641            } else {
642                None
643            };
644            state
645                .metadata
646                .mark_terminal(terminal_status, exit_code, None);
647            task.mark_terminal_now();
648            write_task(&task.paths.json, &state.metadata)
649                .map_err(|e| format!("failed to persist killed state: {e}"))?;
650            state.buffer.enforce_terminal_cap();
651            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
652        }
653
654        Ok(task.snapshot(5 * 1024))
655    }
656
657    fn finalize_from_marker(
658        &self,
659        task: &Arc<BgTask>,
660        marker: ExitMarker,
661        reason: Option<String>,
662    ) -> Result<(), String> {
663        let mut state = task
664            .state
665            .lock()
666            .map_err(|_| "background task lock poisoned".to_string())?;
667        if state.metadata.status.is_terminal() {
668            return Ok(());
669        }
670
671        let updated = update_task(&task.paths.json, |metadata| {
672            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
673            *metadata = new_metadata;
674        })
675        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
676        state.metadata = updated;
677        task.mark_terminal_now();
678        state.child = None;
679        state.detached = true;
680        state.buffer.enforce_terminal_cap();
681        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
682        Ok(())
683    }
684
685    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
686        if metadata.status.is_terminal() && !metadata.completion_delivered {
687            self.enqueue_completion_locked(metadata, None, emit_frame);
688        }
689    }
690
691    fn enqueue_completion_locked(
692        &self,
693        metadata: &PersistedTask,
694        buffer: Option<&BgBuffer>,
695        emit_frame: bool,
696    ) {
697        if !metadata.status.is_terminal() || metadata.completion_delivered {
698            return;
699        }
700        // Read tail once at completion time and cache on the BgCompletion so
701        // both the push-frame consumer (running session) and any later
702        // `bash_drain_completions` poll (different session, restart) see the
703        // same preview without racing against rotation.
704        let (output_preview, output_truncated) = match buffer {
705            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
706            None => (String::new(), false),
707        };
708        let completion = BgCompletion {
709            task_id: metadata.task_id.clone(),
710            session_id: metadata.session_id.clone(),
711            status: metadata.status.clone(),
712            exit_code: metadata.exit_code,
713            command: metadata.command.clone(),
714            output_preview,
715            output_truncated,
716        };
717        if let Ok(mut completions) = self.inner.completions.lock() {
718            if completions
719                .iter()
720                .any(|completion| completion.task_id == metadata.task_id)
721            {
722                return;
723            }
724            completions.push_back(completion.clone());
725        } else {
726            return;
727        }
728
729        if emit_frame {
730            self.emit_bash_completed(completion);
731        }
732    }
733
734    fn emit_bash_completed(&self, completion: BgCompletion) {
735        let Ok(progress_sender) = self
736            .inner
737            .progress_sender
738            .lock()
739            .map(|sender| sender.clone())
740        else {
741            return;
742        };
743        let Some(sender) = progress_sender.as_ref() else {
744            return;
745        };
746        // Clone the callback out of the registry mutex before writing to stdout;
747        // otherwise a blocked push-frame write could pin the mutex and starve
748        // unrelated progress-sender updates.
749        // Bg task transitions are discovered by the watchdog thread, so the
750        // sender is shared behind a Mutex. It still uses the same stdout writer
751        // closure as foreground progress frames, preserving the existing lock/
752        // flush behavior in main.rs.
753        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
754            completion.task_id,
755            completion.session_id,
756            completion.status,
757            completion.exit_code,
758            completion.command,
759            completion.output_preview,
760            completion.output_truncated,
761        )));
762    }
763
764    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
765        self.inner
766            .tasks
767            .lock()
768            .ok()
769            .and_then(|tasks| tasks.get(task_id).cloned())
770    }
771
772    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
773        self.task(task_id)
774            .filter(|task| task.session_id == session_id)
775    }
776
777    fn running_count(&self) -> usize {
778        self.inner
779            .tasks
780            .lock()
781            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
782            .unwrap_or(0)
783    }
784
785    fn start_watchdog(&self) {
786        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
787            super::watchdog::start(self.clone());
788        }
789    }
790
791    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
792        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
793    }
794
795    #[cfg(test)]
796    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
797        self.task_for_session(task_id, session_id)
798            .map(|task| task.paths.json.clone())
799    }
800
801    #[cfg(test)]
802    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
803        self.task_for_session(task_id, session_id)
804            .map(|task| task.paths.exit.clone())
805    }
806
807    /// Generate a `bgb-{16hex}` slug that is unique against live tasks and queued completions.
808    fn generate_unique_task_id(&self) -> Result<String, String> {
809        for _ in 0..32 {
810            let candidate = random_slug();
811            let tasks = self
812                .inner
813                .tasks
814                .lock()
815                .map_err(|_| "background task registry lock poisoned".to_string())?;
816            if tasks.contains_key(&candidate) {
817                continue;
818            }
819            let completions = self
820                .inner
821                .completions
822                .lock()
823                .map_err(|_| "background completions lock poisoned".to_string())?;
824            if completions
825                .iter()
826                .any(|completion| completion.task_id == candidate)
827            {
828                continue;
829            }
830            return Ok(candidate);
831        }
832        Err("failed to allocate unique background task id after 32 attempts".to_string())
833    }
834}
835
836impl Default for BgTaskRegistry {
837    fn default() -> Self {
838        Self::new(Arc::new(Mutex::new(None)))
839    }
840}
841
842impl BgTask {
843    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
844        let state = self
845            .state
846            .lock()
847            .unwrap_or_else(|poison| poison.into_inner());
848        self.snapshot_locked(&state, preview_bytes)
849    }
850
851    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
852        let metadata = &state.metadata;
853        let duration_ms = metadata.duration_ms.or_else(|| {
854            metadata
855                .status
856                .is_terminal()
857                .then(|| self.started.elapsed().as_millis() as u64)
858        });
859        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
860        BgTaskSnapshot {
861            info: BgTaskInfo {
862                task_id: self.task_id.clone(),
863                status: metadata.status.clone(),
864                command: metadata.command.clone(),
865                started_at: metadata.started_at,
866                duration_ms,
867            },
868            exit_code: metadata.exit_code,
869            child_pid: metadata.child_pid,
870            workdir: metadata.workdir.display().to_string(),
871            output_preview,
872            output_truncated,
873            output_path: state
874                .buffer
875                .output_path()
876                .map(|path| path.display().to_string()),
877            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
878        }
879    }
880
881    pub(crate) fn is_running(&self) -> bool {
882        self.state
883            .lock()
884            .map(|state| state.metadata.status == BgTaskStatus::Running)
885            .unwrap_or(false)
886    }
887
888    fn mark_terminal_now(&self) {
889        if let Ok(mut terminal_at) = self.terminal_at.lock() {
890            if terminal_at.is_none() {
891                *terminal_at = Some(Instant::now());
892            }
893        }
894    }
895
896    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
897        let mut state = self
898            .state
899            .lock()
900            .map_err(|_| "background task lock poisoned".to_string())?;
901        let updated = update_task(&self.paths.json, |metadata| {
902            metadata.completion_delivered = delivered;
903        })
904        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
905        state.metadata = updated;
906        Ok(())
907    }
908}
909
910fn terminal_metadata_from_marker(
911    mut metadata: PersistedTask,
912    marker: ExitMarker,
913    reason: Option<String>,
914) -> PersistedTask {
915    match marker {
916        ExitMarker::Code(code) => {
917            let status = if code == 0 {
918                BgTaskStatus::Completed
919            } else {
920                BgTaskStatus::Failed
921            };
922            metadata.mark_terminal(status, Some(code), reason);
923        }
924        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
925    }
926    metadata
927}
928
929#[cfg(unix)]
930fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
931    let mut cmd = Command::new("/bin/sh");
932    cmd.arg("-c")
933        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
934        .arg("/bin/sh")
935        .arg(command)
936        .arg(exit_path);
937    unsafe {
938        cmd.pre_exec(|| {
939            if libc::setsid() == -1 {
940                return Err(std::io::Error::last_os_error());
941            }
942            Ok(())
943        });
944    }
945    cmd
946}
947
/// Write a wrapper script for `shell` next to the task files and return a
/// `Command` that runs it with the given Win32 `creation_flags`.
///
/// The wrapper body (the user command plus exit-marker bookkeeping) is
/// written to disk rather than passed on the command line. Only the script
/// path reaches the shell's argument parser, so the body's internal quoting
/// is read under script-file rules. This sidesteps the interaction between
/// Rust's argument quoting and the `cmd /C` and PowerShell `-Command` parsers.
///
/// The script is named `<json file stem>.<ext>`, with `"wrapper"` as the stem
/// if the JSON path has no UTF-8 stem. The extension is `ps1` for the
/// PowerShell variants and `bat` for cmd.exe.
///
/// # Errors
///
/// Returns an error if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // Host switches preceding the script path. `-NoLogo`, `-NoProfile`, etc.
    // configure the PowerShell host before `-File` runs the script.
    const POWERSHELL_FLAGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    // `/V:ON` enables delayed expansion (needed for `!ERRORLEVEL!` in the
    // batch wrapper); `/C` runs the .bat file, whose lines are read by cmd's
    // batch processor rather than re-parsed by the `/C` command-line parser.
    const CMD_FLAGS: &[&str] = &["/V:ON", "/D", "/C"];

    let script = shell.wrapper_script(command, exit_path);
    let (extension, host_flags) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_FLAGS),
        WindowsShell::Cmd => ("bat", CMD_FLAGS),
    };

    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let script_path = paths.dir.join(format!("{stem}.{extension}"));
    fs::write(&script_path, script)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // The caller decides whether `creation_flags` includes
    // CREATE_BREAKAWAY_FROM_JOB (see `spawn_detached_child`).
    let mut cmd = Command::new(shell.binary());
    cmd.args(host_flags)
        .arg(&script_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
1019
1020/// Spawn a detached background bash child process.
1021///
1022/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
1023/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
1024/// cmd.exe) and retries with the next candidate when the previous one
1025/// fails to spawn with `NotFound` — the same runtime safety net the
1026/// foreground bash path has, so issue #27 callers landing on cmd.exe
1027/// fallback can also use background bash. The wrapper script is
1028/// regenerated per attempt because PowerShell wrappers embed the shell
1029/// binary by name; the stdout/stderr capture handles are also reopened
1030/// per attempt because `Command::spawn()` consumes them.
1031///
1032/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
1033/// return immediately without retry — they indicate a problem with the
1034/// resolved shell that retrying with a different shell won't fix.
1035fn spawn_detached_child(
1036    command: &str,
1037    paths: &TaskPaths,
1038    workdir: &Path,
1039    env: &HashMap<String, String>,
1040) -> Result<std::process::Child, String> {
1041    #[cfg(not(windows))]
1042    {
1043        let stdout = create_capture_file(&paths.stdout)
1044            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1045        let stderr = create_capture_file(&paths.stderr)
1046            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1047        detached_shell_command(command, &paths.exit)
1048            .current_dir(workdir)
1049            .envs(env)
1050            .stdin(Stdio::null())
1051            .stdout(Stdio::from(stdout))
1052            .stderr(Stdio::from(stderr))
1053            .spawn()
1054            .map_err(|e| format!("failed to spawn background bash command: {e}"))
1055    }
1056    #[cfg(windows)]
1057    {
1058        use crate::windows_shell::{shell_candidates, WindowsShell};
1059        // For background bash, prefer cmd.exe over PowerShell because the
1060        // PowerShell wrapper writes captured output through the detached-
1061        // process stdout/stderr handles inconsistently on some Windows
1062        // configurations (observed live: empty stdout/stderr even when the
1063        // wrapper script clearly executes). cmd.exe's batch wrapper is a
1064        // simpler execution model with reliable detached-handle inheritance,
1065        // and `!ERRORLEVEL!` correctly captures the user command's real
1066        // exit code (Oracle review P1-1 fix).
1067        //
1068        // Foreground bash still walks the regular pwsh→powershell→cmd
1069        // priority order because foreground stdout/stderr are inherited
1070        // pipes that PowerShell handles correctly.
1071        //
1072        // If cmd.exe is unavailable for any reason (extremely unusual on
1073        // Windows but possible under aggressive lockdown), we fall through
1074        // to the PowerShell variants in priority order — better to attempt
1075        // a partially-working PS wrapper than fail outright.
1076        let raw_candidates = shell_candidates();
1077        let mut candidates: Vec<WindowsShell> = Vec::with_capacity(raw_candidates.len());
1078        for shell in &raw_candidates {
1079            if *shell == WindowsShell::Cmd {
1080                candidates.insert(0, *shell);
1081            } else {
1082                candidates.push(*shell);
1083            }
1084        }
1085        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
1086        // first (so the bg child outlives the AFT process when AFT is killed),
1087        // then fall back without it for environments where the parent is in a
1088        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
1089        // runners (GitHub Actions windows-2022) and some MDM-managed corp
1090        // environments hit this — `CreateProcess` returns Access Denied (5).
1091        // Without breakaway, the child still runs detached but will be torn
1092        // down with the parent if the parent process group is signaled.
1093        const FLAG_DETACHED_PROCESS: u32 = 0x0000_0008;
1094        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
1095        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
1096        let with_breakaway =
1097            FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
1098        let without_breakaway = FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP;
1099        let mut last_error: Option<String> = None;
1100        for (idx, shell) in candidates.iter().enumerate() {
1101            // Per-shell, try with breakaway first. If the process is in a
1102            // restrictive job, the breakaway flag triggers Access Denied
1103            // (os error 5). Retry once without breakaway.
1104            for &flags in &[with_breakaway, without_breakaway] {
1105                // Re-open capture handles per attempt; spawn() consumes them.
1106                let stdout = create_capture_file(&paths.stdout)
1107                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1108                let stderr = create_capture_file(&paths.stderr)
1109                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1110                let mut cmd =
1111                    detached_shell_command_for(*shell, command, &paths.exit, paths, flags)?;
1112                cmd.current_dir(workdir)
1113                    .envs(env)
1114                    .stdin(Stdio::null())
1115                    .stdout(Stdio::from(stdout))
1116                    .stderr(Stdio::from(stderr));
1117                match cmd.spawn() {
1118                    Ok(child) => {
1119                        if idx > 0 {
1120                            log::warn!(
1121                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1122                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
1123                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1124                                shell.binary(),
1125                                idx
1126                            );
1127                        }
1128                        if flags == without_breakaway {
1129                            log::warn!(
1130                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
1131                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
1132                                 Spawned without breakaway; the bg task will be torn down if the \
1133                                 AFT process group is killed."
1134                            );
1135                        }
1136                        return Ok(child);
1137                    }
1138                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1139                        log::warn!(
1140                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
1141                            shell.binary()
1142                        );
1143                        last_error = Some(format!("{}: {e}", shell.binary()));
1144                        // Skip the without-breakaway retry for NotFound — the
1145                        // binary itself is missing, breakaway flag is irrelevant.
1146                        break;
1147                    }
1148                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
1149                        // Access Denied during breakaway — retry without it.
1150                        log::warn!(
1151                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
1152                             Access Denied — retrying {} without breakaway",
1153                            shell.binary()
1154                        );
1155                        last_error = Some(format!("{}: {e}", shell.binary()));
1156                        continue;
1157                    }
1158                    Err(e) => {
1159                        return Err(format!(
1160                            "failed to spawn background bash command via {}: {e}",
1161                            shell.binary()
1162                        ));
1163                    }
1164                }
1165            }
1166        }
1167        Err(format!(
1168            "failed to spawn background bash command: no Windows shell could be spawned. \
1169             Last error: {}. PATH-probed candidates: {:?}",
1170            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1171            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1172        ))
1173    }
1174}
1175
1176fn random_slug() -> String {
1177    let mut bytes = [0u8; 4];
1178    // getrandom is a transitive dependency; use it directly for OS entropy.
1179    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1180        // Extremely unlikely fallback: time + pid mix.
1181        let t = SystemTime::now()
1182            .duration_since(UNIX_EPOCH)
1183            .map(|d| d.subsec_nanos())
1184            .unwrap_or(0);
1185        let p = std::process::id();
1186        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1187    });
1188    // `bgb-` + 8 lowercase hex chars — compact, OS-entropy backed.
1189    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1190    format!("bgb-{hex}")
1191}
1192
1193#[cfg(test)]
1194mod tests {
1195    use std::collections::HashMap;
1196    #[cfg(windows)]
1197    use std::fs;
1198    use std::sync::{Arc, Mutex};
1199    use std::time::Duration;
1200    #[cfg(windows)]
1201    use std::time::Instant;
1202
1203    use super::*;
1204
1205    #[cfg(unix)]
1206    const QUICK_SUCCESS_COMMAND: &str = "true";
1207    #[cfg(windows)]
1208    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1209
1210    #[cfg(unix)]
1211    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1212    #[cfg(windows)]
1213    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1214
1215    #[test]
1216    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1217        let registry = BgTaskRegistry::default();
1218        let dir = tempfile::tempdir().unwrap();
1219        let task_id = registry
1220            .spawn(
1221                QUICK_SUCCESS_COMMAND,
1222                "session".to_string(),
1223                dir.path().to_path_buf(),
1224                HashMap::new(),
1225                Some(Duration::from_secs(30)),
1226                dir.path().to_path_buf(),
1227                10,
1228            )
1229            .unwrap();
1230        registry
1231            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1232            .unwrap();
1233
1234        registry.cleanup_finished(Duration::ZERO);
1235
1236        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1237    }
1238
1239    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1240    /// watchdog path (reap_child) marks a task Failed when the child
1241    /// has exited but no exit marker was written. Before this fix the
1242    /// task would remain `Running` until timeout, even though the
1243    /// process was definitely dead.
1244    ///
1245    /// Cross-platform: uses a quick-exiting command that does NOT go
1246    /// through the wrapper script (we manually clear the exit marker
1247    /// after spawn to simulate the wrapper crashing before write).
1248    #[test]
1249    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1250        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1251        let dir = tempfile::tempdir().unwrap();
1252        let task_id = registry
1253            .spawn(
1254                QUICK_SUCCESS_COMMAND,
1255                "session".to_string(),
1256                dir.path().to_path_buf(),
1257                HashMap::new(),
1258                Some(Duration::from_secs(30)),
1259                dir.path().to_path_buf(),
1260                10,
1261            )
1262            .unwrap();
1263
1264        let task = registry.task_for_session(&task_id, "session").unwrap();
1265
1266        // Wait for the child to actually exit and the wrapper to either
1267        // write the marker or fail. Then nuke the marker to simulate
1268        // wrapper crash before write. Poll up to 5s; this is plenty for a
1269        // `true`/`cmd /c exit 0` invocation.
1270        let started = Instant::now();
1271        loop {
1272            let exited = {
1273                let mut state = task.state.lock().unwrap();
1274                if let Some(child) = state.child.as_mut() {
1275                    matches!(child.try_wait(), Ok(Some(_)))
1276                } else {
1277                    true
1278                }
1279            };
1280            if exited {
1281                break;
1282            }
1283            assert!(
1284                started.elapsed() < Duration::from_secs(5),
1285                "child should exit quickly"
1286            );
1287            std::thread::sleep(Duration::from_millis(20));
1288        }
1289
1290        // Wrapper likely wrote the marker by now; remove it to simulate
1291        // a wrapper crash that exited before persisting the exit code.
1292        let _ = std::fs::remove_file(&task.paths.exit);
1293
1294        // Sanity: task is still Running per metadata (replay/poll hasn't
1295        // observed the missing marker yet).
1296        assert!(
1297            task.is_running(),
1298            "precondition: metadata.status == Running"
1299        );
1300        assert!(
1301            !task.paths.exit.exists(),
1302            "precondition: exit marker absent"
1303        );
1304
1305        // Invoke the watchdog's reap_child directly. The fix should mark
1306        // the task Failed with the documented reason string, instead of
1307        // just dropping the child handle and leaving status=Running.
1308        registry.reap_child(&task);
1309
1310        let state = task.state.lock().unwrap();
1311        assert!(
1312            state.metadata.status.is_terminal(),
1313            "reap_child must transition to terminal when PID dead and no marker. \
1314             Got status={:?}",
1315            state.metadata.status
1316        );
1317        assert_eq!(
1318            state.metadata.status,
1319            BgTaskStatus::Failed,
1320            "must specifically be Failed (not Killed): status={:?}",
1321            state.metadata.status
1322        );
1323        assert_eq!(
1324            state.metadata.status_reason.as_deref(),
1325            Some("process exited without exit marker"),
1326            "reason must match replay path's wording: {:?}",
1327            state.metadata.status_reason
1328        );
1329        assert!(
1330            state.child.is_none(),
1331            "child handle must be released after reap"
1332        );
1333        assert!(state.detached, "task must be marked detached after reap");
1334    }
1335
1336    /// Companion to the above: when the exit marker DOES exist on disk
1337    /// at reap_child time (race window — wrapper finished writing
1338    /// between try_wait and the marker check), reap_child must NOT mark
1339    /// the task Failed. Instead it leaves status=Running and lets the
1340    /// next poll_task() cycle finalize via the marker.
1341    #[test]
1342    fn reap_child_preserves_running_when_exit_marker_exists() {
1343        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1344        let dir = tempfile::tempdir().unwrap();
1345        let task_id = registry
1346            .spawn(
1347                QUICK_SUCCESS_COMMAND,
1348                "session".to_string(),
1349                dir.path().to_path_buf(),
1350                HashMap::new(),
1351                Some(Duration::from_secs(30)),
1352                dir.path().to_path_buf(),
1353                10,
1354            )
1355            .unwrap();
1356
1357        let task = registry.task_for_session(&task_id, "session").unwrap();
1358
1359        // Wait for child to exit AND for the marker to land. Both happen
1360        // shortly after the wrapper finishes — but we want both observed.
1361        let started = Instant::now();
1362        loop {
1363            let exited = {
1364                let mut state = task.state.lock().unwrap();
1365                if let Some(child) = state.child.as_mut() {
1366                    matches!(child.try_wait(), Ok(Some(_)))
1367                } else {
1368                    true
1369                }
1370            };
1371            if exited && task.paths.exit.exists() {
1372                break;
1373            }
1374            assert!(
1375                started.elapsed() < Duration::from_secs(5),
1376                "child should exit and write marker quickly"
1377            );
1378            std::thread::sleep(Duration::from_millis(20));
1379        }
1380
1381        // reap_child sees: child exited, marker exists. It should:
1382        //  - drop state.child / set state.detached = true
1383        //  - NOT change status (poll_task will finalize via marker next tick)
1384        registry.reap_child(&task);
1385
1386        let state = task.state.lock().unwrap();
1387        assert!(
1388            state.child.is_none(),
1389            "child handle still released even when marker exists"
1390        );
1391        assert!(
1392            state.detached,
1393            "task still marked detached even when marker exists"
1394        );
1395        // Status remains Running because reap_child defers to poll_task
1396        // when a marker exists. It would be wrong for reap to record the
1397        // marker outcome (poll_task does that with proper exit-code
1398        // parsing).
1399        assert_eq!(
1400            state.metadata.status,
1401            BgTaskStatus::Running,
1402            "reap_child must defer to poll_task when marker exists"
1403        );
1404    }
1405
1406    #[test]
1407    fn cleanup_finished_keeps_running_tasks() {
1408        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1409        let dir = tempfile::tempdir().unwrap();
1410        let task_id = registry
1411            .spawn(
1412                LONG_RUNNING_COMMAND,
1413                "session".to_string(),
1414                dir.path().to_path_buf(),
1415                HashMap::new(),
1416                Some(Duration::from_secs(30)),
1417                dir.path().to_path_buf(),
1418                10,
1419            )
1420            .unwrap();
1421
1422        registry.cleanup_finished(Duration::ZERO);
1423
1424        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1425        let _ = registry.kill(&task_id, "session");
1426    }
1427
1428    #[cfg(windows)]
1429    fn wait_for_file(path: &Path) -> String {
1430        let started = Instant::now();
1431        loop {
1432            if path.exists() {
1433                return fs::read_to_string(path).expect("read file");
1434            }
1435            assert!(
1436                started.elapsed() < Duration::from_secs(30),
1437                "timed out waiting for {}",
1438                path.display()
1439            );
1440            std::thread::sleep(Duration::from_millis(100));
1441        }
1442    }
1443
1444    #[cfg(windows)]
1445    fn spawn_windows_registry_command(
1446        command: &str,
1447    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
1448        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1449        let dir = tempfile::tempdir().unwrap();
1450        let task_id = registry
1451            .spawn(
1452                command,
1453                "session".to_string(),
1454                dir.path().to_path_buf(),
1455                HashMap::new(),
1456                Some(Duration::from_secs(30)),
1457                dir.path().to_path_buf(),
1458                10,
1459            )
1460            .unwrap();
1461        (registry, dir, task_id)
1462    }
1463
1464    #[cfg(windows)]
1465    #[test]
1466    fn windows_spawn_writes_exit_marker_for_zero_exit() {
1467        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
1468        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1469
1470        let content = wait_for_file(&exit_path);
1471
1472        assert_eq!(content.trim(), "0");
1473    }
1474
1475    #[cfg(windows)]
1476    #[test]
1477    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
1478        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
1479        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1480
1481        let content = wait_for_file(&exit_path);
1482
1483        assert_eq!(content.trim(), "42");
1484    }
1485
1486    #[cfg(windows)]
1487    #[test]
1488    fn windows_spawn_captures_stdout_to_disk() {
1489        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
1490        let task = registry.task_for_session(&task_id, "session").unwrap();
1491        let stdout_path = task.paths.stdout.clone();
1492        let exit_path = task.paths.exit.clone();
1493
1494        let _ = wait_for_file(&exit_path);
1495        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
1496
1497        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
1498    }
1499
1500    #[cfg(windows)]
1501    #[test]
1502    fn windows_spawn_uses_pwsh_when_available() {
1503        let shell = crate::windows_shell::resolve_windows_shell_with(|binary| {
1504            matches!(binary, "pwsh.exe" | "powershell.exe")
1505        });
1506
1507        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
1508        assert_eq!(shell.binary(), "pwsh.exe");
1509    }
1510
1511    /// Issue #27 Oracle review P1: cmd wrapper MUST use `!ERRORLEVEL!` (not
1512    /// `%ERRORLEVEL%`) to capture the user command's run-time exit code.
1513    /// `%VAR%` is parse-time-expanded by cmd, so a wrapper using
1514    /// `%ERRORLEVEL%` would record a stale value (typically 0 from cmd's
1515    /// startup) regardless of what the user command returned. `!VAR!`
1516    /// requires delayed expansion, which `WindowsShell::bg_command` enables
1517    /// via `/V:ON`.
1518    #[cfg(windows)]
1519    #[test]
1520    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
1521        let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
1522        let script =
1523            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
1524
1525        // MUST use !ERRORLEVEL! (delayed expansion), NOT %ERRORLEVEL%
1526        // (parse-time expansion). The latter would record a stale exit
1527        // code regardless of what the user command actually returned.
1528        assert!(
1529            script.contains("& echo !ERRORLEVEL! >"),
1530            "wrapper must use delayed expansion: {script}"
1531        );
1532        assert!(
1533            !script.contains("%ERRORLEVEL%"),
1534            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
1535        );
1536        assert!(script.contains("& move /Y"));
1537        // move output should be redirected to nul to avoid polluting the
1538        // user's captured stdout with "1 file(s) moved." lines.
1539        assert!(
1540            script.contains("> nul"),
1541            "wrapper must redirect move output to nul: {script}"
1542        );
1543        assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
1544        assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
1545    }
1546
1547    /// Issue #27 Oracle review P1: `bg_command()` for Cmd MUST prepend
1548    /// `/V:ON` to enable delayed expansion AND `/S` to use simple-quote
1549    /// parsing (so cmd's /C parser doesn't mangle the wrapper's internal
1550    /// quotes from `cmd_quote`).
1551    #[cfg(windows)]
1552    #[test]
1553    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
1554        use crate::windows_shell::WindowsShell;
1555        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
1556        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1557        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1558        assert_eq!(
1559            args_strs,
1560            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
1561            "Cmd::bg_command must prepend /V:ON /D /S /C"
1562        );
1563    }
1564
1565    /// PowerShell variants don't need `/V:ON`-style flags; their args
1566    /// are the same for foreground (`command()`) and background
1567    /// (`bg_command()`).
1568    #[cfg(windows)]
1569    #[test]
1570    fn windows_shell_pwsh_bg_command_uses_standard_args() {
1571        use crate::windows_shell::WindowsShell;
1572        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
1573        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1574        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1575        assert!(
1576            args_strs.contains(&"-Command"),
1577            "Pwsh::bg_command must use -Command: {args_strs:?}"
1578        );
1579        assert!(
1580            args_strs.contains(&"Get-Date"),
1581            "Pwsh::bg_command must include the user command body"
1582        );
1583    }
1584
1585    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
1586    /// **cmd.exe-specific** wrapper path captures the user command's
1587    /// run-time exit code correctly. The existing
1588    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
1589    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
1590    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
1591    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
1592    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
1593    ///
1594    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
1595    /// force the cmd-wrapper code path, then writes the exit marker and
1596    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
1597    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
1598    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
1599    /// regardless of what the user command returned.
1600    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
1601    /// the inline command-line wrapper helper that production code does
1602    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
1603    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
1604    /// produced silent failures on Windows 11 (move /Y could not parse
1605    /// path arguments through cmd's /C parser). The `bg_command` helper
1606    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
1607    /// the production spawn path goes through `detached_shell_command_for`
1608    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
1609    /// <bat-path>`.
1610    ///
1611    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
1612    /// covered live by the Windows e2e harness scenario 2d
1613    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
1614    /// exercises the real file-based wrapper end-to-end via the protocol.
1615    #[allow(dead_code)]
1616    #[cfg(any())] // disabled on all targets
1617    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
1618}