Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
30// Note: `resolve_windows_shell` is no longer imported at module scope —
31// production code in `spawn_detached_child` uses `shell_candidates()`
32// with retry instead, and the function remains in `windows_shell.rs`
33// for tests and as a future helper.
34
/// Timeout applied to a background bash task when the caller supplies none:
/// 30 minutes. Agents override it per call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// Age, measured from a task's persisted `started_at`, beyond which a task
/// that is still recorded as running is considered stale: 24 hours.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(60 * 60 * 24);

/// Number of trailing output bytes captured into `BashCompletedFrame` and
/// `BgCompletion` records so the plugin can inline a preview into the
/// system-reminder. Sized for roughly 3-4 lines of typical command output
/// (git status, test results, exit messages): small enough that several
/// completions in one reminder stay well under the model's context budget,
/// large enough that most successful runs need no follow-up `bash_status`.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
48#[derive(Debug, Clone, Serialize)]
49pub struct BgCompletion {
50    pub task_id: String,
51    /// Intentionally omitted from serialized completion payloads: push frames
52    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
53    #[serde(skip_serializing)]
54    pub session_id: String,
55    pub status: BgTaskStatus,
56    pub exit_code: Option<i32>,
57    pub command: String,
58    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
59    /// cached so push-frame consumers and `bash_drain_completions` callers see
60    /// the same preview without racing against later output rotation. Empty
61    /// when not captured (e.g., persisted task seen on startup before buffer
62    /// reattachment).
63    #[serde(default, skip_serializing_if = "String::is_empty")]
64    pub output_preview: String,
65    /// True when the captured tail is shorter than the actual output (because
66    /// rotation occurred or the output exceeds the preview cap). Plugins use
67    /// this to render a `…` prefix and signal that `bash_status` would return
68    /// more.
69    #[serde(default, skip_serializing_if = "is_false")]
70    pub output_truncated: bool,
71}
72
/// Serde `skip_serializing_if` predicate: true exactly when the flag is unset.
fn is_false(v: &bool) -> bool {
    let flag = *v;
    !flag
}
76
77#[derive(Debug, Clone, Serialize)]
78pub struct BgTaskSnapshot {
79    #[serde(flatten)]
80    pub info: BgTaskInfo,
81    pub exit_code: Option<i32>,
82    pub child_pid: Option<u32>,
83    pub workdir: String,
84    pub output_preview: String,
85    pub output_truncated: bool,
86    pub output_path: Option<String>,
87    pub stderr_path: Option<String>,
88}
89
90#[derive(Clone)]
91pub struct BgTaskRegistry {
92    pub(crate) inner: Arc<RegistryInner>,
93}
94
95pub(crate) struct RegistryInner {
96    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
97    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
98    pub(crate) progress_sender: SharedProgressSender,
99    watchdog_started: AtomicBool,
100    pub(crate) shutdown: AtomicBool,
101}
102
103pub(crate) struct BgTask {
104    pub(crate) task_id: String,
105    pub(crate) session_id: String,
106    pub(crate) paths: TaskPaths,
107    pub(crate) started: Instant,
108    pub(crate) terminal_at: Mutex<Option<Instant>>,
109    pub(crate) state: Mutex<BgTaskState>,
110}
111
112pub(crate) struct BgTaskState {
113    pub(crate) metadata: PersistedTask,
114    pub(crate) child: Option<Child>,
115    pub(crate) detached: bool,
116    pub(crate) buffer: BgBuffer,
117}
118
119impl BgTaskRegistry {
120    pub fn new(progress_sender: SharedProgressSender) -> Self {
121        Self {
122            inner: Arc::new(RegistryInner {
123                tasks: Mutex::new(HashMap::new()),
124                completions: Mutex::new(VecDeque::new()),
125                progress_sender,
126                watchdog_started: AtomicBool::new(false),
127                shutdown: AtomicBool::new(false),
128            }),
129        }
130    }
131
132    #[cfg(unix)]
133    pub fn spawn(
134        &self,
135        command: &str,
136        session_id: String,
137        workdir: PathBuf,
138        env: HashMap<String, String>,
139        timeout: Option<Duration>,
140        storage_dir: PathBuf,
141        max_running: usize,
142    ) -> Result<String, String> {
143        self.start_watchdog();
144
145        let running = self.running_count();
146        if running >= max_running {
147            return Err(format!(
148                "background bash task limit exceeded: {running} running (max {max_running})"
149            ));
150        }
151
152        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
153        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
154        let task_id = self.generate_unique_task_id()?;
155        let paths = task_paths(&storage_dir, &session_id, &task_id);
156        fs::create_dir_all(&paths.dir)
157            .map_err(|e| format!("failed to create background task dir: {e}"))?;
158
159        let mut metadata = PersistedTask::starting(
160            task_id.clone(),
161            session_id.clone(),
162            command.to_string(),
163            workdir.clone(),
164            timeout_ms,
165        );
166        write_task(&paths.json, &metadata)
167            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
168
169        // Pre-create capture files so the watchdog/buffer can always
170        // open them for reading. The spawn helper opens its own handles
171        // per attempt because each `Command::spawn()` consumes them.
172        create_capture_file(&paths.stdout)
173            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
174        create_capture_file(&paths.stderr)
175            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
176
177        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
178
179        let child_pid = child.id();
180        metadata.mark_running(child_pid, child_pid as i32);
181        write_task(&paths.json, &metadata)
182            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
183
184        let task = Arc::new(BgTask {
185            task_id: task_id.clone(),
186            session_id,
187            paths: paths.clone(),
188            started: Instant::now(),
189            terminal_at: Mutex::new(None),
190            state: Mutex::new(BgTaskState {
191                metadata,
192                child: Some(child),
193                detached: false,
194                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
195            }),
196        });
197
198        self.inner
199            .tasks
200            .lock()
201            .map_err(|_| "background task registry lock poisoned".to_string())?
202            .insert(task_id.clone(), task);
203
204        Ok(task_id)
205    }
206
207    #[cfg(windows)]
208    pub fn spawn(
209        &self,
210        command: &str,
211        session_id: String,
212        workdir: PathBuf,
213        env: HashMap<String, String>,
214        timeout: Option<Duration>,
215        storage_dir: PathBuf,
216        max_running: usize,
217    ) -> Result<String, String> {
218        self.start_watchdog();
219
220        let running = self.running_count();
221        if running >= max_running {
222            return Err(format!(
223                "background bash task limit exceeded: {running} running (max {max_running})"
224            ));
225        }
226
227        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
228        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
229        let task_id = self.generate_unique_task_id()?;
230        let paths = task_paths(&storage_dir, &session_id, &task_id);
231        fs::create_dir_all(&paths.dir)
232            .map_err(|e| format!("failed to create background task dir: {e}"))?;
233
234        let mut metadata = PersistedTask::starting(
235            task_id.clone(),
236            session_id.clone(),
237            command.to_string(),
238            workdir.clone(),
239            timeout_ms,
240        );
241        write_task(&paths.json, &metadata)
242            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
243
244        // Capture files are pre-created so the watchdog/buffer can always
245        // open them for reading even if the child hasn't written anything
246        // yet. The spawn helper opens its own handles per attempt because
247        // each `Command::spawn()` consumes them, and on Windows we may
248        // retry across multiple shell candidates if the first one fails.
249        create_capture_file(&paths.stdout)
250            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
251        create_capture_file(&paths.stderr)
252            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
253
254        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
255
256        let child_pid = child.id();
257        metadata.status = BgTaskStatus::Running;
258        metadata.child_pid = Some(child_pid);
259        metadata.pgid = None;
260        write_task(&paths.json, &metadata)
261            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
262
263        let task = Arc::new(BgTask {
264            task_id: task_id.clone(),
265            session_id,
266            paths: paths.clone(),
267            started: Instant::now(),
268            terminal_at: Mutex::new(None),
269            state: Mutex::new(BgTaskState {
270                metadata,
271                child: Some(child),
272                detached: false,
273                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
274            }),
275        });
276
277        self.inner
278            .tasks
279            .lock()
280            .map_err(|_| "background task registry lock poisoned".to_string())?
281            .insert(task_id.clone(), task);
282
283        Ok(task_id)
284    }
285
286    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
287        self.start_watchdog();
288        let dir = session_tasks_dir(storage_dir, session_id);
289        if !dir.exists() {
290            return Ok(());
291        }
292
293        let entries = fs::read_dir(&dir)
294            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
295        for entry in entries.flatten() {
296            let path = entry.path();
297            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
298                continue;
299            }
300            let Ok(mut metadata) = read_task(&path) else {
301                continue;
302            };
303            if metadata.session_id != session_id {
304                continue;
305            }
306
307            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
308            match metadata.status {
309                BgTaskStatus::Starting => {
310                    metadata.mark_terminal(
311                        BgTaskStatus::Failed,
312                        None,
313                        Some("spawn aborted".to_string()),
314                    );
315                    let _ = write_task(&paths.json, &metadata);
316                    self.enqueue_completion_if_needed(&metadata, false);
317                }
318                BgTaskStatus::Running => {
319                    if self.running_metadata_is_stale(&metadata) {
320                        metadata.mark_terminal(
321                            BgTaskStatus::Killed,
322                            None,
323                            Some("orphaned (>24h)".to_string()),
324                        );
325                        if !paths.exit.exists() {
326                            let _ = write_kill_marker_if_absent(&paths.exit);
327                        }
328                        let _ = write_task(&paths.json, &metadata);
329                        self.enqueue_completion_if_needed(&metadata, false);
330                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
331                        metadata = terminal_metadata_from_marker(metadata, marker, None);
332                        let _ = write_task(&paths.json, &metadata);
333                        self.enqueue_completion_if_needed(&metadata, false);
334                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
335                        metadata.mark_terminal(
336                            BgTaskStatus::Failed,
337                            None,
338                            Some("process exited without exit marker".to_string()),
339                        );
340                        let _ = write_task(&paths.json, &metadata);
341                        self.enqueue_completion_if_needed(&metadata, false);
342                    } else {
343                        self.insert_rehydrated_task(metadata, paths, true)?;
344                    }
345                }
346                _ if metadata.status.is_terminal() => {
347                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
348                    self.enqueue_completion_if_needed(&metadata, false);
349                }
350                _ => {}
351            }
352        }
353
354        Ok(())
355    }
356
357    pub fn status(
358        &self,
359        task_id: &str,
360        session_id: &str,
361        preview_bytes: usize,
362    ) -> Option<BgTaskSnapshot> {
363        let task = self.task_for_session(task_id, session_id)?;
364        let _ = self.poll_task(&task);
365        Some(task.snapshot(preview_bytes))
366    }
367
368    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
369        let tasks = self
370            .inner
371            .tasks
372            .lock()
373            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
374            .unwrap_or_default();
375        tasks
376            .into_iter()
377            .map(|task| {
378                let _ = self.poll_task(&task);
379                task.snapshot(preview_bytes)
380            })
381            .collect()
382    }
383
384    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
385        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
386    }
387
388    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
389        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
390            .map(|_| ())
391    }
392
393    pub fn cleanup_finished(&self, older_than: Duration) {
394        let cutoff = Instant::now().checked_sub(older_than);
395        if let Ok(mut tasks) = self.inner.tasks.lock() {
396            tasks.retain(|_, task| {
397                let is_terminal = task
398                    .state
399                    .lock()
400                    .map(|state| state.metadata.status.is_terminal())
401                    .unwrap_or(false);
402                if !is_terminal {
403                    return true;
404                }
405
406                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
407                match (terminal_at, cutoff) {
408                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
409                    (Some(_), None) => false,
410                    (None, _) => true,
411                }
412            });
413        }
414    }
415
416    pub fn drain_completions(&self) -> Vec<BgCompletion> {
417        self.drain_completions_for_session(None)
418    }
419
420    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
421        let mut completions = match self.inner.completions.lock() {
422            Ok(completions) => completions,
423            Err(_) => return Vec::new(),
424        };
425
426        let drained = if let Some(session_id) = session_id {
427            let mut matched = Vec::new();
428            let mut retained = VecDeque::new();
429            while let Some(completion) = completions.pop_front() {
430                if completion.session_id == session_id {
431                    matched.push(completion);
432                } else {
433                    retained.push_back(completion);
434                }
435            }
436            *completions = retained;
437            matched
438        } else {
439            completions.drain(..).collect()
440        };
441        drop(completions);
442
443        for completion in &drained {
444            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
445                let _ = task.set_completion_delivered(true);
446            }
447        }
448
449        drained
450    }
451
452    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
453        self.inner
454            .completions
455            .lock()
456            .map(|completions| {
457                completions
458                    .iter()
459                    .filter(|completion| completion.session_id == session_id)
460                    .cloned()
461                    .collect()
462            })
463            .unwrap_or_default()
464    }
465
466    pub fn detach(&self) {
467        self.inner.shutdown.store(true, Ordering::SeqCst);
468        if let Ok(mut tasks) = self.inner.tasks.lock() {
469            for task in tasks.values() {
470                if let Ok(mut state) = task.state.lock() {
471                    state.child = None;
472                    state.detached = true;
473                }
474            }
475            tasks.clear();
476        }
477    }
478
479    pub fn shutdown(&self) {
480        let tasks = self
481            .inner
482            .tasks
483            .lock()
484            .map(|tasks| {
485                tasks
486                    .values()
487                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
488                    .collect::<Vec<_>>()
489            })
490            .unwrap_or_default();
491        for (task_id, session_id) in tasks {
492            let _ = self.kill(&task_id, &session_id);
493        }
494    }
495
496    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
497        let marker = match read_exit_marker(&task.paths.exit) {
498            Ok(Some(marker)) => marker,
499            Ok(None) => return Ok(()),
500            Err(error) => return Err(format!("failed to read exit marker: {error}")),
501        };
502        self.finalize_from_marker(task, marker, None)
503    }
504
505    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
506        let Ok(mut state) = task.state.lock() else {
507            return;
508        };
509        if let Some(child) = state.child.as_mut() {
510            if matches!(child.try_wait(), Ok(Some(_))) {
511                // Child has exited. If the wrapper successfully wrote an
512                // exit marker, the next `poll_task()` cycle will pick it up
513                // and finalize via `finalize_from_marker`. But if the
514                // wrapper crashed before writing the marker (e.g. SIGKILL,
515                // power loss, wrapper bug), the task would forever appear
516                // Running until `timeout_ms` expired — and if no timeout
517                // was set, until the 24h `running_metadata_is_stale` cutoff
518                // hit at the next aft restart. Same condition as the replay
519                // path's "PID dead but no marker" branch (see line 338).
520                //
521                // To avoid that hidden hang, mark the task Failed
522                // immediately with the same reason string used by replay,
523                // but only if the marker is genuinely absent. If a marker
524                // appeared on disk between try_wait() returning and now
525                // (race window), prefer the marker — let the next poll
526                // cycle finalize from it.
527                state.child = None;
528                state.detached = true;
529                if state.metadata.status.is_terminal() {
530                    return;
531                }
532                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
533                    return;
534                }
535                let updated = update_task(&task.paths.json, |metadata| {
536                    metadata.mark_terminal(
537                        BgTaskStatus::Failed,
538                        None,
539                        Some("process exited without exit marker".to_string()),
540                    );
541                });
542                if let Ok(metadata) = updated {
543                    state.metadata = metadata;
544                    task.mark_terminal_now();
545                    state.buffer.enforce_terminal_cap();
546                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
547                }
548            }
549        }
550    }
551
552    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
553        self.inner
554            .tasks
555            .lock()
556            .map(|tasks| {
557                tasks
558                    .values()
559                    .filter(|task| task.is_running())
560                    .cloned()
561                    .collect()
562            })
563            .unwrap_or_default()
564    }
565
566    fn insert_rehydrated_task(
567        &self,
568        metadata: PersistedTask,
569        paths: TaskPaths,
570        detached: bool,
571    ) -> Result<(), String> {
572        let task_id = metadata.task_id.clone();
573        let session_id = metadata.session_id.clone();
574        let task = Arc::new(BgTask {
575            task_id: task_id.clone(),
576            session_id,
577            paths: paths.clone(),
578            started: Instant::now(),
579            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
580            state: Mutex::new(BgTaskState {
581                metadata,
582                child: None,
583                detached,
584                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
585            }),
586        });
587        self.inner
588            .tasks
589            .lock()
590            .map_err(|_| "background task registry lock poisoned".to_string())?
591            .insert(task_id, task);
592        Ok(())
593    }
594
595    fn kill_with_status(
596        &self,
597        task_id: &str,
598        session_id: &str,
599        terminal_status: BgTaskStatus,
600    ) -> Result<BgTaskSnapshot, String> {
601        let task = self
602            .task_for_session(task_id, session_id)
603            .ok_or_else(|| format!("background task not found: {task_id}"))?;
604
605        {
606            let mut state = task
607                .state
608                .lock()
609                .map_err(|_| "background task lock poisoned".to_string())?;
610            if state.metadata.status.is_terminal() {
611                return Ok(task.snapshot_locked(&state, 5 * 1024));
612            }
613
614            state.metadata.status = BgTaskStatus::Killing;
615            write_task(&task.paths.json, &state.metadata)
616                .map_err(|e| format!("failed to persist killing state: {e}"))?;
617
618            #[cfg(unix)]
619            if let Some(pgid) = state.metadata.pgid {
620                terminate_pgid(pgid, state.child.as_mut());
621            }
622            #[cfg(windows)]
623            if let Some(child) = state.child.as_mut() {
624                super::process::terminate_process(child);
625            } else if let Some(pid) = state.metadata.child_pid {
626                terminate_pid(pid);
627            }
628            if let Some(child) = state.child.as_mut() {
629                let _ = child.wait();
630            }
631            state.child = None;
632            state.detached = true;
633
634            if !task.paths.exit.exists() {
635                write_kill_marker_if_absent(&task.paths.exit)
636                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
637            }
638
639            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
640                Some(124)
641            } else {
642                None
643            };
644            state
645                .metadata
646                .mark_terminal(terminal_status, exit_code, None);
647            task.mark_terminal_now();
648            write_task(&task.paths.json, &state.metadata)
649                .map_err(|e| format!("failed to persist killed state: {e}"))?;
650            state.buffer.enforce_terminal_cap();
651            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
652        }
653
654        Ok(task.snapshot(5 * 1024))
655    }
656
657    fn finalize_from_marker(
658        &self,
659        task: &Arc<BgTask>,
660        marker: ExitMarker,
661        reason: Option<String>,
662    ) -> Result<(), String> {
663        let mut state = task
664            .state
665            .lock()
666            .map_err(|_| "background task lock poisoned".to_string())?;
667        if state.metadata.status.is_terminal() {
668            return Ok(());
669        }
670
671        let updated = update_task(&task.paths.json, |metadata| {
672            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
673            *metadata = new_metadata;
674        })
675        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
676        state.metadata = updated;
677        task.mark_terminal_now();
678        state.child = None;
679        state.detached = true;
680        state.buffer.enforce_terminal_cap();
681        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
682        Ok(())
683    }
684
685    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
686        if metadata.status.is_terminal() && !metadata.completion_delivered {
687            self.enqueue_completion_locked(metadata, None, emit_frame);
688        }
689    }
690
691    fn enqueue_completion_locked(
692        &self,
693        metadata: &PersistedTask,
694        buffer: Option<&BgBuffer>,
695        emit_frame: bool,
696    ) {
697        if !metadata.status.is_terminal() || metadata.completion_delivered {
698            return;
699        }
700        // Read tail once at completion time and cache on the BgCompletion so
701        // both the push-frame consumer (running session) and any later
702        // `bash_drain_completions` poll (different session, restart) see the
703        // same preview without racing against rotation.
704        let (output_preview, output_truncated) = match buffer {
705            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
706            None => (String::new(), false),
707        };
708        let completion = BgCompletion {
709            task_id: metadata.task_id.clone(),
710            session_id: metadata.session_id.clone(),
711            status: metadata.status.clone(),
712            exit_code: metadata.exit_code,
713            command: metadata.command.clone(),
714            output_preview,
715            output_truncated,
716        };
717        if let Ok(mut completions) = self.inner.completions.lock() {
718            if completions
719                .iter()
720                .any(|completion| completion.task_id == metadata.task_id)
721            {
722                return;
723            }
724            completions.push_back(completion.clone());
725        } else {
726            return;
727        }
728
729        if emit_frame {
730            self.emit_bash_completed(completion);
731        }
732    }
733
734    fn emit_bash_completed(&self, completion: BgCompletion) {
735        let Ok(progress_sender) = self
736            .inner
737            .progress_sender
738            .lock()
739            .map(|sender| sender.clone())
740        else {
741            return;
742        };
743        let Some(sender) = progress_sender.as_ref() else {
744            return;
745        };
746        // Clone the callback out of the registry mutex before writing to stdout;
747        // otherwise a blocked push-frame write could pin the mutex and starve
748        // unrelated progress-sender updates.
749        // Bg task transitions are discovered by the watchdog thread, so the
750        // sender is shared behind a Mutex. It still uses the same stdout writer
751        // closure as foreground progress frames, preserving the existing lock/
752        // flush behavior in main.rs.
753        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
754            completion.task_id,
755            completion.session_id,
756            completion.status,
757            completion.exit_code,
758            completion.command,
759            completion.output_preview,
760            completion.output_truncated,
761        )));
762    }
763
764    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
765        self.inner
766            .tasks
767            .lock()
768            .ok()
769            .and_then(|tasks| tasks.get(task_id).cloned())
770    }
771
772    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
773        self.task(task_id)
774            .filter(|task| task.session_id == session_id)
775    }
776
777    fn running_count(&self) -> usize {
778        self.inner
779            .tasks
780            .lock()
781            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
782            .unwrap_or(0)
783    }
784
785    fn start_watchdog(&self) {
786        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
787            super::watchdog::start(self.clone());
788        }
789    }
790
791    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
792        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
793    }
794
795    #[cfg(test)]
796    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
797        self.task_for_session(task_id, session_id)
798            .map(|task| task.paths.json.clone())
799    }
800
801    #[cfg(test)]
802    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
803        self.task_for_session(task_id, session_id)
804            .map(|task| task.paths.exit.clone())
805    }
806
807    /// Generate a `bgb-{16hex}` slug that is unique against live tasks and queued completions.
808    fn generate_unique_task_id(&self) -> Result<String, String> {
809        for _ in 0..32 {
810            let candidate = random_slug();
811            let tasks = self
812                .inner
813                .tasks
814                .lock()
815                .map_err(|_| "background task registry lock poisoned".to_string())?;
816            if tasks.contains_key(&candidate) {
817                continue;
818            }
819            let completions = self
820                .inner
821                .completions
822                .lock()
823                .map_err(|_| "background completions lock poisoned".to_string())?;
824            if completions
825                .iter()
826                .any(|completion| completion.task_id == candidate)
827            {
828                continue;
829            }
830            return Ok(candidate);
831        }
832        Err("failed to allocate unique background task id after 32 attempts".to_string())
833    }
834}
835
836impl Default for BgTaskRegistry {
837    fn default() -> Self {
838        Self::new(Arc::new(Mutex::new(None)))
839    }
840}
841
842impl BgTask {
843    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
844        let state = self
845            .state
846            .lock()
847            .unwrap_or_else(|poison| poison.into_inner());
848        self.snapshot_locked(&state, preview_bytes)
849    }
850
851    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
852        let metadata = &state.metadata;
853        let duration_ms = metadata.duration_ms.or_else(|| {
854            metadata
855                .status
856                .is_terminal()
857                .then(|| self.started.elapsed().as_millis() as u64)
858        });
859        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
860        BgTaskSnapshot {
861            info: BgTaskInfo {
862                task_id: self.task_id.clone(),
863                status: metadata.status.clone(),
864                command: metadata.command.clone(),
865                started_at: metadata.started_at,
866                duration_ms,
867            },
868            exit_code: metadata.exit_code,
869            child_pid: metadata.child_pid,
870            workdir: metadata.workdir.display().to_string(),
871            output_preview,
872            output_truncated,
873            output_path: state
874                .buffer
875                .output_path()
876                .map(|path| path.display().to_string()),
877            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
878        }
879    }
880
881    pub(crate) fn is_running(&self) -> bool {
882        self.state
883            .lock()
884            .map(|state| state.metadata.status == BgTaskStatus::Running)
885            .unwrap_or(false)
886    }
887
888    fn mark_terminal_now(&self) {
889        if let Ok(mut terminal_at) = self.terminal_at.lock() {
890            if terminal_at.is_none() {
891                *terminal_at = Some(Instant::now());
892            }
893        }
894    }
895
896    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
897        let mut state = self
898            .state
899            .lock()
900            .map_err(|_| "background task lock poisoned".to_string())?;
901        let updated = update_task(&self.paths.json, |metadata| {
902            metadata.completion_delivered = delivered;
903        })
904        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
905        state.metadata = updated;
906        Ok(())
907    }
908}
909
910fn terminal_metadata_from_marker(
911    mut metadata: PersistedTask,
912    marker: ExitMarker,
913    reason: Option<String>,
914) -> PersistedTask {
915    match marker {
916        ExitMarker::Code(code) => {
917            let status = if code == 0 {
918                BgTaskStatus::Completed
919            } else {
920                BgTaskStatus::Failed
921            };
922            metadata.mark_terminal(status, Some(code), reason);
923        }
924        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
925    }
926    metadata
927}
928
929#[cfg(unix)]
930fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
931    let mut cmd = Command::new("/bin/sh");
932    cmd.arg("-c")
933        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
934        .arg("/bin/sh")
935        .arg(command)
936        .arg(exit_path);
937    unsafe {
938        cmd.pre_exec(|| {
939            if libc::setsid() == -1 {
940                return Err(std::io::Error::last_os_error());
941            }
942            Ok(())
943        });
944    }
945    cmd
946}
947
/// Builds the detached Windows command that runs `command` under `shell`.
///
/// The wrapper produced by `shell.wrapper_script` is written to a script
/// file in `paths.dir` (named after the stem of `paths.json`, with a `.ps1`
/// or `.bat` extension), and the shell is pointed at that file. Passing a
/// file path instead of an inline command keeps the wrapper's internal
/// quotes off the process command line, where Rust's argument quoting, the
/// `cmd /C` parser and PowerShell's `-Command` parser would all get a say.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // Win32 process creation flags:
    // https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
    const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
    const DETACHED_PROCESS: u32 = 0x0000_0008;
    const CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;

    let script_body = shell.wrapper_script(command, exit_path);
    let script_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
    };
    let script_stem = paths
        .json
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or("wrapper");
    let script_path = paths.dir.join(format!("{}.{}", script_stem, script_ext));
    fs::write(&script_path, script_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Host switches that precede the script path.
    let host_switches: &[&str] = match shell {
        // `-File` runs the script from disk; the remaining switches
        // configure the PowerShell host before the file runs.
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        // `cmd /V:ON /D /C <bat-file>`: the batch file is read line by line
        // by cmd's batch processor rather than re-parsed by `/C`, which
        // avoids "filename syntax incorrect" errors from compound commands
        // on the command line.
        WindowsShell::Cmd => &["/V:ON", "/D", "/C"],
    };

    let mut launcher = Command::new(shell.binary());
    launcher.args(host_switches);
    launcher.arg(&script_path);
    launcher.creation_flags(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS | CREATE_BREAKAWAY_FROM_JOB);
    Ok(launcher)
}
1019
1020/// Spawn a detached background bash child process.
1021///
1022/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
1023/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
1024/// cmd.exe) and retries with the next candidate when the previous one
1025/// fails to spawn with `NotFound` — the same runtime safety net the
1026/// foreground bash path has, so issue #27 callers landing on cmd.exe
1027/// fallback can also use background bash. The wrapper script is
1028/// regenerated per attempt because PowerShell wrappers embed the shell
1029/// binary by name; the stdout/stderr capture handles are also reopened
1030/// per attempt because `Command::spawn()` consumes them.
1031///
1032/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
1033/// return immediately without retry — they indicate a problem with the
1034/// resolved shell that retrying with a different shell won't fix.
1035fn spawn_detached_child(
1036    command: &str,
1037    paths: &TaskPaths,
1038    workdir: &Path,
1039    env: &HashMap<String, String>,
1040) -> Result<std::process::Child, String> {
1041    #[cfg(not(windows))]
1042    {
1043        let stdout = create_capture_file(&paths.stdout)
1044            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1045        let stderr = create_capture_file(&paths.stderr)
1046            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1047        detached_shell_command(command, &paths.exit)
1048            .current_dir(workdir)
1049            .envs(env)
1050            .stdin(Stdio::null())
1051            .stdout(Stdio::from(stdout))
1052            .stderr(Stdio::from(stderr))
1053            .spawn()
1054            .map_err(|e| format!("failed to spawn background bash command: {e}"))
1055    }
1056    #[cfg(windows)]
1057    {
1058        use crate::windows_shell::{shell_candidates, WindowsShell};
1059        // For background bash, prefer cmd.exe over PowerShell because the
1060        // PowerShell wrapper writes captured output through the detached-
1061        // process stdout/stderr handles inconsistently on some Windows
1062        // configurations (observed live: empty stdout/stderr even when the
1063        // wrapper script clearly executes). cmd.exe's batch wrapper is a
1064        // simpler execution model with reliable detached-handle inheritance,
1065        // and `!ERRORLEVEL!` correctly captures the user command's real
1066        // exit code (Oracle review P1-1 fix).
1067        //
1068        // Foreground bash still walks the regular pwsh→powershell→cmd
1069        // priority order because foreground stdout/stderr are inherited
1070        // pipes that PowerShell handles correctly.
1071        //
1072        // If cmd.exe is unavailable for any reason (extremely unusual on
1073        // Windows but possible under aggressive lockdown), we fall through
1074        // to the PowerShell variants in priority order — better to attempt
1075        // a partially-working PS wrapper than fail outright.
1076        let raw_candidates = shell_candidates();
1077        let mut candidates: Vec<WindowsShell> = Vec::with_capacity(raw_candidates.len());
1078        for shell in &raw_candidates {
1079            if *shell == WindowsShell::Cmd {
1080                candidates.insert(0, *shell);
1081            } else {
1082                candidates.push(*shell);
1083            }
1084        }
1085        let mut last_error: Option<String> = None;
1086        for (idx, shell) in candidates.iter().enumerate() {
1087            // Re-open capture handles per attempt; spawn() consumes them.
1088            let stdout = create_capture_file(&paths.stdout)
1089                .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1090            let stderr = create_capture_file(&paths.stderr)
1091                .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1092            let mut cmd = detached_shell_command_for(*shell, command, &paths.exit, paths)?;
1093            cmd.current_dir(workdir)
1094                .envs(env)
1095                .stdin(Stdio::null())
1096                .stdout(Stdio::from(stdout))
1097                .stderr(Stdio::from(stderr));
1098            match cmd.spawn() {
1099                Ok(child) => {
1100                    if idx > 0 {
1101                        log::warn!(
1102                            "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1103                             the cached PATH probe disagreed with runtime spawn — likely PATH \
1104                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1105                            shell.binary(),
1106                            idx
1107                        );
1108                    }
1109                    return Ok(child);
1110                }
1111                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1112                    log::warn!(
1113                        "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
1114                        shell.binary()
1115                    );
1116                    last_error = Some(format!("{}: {e}", shell.binary()));
1117                    continue;
1118                }
1119                Err(e) => {
1120                    return Err(format!(
1121                        "failed to spawn background bash command via {}: {e}",
1122                        shell.binary()
1123                    ));
1124                }
1125            }
1126        }
1127        Err(format!(
1128            "failed to spawn background bash command: no Windows shell could be spawned. \
1129             Last error: {}. PATH-probed candidates: {:?}",
1130            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1131            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1132        ))
1133    }
1134}
1135
1136fn random_slug() -> String {
1137    let mut bytes = [0u8; 4];
1138    // getrandom is a transitive dependency; use it directly for OS entropy.
1139    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1140        // Extremely unlikely fallback: time + pid mix.
1141        let t = SystemTime::now()
1142            .duration_since(UNIX_EPOCH)
1143            .map(|d| d.subsec_nanos())
1144            .unwrap_or(0);
1145        let p = std::process::id();
1146        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1147    });
1148    // `bgb-` + 8 lowercase hex chars — compact, OS-entropy backed.
1149    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1150    format!("bgb-{hex}")
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155    use std::collections::HashMap;
1156    #[cfg(windows)]
1157    use std::fs;
1158    use std::sync::{Arc, Mutex};
1159    use std::time::Duration;
1160    #[cfg(windows)]
1161    use std::time::Instant;
1162
1163    use super::*;
1164
1165    #[cfg(unix)]
1166    const QUICK_SUCCESS_COMMAND: &str = "true";
1167    #[cfg(windows)]
1168    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1169
1170    #[cfg(unix)]
1171    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1172    #[cfg(windows)]
1173    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1174
1175    #[test]
1176    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1177        let registry = BgTaskRegistry::default();
1178        let dir = tempfile::tempdir().unwrap();
1179        let task_id = registry
1180            .spawn(
1181                QUICK_SUCCESS_COMMAND,
1182                "session".to_string(),
1183                dir.path().to_path_buf(),
1184                HashMap::new(),
1185                Some(Duration::from_secs(30)),
1186                dir.path().to_path_buf(),
1187                10,
1188            )
1189            .unwrap();
1190        registry
1191            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1192            .unwrap();
1193
1194        registry.cleanup_finished(Duration::ZERO);
1195
1196        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1197    }
1198
1199    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1200    /// watchdog path (reap_child) marks a task Failed when the child
1201    /// has exited but no exit marker was written. Before this fix the
1202    /// task would remain `Running` until timeout, even though the
1203    /// process was definitely dead.
1204    ///
1205    /// Cross-platform: uses a quick-exiting command that does NOT go
1206    /// through the wrapper script (we manually clear the exit marker
1207    /// after spawn to simulate the wrapper crashing before write).
1208    #[test]
1209    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1210        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1211        let dir = tempfile::tempdir().unwrap();
1212        let task_id = registry
1213            .spawn(
1214                QUICK_SUCCESS_COMMAND,
1215                "session".to_string(),
1216                dir.path().to_path_buf(),
1217                HashMap::new(),
1218                Some(Duration::from_secs(30)),
1219                dir.path().to_path_buf(),
1220                10,
1221            )
1222            .unwrap();
1223
1224        let task = registry.task_for_session(&task_id, "session").unwrap();
1225
1226        // Wait for the child to actually exit and the wrapper to either
1227        // write the marker or fail. Then nuke the marker to simulate
1228        // wrapper crash before write. Poll up to 5s; this is plenty for a
1229        // `true`/`cmd /c exit 0` invocation.
1230        let started = Instant::now();
1231        loop {
1232            let exited = {
1233                let mut state = task.state.lock().unwrap();
1234                if let Some(child) = state.child.as_mut() {
1235                    matches!(child.try_wait(), Ok(Some(_)))
1236                } else {
1237                    true
1238                }
1239            };
1240            if exited {
1241                break;
1242            }
1243            assert!(
1244                started.elapsed() < Duration::from_secs(5),
1245                "child should exit quickly"
1246            );
1247            std::thread::sleep(Duration::from_millis(20));
1248        }
1249
1250        // Wrapper likely wrote the marker by now; remove it to simulate
1251        // a wrapper crash that exited before persisting the exit code.
1252        let _ = std::fs::remove_file(&task.paths.exit);
1253
1254        // Sanity: task is still Running per metadata (replay/poll hasn't
1255        // observed the missing marker yet).
1256        assert!(
1257            task.is_running(),
1258            "precondition: metadata.status == Running"
1259        );
1260        assert!(
1261            !task.paths.exit.exists(),
1262            "precondition: exit marker absent"
1263        );
1264
1265        // Invoke the watchdog's reap_child directly. The fix should mark
1266        // the task Failed with the documented reason string, instead of
1267        // just dropping the child handle and leaving status=Running.
1268        registry.reap_child(&task);
1269
1270        let state = task.state.lock().unwrap();
1271        assert!(
1272            state.metadata.status.is_terminal(),
1273            "reap_child must transition to terminal when PID dead and no marker. \
1274             Got status={:?}",
1275            state.metadata.status
1276        );
1277        assert_eq!(
1278            state.metadata.status,
1279            BgTaskStatus::Failed,
1280            "must specifically be Failed (not Killed): status={:?}",
1281            state.metadata.status
1282        );
1283        assert_eq!(
1284            state.metadata.status_reason.as_deref(),
1285            Some("process exited without exit marker"),
1286            "reason must match replay path's wording: {:?}",
1287            state.metadata.status_reason
1288        );
1289        assert!(
1290            state.child.is_none(),
1291            "child handle must be released after reap"
1292        );
1293        assert!(state.detached, "task must be marked detached after reap");
1294    }
1295
1296    /// Companion to the above: when the exit marker DOES exist on disk
1297    /// at reap_child time (race window — wrapper finished writing
1298    /// between try_wait and the marker check), reap_child must NOT mark
1299    /// the task Failed. Instead it leaves status=Running and lets the
1300    /// next poll_task() cycle finalize via the marker.
1301    #[test]
1302    fn reap_child_preserves_running_when_exit_marker_exists() {
1303        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1304        let dir = tempfile::tempdir().unwrap();
1305        let task_id = registry
1306            .spawn(
1307                QUICK_SUCCESS_COMMAND,
1308                "session".to_string(),
1309                dir.path().to_path_buf(),
1310                HashMap::new(),
1311                Some(Duration::from_secs(30)),
1312                dir.path().to_path_buf(),
1313                10,
1314            )
1315            .unwrap();
1316
1317        let task = registry.task_for_session(&task_id, "session").unwrap();
1318
1319        // Wait for child to exit AND for the marker to land. Both happen
1320        // shortly after the wrapper finishes — but we want both observed.
1321        let started = Instant::now();
1322        loop {
1323            let exited = {
1324                let mut state = task.state.lock().unwrap();
1325                if let Some(child) = state.child.as_mut() {
1326                    matches!(child.try_wait(), Ok(Some(_)))
1327                } else {
1328                    true
1329                }
1330            };
1331            if exited && task.paths.exit.exists() {
1332                break;
1333            }
1334            assert!(
1335                started.elapsed() < Duration::from_secs(5),
1336                "child should exit and write marker quickly"
1337            );
1338            std::thread::sleep(Duration::from_millis(20));
1339        }
1340
1341        // reap_child sees: child exited, marker exists. It should:
1342        //  - drop state.child / set state.detached = true
1343        //  - NOT change status (poll_task will finalize via marker next tick)
1344        registry.reap_child(&task);
1345
1346        let state = task.state.lock().unwrap();
1347        assert!(
1348            state.child.is_none(),
1349            "child handle still released even when marker exists"
1350        );
1351        assert!(
1352            state.detached,
1353            "task still marked detached even when marker exists"
1354        );
1355        // Status remains Running because reap_child defers to poll_task
1356        // when a marker exists. It would be wrong for reap to record the
1357        // marker outcome (poll_task does that with proper exit-code
1358        // parsing).
1359        assert_eq!(
1360            state.metadata.status,
1361            BgTaskStatus::Running,
1362            "reap_child must defer to poll_task when marker exists"
1363        );
1364    }
1365
1366    #[test]
1367    fn cleanup_finished_keeps_running_tasks() {
1368        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1369        let dir = tempfile::tempdir().unwrap();
1370        let task_id = registry
1371            .spawn(
1372                LONG_RUNNING_COMMAND,
1373                "session".to_string(),
1374                dir.path().to_path_buf(),
1375                HashMap::new(),
1376                Some(Duration::from_secs(30)),
1377                dir.path().to_path_buf(),
1378                10,
1379            )
1380            .unwrap();
1381
1382        registry.cleanup_finished(Duration::ZERO);
1383
1384        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1385        let _ = registry.kill(&task_id, "session");
1386    }
1387
1388    #[cfg(windows)]
1389    fn wait_for_file(path: &Path) -> String {
1390        let started = Instant::now();
1391        loop {
1392            if path.exists() {
1393                return fs::read_to_string(path).expect("read file");
1394            }
1395            assert!(
1396                started.elapsed() < Duration::from_secs(30),
1397                "timed out waiting for {}",
1398                path.display()
1399            );
1400            std::thread::sleep(Duration::from_millis(100));
1401        }
1402    }
1403
1404    #[cfg(windows)]
1405    fn spawn_windows_registry_command(
1406        command: &str,
1407    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
1408        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1409        let dir = tempfile::tempdir().unwrap();
1410        let task_id = registry
1411            .spawn(
1412                command,
1413                "session".to_string(),
1414                dir.path().to_path_buf(),
1415                HashMap::new(),
1416                Some(Duration::from_secs(30)),
1417                dir.path().to_path_buf(),
1418                10,
1419            )
1420            .unwrap();
1421        (registry, dir, task_id)
1422    }
1423
1424    #[cfg(windows)]
1425    #[test]
1426    fn windows_spawn_writes_exit_marker_for_zero_exit() {
1427        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
1428        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1429
1430        let content = wait_for_file(&exit_path);
1431
1432        assert_eq!(content.trim(), "0");
1433    }
1434
1435    #[cfg(windows)]
1436    #[test]
1437    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
1438        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
1439        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1440
1441        let content = wait_for_file(&exit_path);
1442
1443        assert_eq!(content.trim(), "42");
1444    }
1445
1446    #[cfg(windows)]
1447    #[test]
1448    fn windows_spawn_captures_stdout_to_disk() {
1449        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
1450        let task = registry.task_for_session(&task_id, "session").unwrap();
1451        let stdout_path = task.paths.stdout.clone();
1452        let exit_path = task.paths.exit.clone();
1453
1454        let _ = wait_for_file(&exit_path);
1455        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
1456
1457        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
1458    }
1459
1460    #[cfg(windows)]
1461    #[test]
1462    fn windows_spawn_uses_pwsh_when_available() {
1463        let shell = crate::windows_shell::resolve_windows_shell_with(|binary| {
1464            matches!(binary, "pwsh.exe" | "powershell.exe")
1465        });
1466
1467        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
1468        assert_eq!(shell.binary(), "pwsh.exe");
1469    }
1470
1471    /// Issue #27 Oracle review P1: cmd wrapper MUST use `!ERRORLEVEL!` (not
1472    /// `%ERRORLEVEL%`) to capture the user command's run-time exit code.
1473    /// `%VAR%` is parse-time-expanded by cmd, so a wrapper using
1474    /// `%ERRORLEVEL%` would record a stale value (typically 0 from cmd's
1475    /// startup) regardless of what the user command returned. `!VAR!`
1476    /// requires delayed expansion, which `WindowsShell::bg_command` enables
1477    /// via `/V:ON`.
1478    #[cfg(windows)]
1479    #[test]
1480    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
1481        let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
1482        let script =
1483            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
1484
1485        // MUST use !ERRORLEVEL! (delayed expansion), NOT %ERRORLEVEL%
1486        // (parse-time expansion). The latter would record a stale exit
1487        // code regardless of what the user command actually returned.
1488        assert!(
1489            script.contains("& echo !ERRORLEVEL! >"),
1490            "wrapper must use delayed expansion: {script}"
1491        );
1492        assert!(
1493            !script.contains("%ERRORLEVEL%"),
1494            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
1495        );
1496        assert!(script.contains("& move /Y"));
1497        // move output should be redirected to nul to avoid polluting the
1498        // user's captured stdout with "1 file(s) moved." lines.
1499        assert!(
1500            script.contains("> nul"),
1501            "wrapper must redirect move output to nul: {script}"
1502        );
1503        assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
1504        assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
1505    }
1506
1507    /// Issue #27 Oracle review P1: `bg_command()` for Cmd MUST prepend
1508    /// `/V:ON` to enable delayed expansion AND `/S` to use simple-quote
1509    /// parsing (so cmd's /C parser doesn't mangle the wrapper's internal
1510    /// quotes from `cmd_quote`).
1511    #[cfg(windows)]
1512    #[test]
1513    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
1514        use crate::windows_shell::WindowsShell;
1515        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
1516        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1517        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1518        assert_eq!(
1519            args_strs,
1520            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
1521            "Cmd::bg_command must prepend /V:ON /D /S /C"
1522        );
1523    }
1524
1525    /// PowerShell variants don't need `/V:ON`-style flags; their args
1526    /// are the same for foreground (`command()`) and background
1527    /// (`bg_command()`).
1528    #[cfg(windows)]
1529    #[test]
1530    fn windows_shell_pwsh_bg_command_uses_standard_args() {
1531        use crate::windows_shell::WindowsShell;
1532        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
1533        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1534        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1535        assert!(
1536            args_strs.contains(&"-Command"),
1537            "Pwsh::bg_command must use -Command: {args_strs:?}"
1538        );
1539        assert!(
1540            args_strs.contains(&"Get-Date"),
1541            "Pwsh::bg_command must include the user command body"
1542        );
1543    }
1544
1545    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
1546    /// **cmd.exe-specific** wrapper path captures the user command's
1547    /// run-time exit code correctly. The existing
1548    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
1549    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
1550    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
1551    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
1552    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
1553    ///
1554    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
1555    /// force the cmd-wrapper code path, then writes the exit marker and
1556    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
1557    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
1558    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
1559    /// regardless of what the user command returned.
1560    #[cfg(windows)]
1561    #[test]
1562    fn windows_cmd_wrapper_records_real_exit_code() {
1563        use crate::windows_shell::WindowsShell;
1564        use std::process::Stdio;
1565        use std::time::Instant;
1566
1567        let dir = tempfile::tempdir().unwrap();
1568        let exit_path = dir.path().join("test.exit");
1569        let stdout_path = dir.path().join("test.stdout");
1570        let stderr_path = dir.path().join("test.stderr");
1571
1572        // Pre-create capture files so spawn can attach them as stdio.
1573        create_capture_file(&stdout_path).unwrap();
1574        create_capture_file(&stderr_path).unwrap();
1575        let stdout = create_capture_file(&stdout_path).unwrap();
1576        let stderr = create_capture_file(&stderr_path).unwrap();
1577
1578        let wrapper = WindowsShell::Cmd.wrapper_script("cmd /c exit 42", &exit_path);
1579        let mut cmd = WindowsShell::Cmd.bg_command(&wrapper);
1580        cmd.current_dir(dir.path())
1581            .stdin(Stdio::null())
1582            .stdout(Stdio::from(stdout))
1583            .stderr(Stdio::from(stderr));
1584
1585        let mut child = cmd.spawn().expect("cmd.exe must spawn for this test");
1586        child.wait().expect("child must complete");
1587
1588        // Wait for marker file (atomic rename).
1589        let started = Instant::now();
1590        loop {
1591            if exit_path.exists() {
1592                break;
1593            }
1594            assert!(
1595                started.elapsed() < Duration::from_secs(10),
1596                "exit marker not written within 10s"
1597            );
1598            std::thread::sleep(Duration::from_millis(50));
1599        }
1600
1601        let content = fs::read_to_string(&exit_path).expect("read exit marker");
1602        assert_eq!(
1603            content.trim(),
1604            "42",
1605            "Cmd wrapper must capture user command's real exit code via !ERRORLEVEL!. \
1606             Got {:?} (would be 0 if %ERRORLEVEL% parse-time expansion bug regressed)",
1607            content
1608        );
1609    }
1610}