Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
30// Note: `resolve_windows_shell` is no longer imported at module scope —
31// production code in `spawn_detached_child` uses `shell_candidates()`
32// with retry instead, and the function remains in `windows_shell.rs`
33// for tests and as a future helper.
34
/// Fallback timeout for background bash tasks that do not specify one:
/// 30 minutes. Callers may override it per call through the `timeout`
/// parameter (in milliseconds).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age (24 hours) beyond which a task still persisted as `Running` is treated
/// as orphaned — presumably the threshold behind `running_metadata_is_stale`.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);

/// Number of trailing output bytes (stdout+stderr) copied into each
/// `BashCompletedFrame` / `BgCompletion`, so the plugin can inline a short
/// preview in the system-reminder. That is roughly 3-4 lines of typical
/// output (git status, test summaries, exit messages). It is small enough
/// that several completions in one reminder stay well within the model's
/// context budget, and large enough that most successful runs need no
/// follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Terminal-state notification for one background task.
///
/// Instances are queued in `RegistryInner::completions` when a task reaches a
/// terminal status. They are returned by `drain_completions*` and
/// `pending_completions_for_session`. When the enqueuing caller requests it,
/// they are also mirrored into a `BashCompletedFrame` push frame.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task that finished.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    /// In-process it is still used to filter and drain completions per session.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status the task ended in.
    pub status: BgTaskStatus,
    /// Exit code, when one was recorded. `kill_with_status` records `124` for
    /// timeouts and `None` for other kills.
    pub exit_code: Option<i32>,
    /// The shell command the task ran.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// `skip_serializing_if` predicate: true when the flag is unset, so `false`
/// booleans are left out of the serialized payload.
fn is_false(v: &bool) -> bool {
    !v
}
76
/// Point-in-time view of a background task, returned by `status`, `list` and
/// `kill`.
///
/// `info` is `#[serde(flatten)]`ed, so its fields serialize at the top level
/// of the snapshot object next to the fields below.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, if the task has recorded one.
    pub exit_code: Option<i32>,
    /// PID of the spawned process, if known.
    pub child_pid: Option<u32>,
    /// Working directory the command was started in.
    pub workdir: String,
    /// Tail of captured output, sized by the `preview_bytes` argument of the
    /// producing call (NOTE(review): exact rules live in `BgTask::snapshot`).
    pub output_preview: String,
    /// Whether `output_preview` omits earlier output.
    pub output_truncated: bool,
    /// Presumably the stdout capture file path — TODO confirm in `snapshot`.
    pub output_path: Option<String>,
    /// Presumably the stderr capture file path — TODO confirm in `snapshot`.
    pub stderr_path: Option<String>,
}
89
/// Registry of background bash tasks.
///
/// Cheap to clone: every clone shares the same `RegistryInner` through an
/// `Arc`, so all handles observe the same tasks and completion queue.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// Shared state behind every `BgTaskRegistry` handle.
///
/// NOTE(review): code in this module takes `tasks` before a task's `state`,
/// and a task's `state` before `completions`. Keep that order to avoid
/// deadlocks.
pub(crate) struct RegistryInner {
    /// Spawned and rehydrated tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// FIFO of completions not yet drained; at most one entry per task id.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sink for push frames (`BashCompleted`, long-running reminders).
    pub(crate) progress_sender: SharedProgressSender,
    // Presumably guards `start_watchdog` so only one watchdog runs — TODO confirm.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are emitted (default `true`).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Minimum gap between reminders for one task, in ms (default 600 000,
    /// i.e. 10 minutes); `0` suppresses reminders.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
}
104
/// One tracked background task: fixed identity plus lock-protected state.
pub(crate) struct BgTask {
    /// Task identifier; also its key in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations: task dir, metadata JSON, stdout/stderr capture
    /// files and exit marker.
    pub(crate) paths: TaskPaths,
    /// Captured when this process spawned the task, or when it rehydrated it
    /// from disk (so for rehydrated tasks this is not the real start time).
    pub(crate) started: Instant,
    /// When the last long-running reminder was emitted; `None` before the first.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When this process recorded the task as terminal; read by
    /// `cleanup_finished` to decide eviction.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    pub(crate) state: Mutex<BgTaskState>,
}
114
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the task's persisted metadata; updated together with
    /// the JSON file via `write_task` / `update_task`.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned process. It is `Some` only for tasks spawned by
    /// this registry that have not yet been reaped, killed, finalized or
    /// detached. It is always `None` for rehydrated tasks.
    pub(crate) child: Option<Child>,
    /// Set to `true` whenever `child` is dropped. Spawned tasks start as
    /// `false`; rehydrated tasks take the value chosen by the caller.
    pub(crate) detached: bool,
    /// Reader over the stdout/stderr capture files, used for previews and
    /// capped via `enforce_terminal_cap` once the task is terminal.
    pub(crate) buffer: BgBuffer,
}
121
122impl BgTaskRegistry {
123    pub fn new(progress_sender: SharedProgressSender) -> Self {
124        Self {
125            inner: Arc::new(RegistryInner {
126                tasks: Mutex::new(HashMap::new()),
127                completions: Mutex::new(VecDeque::new()),
128                progress_sender,
129                watchdog_started: AtomicBool::new(false),
130                shutdown: AtomicBool::new(false),
131                long_running_reminder_enabled: AtomicBool::new(true),
132                long_running_reminder_interval_ms: AtomicU64::new(600_000),
133            }),
134        }
135    }
136
137    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
138        self.inner
139            .long_running_reminder_enabled
140            .store(enabled, Ordering::SeqCst);
141        self.inner
142            .long_running_reminder_interval_ms
143            .store(interval_ms, Ordering::SeqCst);
144    }
145
146    #[cfg(unix)]
147    pub fn spawn(
148        &self,
149        command: &str,
150        session_id: String,
151        workdir: PathBuf,
152        env: HashMap<String, String>,
153        timeout: Option<Duration>,
154        storage_dir: PathBuf,
155        max_running: usize,
156        notify_on_completion: bool,
157    ) -> Result<String, String> {
158        self.start_watchdog();
159
160        let running = self.running_count();
161        if running >= max_running {
162            return Err(format!(
163                "background bash task limit exceeded: {running} running (max {max_running})"
164            ));
165        }
166
167        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
168        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
169        let task_id = self.generate_unique_task_id()?;
170        let paths = task_paths(&storage_dir, &session_id, &task_id);
171        fs::create_dir_all(&paths.dir)
172            .map_err(|e| format!("failed to create background task dir: {e}"))?;
173
174        let mut metadata = PersistedTask::starting(
175            task_id.clone(),
176            session_id.clone(),
177            command.to_string(),
178            workdir.clone(),
179            timeout_ms,
180            notify_on_completion,
181        );
182        write_task(&paths.json, &metadata)
183            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
184
185        // Pre-create capture files so the watchdog/buffer can always
186        // open them for reading. The spawn helper opens its own handles
187        // per attempt because each `Command::spawn()` consumes them.
188        create_capture_file(&paths.stdout)
189            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
190        create_capture_file(&paths.stderr)
191            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
192
193        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
194
195        let child_pid = child.id();
196        metadata.mark_running(child_pid, child_pid as i32);
197        write_task(&paths.json, &metadata)
198            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
199
200        let task = Arc::new(BgTask {
201            task_id: task_id.clone(),
202            session_id,
203            paths: paths.clone(),
204            started: Instant::now(),
205            last_reminder_at: Mutex::new(None),
206            terminal_at: Mutex::new(None),
207            state: Mutex::new(BgTaskState {
208                metadata,
209                child: Some(child),
210                detached: false,
211                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
212            }),
213        });
214
215        self.inner
216            .tasks
217            .lock()
218            .map_err(|_| "background task registry lock poisoned".to_string())?
219            .insert(task_id.clone(), task);
220
221        Ok(task_id)
222    }
223
224    #[cfg(windows)]
225    pub fn spawn(
226        &self,
227        command: &str,
228        session_id: String,
229        workdir: PathBuf,
230        env: HashMap<String, String>,
231        timeout: Option<Duration>,
232        storage_dir: PathBuf,
233        max_running: usize,
234        notify_on_completion: bool,
235    ) -> Result<String, String> {
236        self.start_watchdog();
237
238        let running = self.running_count();
239        if running >= max_running {
240            return Err(format!(
241                "background bash task limit exceeded: {running} running (max {max_running})"
242            ));
243        }
244
245        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
246        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
247        let task_id = self.generate_unique_task_id()?;
248        let paths = task_paths(&storage_dir, &session_id, &task_id);
249        fs::create_dir_all(&paths.dir)
250            .map_err(|e| format!("failed to create background task dir: {e}"))?;
251
252        let mut metadata = PersistedTask::starting(
253            task_id.clone(),
254            session_id.clone(),
255            command.to_string(),
256            workdir.clone(),
257            timeout_ms,
258            notify_on_completion,
259        );
260        write_task(&paths.json, &metadata)
261            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
262
263        // Capture files are pre-created so the watchdog/buffer can always
264        // open them for reading even if the child hasn't written anything
265        // yet. The spawn helper opens its own handles per attempt because
266        // each `Command::spawn()` consumes them, and on Windows we may
267        // retry across multiple shell candidates if the first one fails.
268        create_capture_file(&paths.stdout)
269            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
270        create_capture_file(&paths.stderr)
271            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
272
273        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
274
275        let child_pid = child.id();
276        metadata.status = BgTaskStatus::Running;
277        metadata.child_pid = Some(child_pid);
278        metadata.pgid = None;
279        write_task(&paths.json, &metadata)
280            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
281
282        let task = Arc::new(BgTask {
283            task_id: task_id.clone(),
284            session_id,
285            paths: paths.clone(),
286            started: Instant::now(),
287            last_reminder_at: Mutex::new(None),
288            terminal_at: Mutex::new(None),
289            state: Mutex::new(BgTaskState {
290                metadata,
291                child: Some(child),
292                detached: false,
293                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
294            }),
295        });
296
297        self.inner
298            .tasks
299            .lock()
300            .map_err(|_| "background task registry lock poisoned".to_string())?
301            .insert(task_id.clone(), task);
302
303        Ok(task_id)
304    }
305
306    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
307        self.start_watchdog();
308        let dir = session_tasks_dir(storage_dir, session_id);
309        if !dir.exists() {
310            return Ok(());
311        }
312
313        let entries = fs::read_dir(&dir)
314            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
315        for entry in entries.flatten() {
316            let path = entry.path();
317            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
318                continue;
319            }
320            let Ok(mut metadata) = read_task(&path) else {
321                continue;
322            };
323            if metadata.session_id != session_id {
324                continue;
325            }
326
327            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
328            match metadata.status {
329                BgTaskStatus::Starting => {
330                    metadata.mark_terminal(
331                        BgTaskStatus::Failed,
332                        None,
333                        Some("spawn aborted".to_string()),
334                    );
335                    let _ = write_task(&paths.json, &metadata);
336                    self.enqueue_completion_if_needed(&metadata, false);
337                }
338                BgTaskStatus::Running => {
339                    if self.running_metadata_is_stale(&metadata) {
340                        metadata.mark_terminal(
341                            BgTaskStatus::Killed,
342                            None,
343                            Some("orphaned (>24h)".to_string()),
344                        );
345                        if !paths.exit.exists() {
346                            let _ = write_kill_marker_if_absent(&paths.exit);
347                        }
348                        let _ = write_task(&paths.json, &metadata);
349                        self.enqueue_completion_if_needed(&metadata, false);
350                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
351                        metadata = terminal_metadata_from_marker(metadata, marker, None);
352                        let _ = write_task(&paths.json, &metadata);
353                        self.enqueue_completion_if_needed(&metadata, false);
354                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
355                        metadata.mark_terminal(
356                            BgTaskStatus::Failed,
357                            None,
358                            Some("process exited without exit marker".to_string()),
359                        );
360                        let _ = write_task(&paths.json, &metadata);
361                        self.enqueue_completion_if_needed(&metadata, false);
362                    } else {
363                        self.insert_rehydrated_task(metadata, paths, true)?;
364                    }
365                }
366                _ if metadata.status.is_terminal() => {
367                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
368                    self.enqueue_completion_if_needed(&metadata, false);
369                }
370                _ => {}
371            }
372        }
373
374        Ok(())
375    }
376
377    pub fn status(
378        &self,
379        task_id: &str,
380        session_id: &str,
381        preview_bytes: usize,
382    ) -> Option<BgTaskSnapshot> {
383        let task = self.task_for_session(task_id, session_id)?;
384        let _ = self.poll_task(&task);
385        Some(task.snapshot(preview_bytes))
386    }
387
388    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
389        let tasks = self
390            .inner
391            .tasks
392            .lock()
393            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
394            .unwrap_or_default();
395        tasks
396            .into_iter()
397            .map(|task| {
398                let _ = self.poll_task(&task);
399                task.snapshot(preview_bytes)
400            })
401            .collect()
402    }
403
404    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
405        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
406    }
407
408    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
409        let task = self
410            .task_for_session(task_id, session_id)
411            .ok_or_else(|| format!("background task not found: {task_id}"))?;
412        let mut state = task
413            .state
414            .lock()
415            .map_err(|_| "background task lock poisoned".to_string())?;
416        let updated = update_task(&task.paths.json, |metadata| {
417            metadata.notify_on_completion = true;
418            metadata.completion_delivered = false;
419        })
420        .map_err(|e| format!("failed to promote background task: {e}"))?;
421        state.metadata = updated;
422        if state.metadata.status.is_terminal() {
423            state.buffer.enforce_terminal_cap();
424            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
425        }
426        Ok(true)
427    }
428
429    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
430        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
431            .map(|_| ())
432    }
433
434    pub fn cleanup_finished(&self, older_than: Duration) {
435        let cutoff = Instant::now().checked_sub(older_than);
436        if let Ok(mut tasks) = self.inner.tasks.lock() {
437            tasks.retain(|_, task| {
438                let is_terminal = task
439                    .state
440                    .lock()
441                    .map(|state| state.metadata.status.is_terminal())
442                    .unwrap_or(false);
443                if !is_terminal {
444                    return true;
445                }
446
447                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
448                match (terminal_at, cutoff) {
449                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
450                    (Some(_), None) => false,
451                    (None, _) => true,
452                }
453            });
454        }
455    }
456
457    pub fn drain_completions(&self) -> Vec<BgCompletion> {
458        self.drain_completions_for_session(None)
459    }
460
461    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
462        let mut completions = match self.inner.completions.lock() {
463            Ok(completions) => completions,
464            Err(_) => return Vec::new(),
465        };
466
467        let drained = if let Some(session_id) = session_id {
468            let mut matched = Vec::new();
469            let mut retained = VecDeque::new();
470            while let Some(completion) = completions.pop_front() {
471                if completion.session_id == session_id {
472                    matched.push(completion);
473                } else {
474                    retained.push_back(completion);
475                }
476            }
477            *completions = retained;
478            matched
479        } else {
480            completions.drain(..).collect()
481        };
482        drop(completions);
483
484        for completion in &drained {
485            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
486                let _ = task.set_completion_delivered(true);
487            }
488        }
489
490        drained
491    }
492
493    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
494        self.inner
495            .completions
496            .lock()
497            .map(|completions| {
498                completions
499                    .iter()
500                    .filter(|completion| completion.session_id == session_id)
501                    .cloned()
502                    .collect()
503            })
504            .unwrap_or_default()
505    }
506
507    pub fn detach(&self) {
508        self.inner.shutdown.store(true, Ordering::SeqCst);
509        if let Ok(mut tasks) = self.inner.tasks.lock() {
510            for task in tasks.values() {
511                if let Ok(mut state) = task.state.lock() {
512                    state.child = None;
513                    state.detached = true;
514                }
515            }
516            tasks.clear();
517        }
518    }
519
520    pub fn shutdown(&self) {
521        let tasks = self
522            .inner
523            .tasks
524            .lock()
525            .map(|tasks| {
526                tasks
527                    .values()
528                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
529                    .collect::<Vec<_>>()
530            })
531            .unwrap_or_default();
532        for (task_id, session_id) in tasks {
533            let _ = self.kill(&task_id, &session_id);
534        }
535    }
536
537    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
538        let marker = match read_exit_marker(&task.paths.exit) {
539            Ok(Some(marker)) => marker,
540            Ok(None) => return Ok(()),
541            Err(error) => return Err(format!("failed to read exit marker: {error}")),
542        };
543        self.finalize_from_marker(task, marker, None)
544    }
545
    /// Non-blocking check on the child process owned by this registry.
    ///
    /// When `try_wait` reports that the child has exited:
    /// - the `Child` handle is dropped and the task flagged detached;
    /// - if the task is already terminal, nothing else happens;
    /// - if an exit marker exists, finalization is left to `poll_task`;
    /// - otherwise the task is persisted as `Failed` ("process exited without
    ///   exit marker") and its completion queued with a push frame.
    ///
    /// Does nothing when the state lock is poisoned, there is no child handle,
    /// or the child is still running.
    ///
    /// NOTE(review): if `update_task` fails, the task stays non-terminal in
    /// memory. Because the child handle is already gone, this path will not
    /// retry.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        if let Some(child) = state.child.as_mut() {
            if matches!(child.try_wait(), Ok(Some(_))) {
                // Child has exited. If the wrapper successfully wrote an
                // exit marker, the next `poll_task()` cycle will pick it up
                // and finalize via `finalize_from_marker`. But if the
                // wrapper crashed before writing the marker (e.g. SIGKILL,
                // power loss, wrapper bug), the task would forever appear
                // Running until `timeout_ms` expired — and if no timeout
                // was set, until the 24h `running_metadata_is_stale` cutoff
                // hit at the next aft restart. Same condition as the replay
                // path's "PID dead but no marker" branch (see `replay_session`).
                //
                // To avoid that hidden hang, mark the task Failed
                // immediately with the same reason string used by replay,
                // but only if the marker is genuinely absent. If a marker
                // appeared on disk between try_wait() returning and now
                // (race window), prefer the marker — let the next poll
                // cycle finalize from it.
                state.child = None;
                state.detached = true;
                if state.metadata.status.is_terminal() {
                    return;
                }
                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
                    return;
                }
                let updated = update_task(&task.paths.json, |metadata| {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                });
                if let Ok(metadata) = updated {
                    state.metadata = metadata;
                    task.mark_terminal_now();
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
            }
        }
    }
592
593    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
594        self.inner
595            .tasks
596            .lock()
597            .map(|tasks| {
598                tasks
599                    .values()
600                    .filter(|task| task.is_running())
601                    .cloned()
602                    .collect()
603            })
604            .unwrap_or_default()
605    }
606
607    fn insert_rehydrated_task(
608        &self,
609        metadata: PersistedTask,
610        paths: TaskPaths,
611        detached: bool,
612    ) -> Result<(), String> {
613        let task_id = metadata.task_id.clone();
614        let session_id = metadata.session_id.clone();
615        let task = Arc::new(BgTask {
616            task_id: task_id.clone(),
617            session_id,
618            paths: paths.clone(),
619            started: Instant::now(),
620            last_reminder_at: Mutex::new(None),
621            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
622            state: Mutex::new(BgTaskState {
623                metadata,
624                child: None,
625                detached,
626                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
627            }),
628        });
629        self.inner
630            .tasks
631            .lock()
632            .map_err(|_| "background task registry lock poisoned".to_string())?
633            .insert(task_id, task);
634        Ok(())
635    }
636
637    fn kill_with_status(
638        &self,
639        task_id: &str,
640        session_id: &str,
641        terminal_status: BgTaskStatus,
642    ) -> Result<BgTaskSnapshot, String> {
643        let task = self
644            .task_for_session(task_id, session_id)
645            .ok_or_else(|| format!("background task not found: {task_id}"))?;
646
647        {
648            let mut state = task
649                .state
650                .lock()
651                .map_err(|_| "background task lock poisoned".to_string())?;
652            if state.metadata.status.is_terminal() {
653                return Ok(task.snapshot_locked(&state, 5 * 1024));
654            }
655
656            state.metadata.status = BgTaskStatus::Killing;
657            write_task(&task.paths.json, &state.metadata)
658                .map_err(|e| format!("failed to persist killing state: {e}"))?;
659
660            #[cfg(unix)]
661            if let Some(pgid) = state.metadata.pgid {
662                terminate_pgid(pgid, state.child.as_mut());
663            }
664            #[cfg(windows)]
665            if let Some(child) = state.child.as_mut() {
666                super::process::terminate_process(child);
667            } else if let Some(pid) = state.metadata.child_pid {
668                terminate_pid(pid);
669            }
670            if let Some(child) = state.child.as_mut() {
671                let _ = child.wait();
672            }
673            state.child = None;
674            state.detached = true;
675
676            if !task.paths.exit.exists() {
677                write_kill_marker_if_absent(&task.paths.exit)
678                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
679            }
680
681            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
682                Some(124)
683            } else {
684                None
685            };
686            state
687                .metadata
688                .mark_terminal(terminal_status, exit_code, None);
689            task.mark_terminal_now();
690            write_task(&task.paths.json, &state.metadata)
691                .map_err(|e| format!("failed to persist killed state: {e}"))?;
692            state.buffer.enforce_terminal_cap();
693            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
694        }
695
696        Ok(task.snapshot(5 * 1024))
697    }
698
699    fn finalize_from_marker(
700        &self,
701        task: &Arc<BgTask>,
702        marker: ExitMarker,
703        reason: Option<String>,
704    ) -> Result<(), String> {
705        let mut state = task
706            .state
707            .lock()
708            .map_err(|_| "background task lock poisoned".to_string())?;
709        if state.metadata.status.is_terminal() {
710            return Ok(());
711        }
712
713        let updated = update_task(&task.paths.json, |metadata| {
714            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
715            *metadata = new_metadata;
716        })
717        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
718        state.metadata = updated;
719        task.mark_terminal_now();
720        state.child = None;
721        state.detached = true;
722        state.buffer.enforce_terminal_cap();
723        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
724        Ok(())
725    }
726
727    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
728        if metadata.status.is_terminal() && !metadata.completion_delivered {
729            self.enqueue_completion_locked(metadata, None, emit_frame);
730        }
731    }
732
733    fn enqueue_completion_locked(
734        &self,
735        metadata: &PersistedTask,
736        buffer: Option<&BgBuffer>,
737        emit_frame: bool,
738    ) {
739        if !metadata.status.is_terminal() || metadata.completion_delivered {
740            return;
741        }
742        // Read tail once at completion time and cache on the BgCompletion so
743        // both the push-frame consumer (running session) and any later
744        // `bash_drain_completions` poll (different session, restart) see the
745        // same preview without racing against rotation.
746        let (output_preview, output_truncated) = match buffer {
747            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
748            None => (String::new(), false),
749        };
750        let completion = BgCompletion {
751            task_id: metadata.task_id.clone(),
752            session_id: metadata.session_id.clone(),
753            status: metadata.status.clone(),
754            exit_code: metadata.exit_code,
755            command: metadata.command.clone(),
756            output_preview,
757            output_truncated,
758        };
759        if let Ok(mut completions) = self.inner.completions.lock() {
760            if completions
761                .iter()
762                .any(|completion| completion.task_id == metadata.task_id)
763            {
764                return;
765            }
766            completions.push_back(completion.clone());
767        } else {
768            return;
769        }
770
771        if emit_frame {
772            self.emit_bash_completed(completion);
773        }
774    }
775
776    fn emit_bash_completed(&self, completion: BgCompletion) {
777        let Ok(progress_sender) = self
778            .inner
779            .progress_sender
780            .lock()
781            .map(|sender| sender.clone())
782        else {
783            return;
784        };
785        let Some(sender) = progress_sender.as_ref() else {
786            return;
787        };
788        // Clone the callback out of the registry mutex before writing to stdout;
789        // otherwise a blocked push-frame write could pin the mutex and starve
790        // unrelated progress-sender updates.
791        // Bg task transitions are discovered by the watchdog thread, so the
792        // sender is shared behind a Mutex. It still uses the same stdout writer
793        // closure as foreground progress frames, preserving the existing lock/
794        // flush behavior in main.rs.
795        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
796            completion.task_id,
797            completion.session_id,
798            completion.status,
799            completion.exit_code,
800            completion.command,
801            completion.output_preview,
802            completion.output_truncated,
803        )));
804    }
805
806    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
807        if !self
808            .inner
809            .long_running_reminder_enabled
810            .load(Ordering::SeqCst)
811        {
812            return;
813        }
814        let interval_ms = self
815            .inner
816            .long_running_reminder_interval_ms
817            .load(Ordering::SeqCst);
818        if interval_ms == 0 {
819            return;
820        }
821        let interval = Duration::from_millis(interval_ms);
822        let now = Instant::now();
823        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
824            return;
825        };
826        let since = last_reminder_at.unwrap_or(task.started);
827        if now.duration_since(since) < interval {
828            return;
829        }
830        let command = task
831            .state
832            .lock()
833            .map(|state| state.metadata.command.clone())
834            .unwrap_or_default();
835        *last_reminder_at = Some(now);
836        self.emit_bash_long_running(BashLongRunningFrame::new(
837            task.task_id.clone(),
838            task.session_id.clone(),
839            command,
840            task.started.elapsed().as_millis() as u64,
841        ));
842    }
843
844    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
845        let Ok(progress_sender) = self
846            .inner
847            .progress_sender
848            .lock()
849            .map(|sender| sender.clone())
850        else {
851            return;
852        };
853        if let Some(sender) = progress_sender.as_ref() {
854            sender(PushFrame::BashLongRunning(frame));
855        }
856    }
857
858    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
859        self.inner
860            .tasks
861            .lock()
862            .ok()
863            .and_then(|tasks| tasks.get(task_id).cloned())
864    }
865
866    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
867        self.task(task_id)
868            .filter(|task| task.session_id == session_id)
869    }
870
871    fn running_count(&self) -> usize {
872        self.inner
873            .tasks
874            .lock()
875            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
876            .unwrap_or(0)
877    }
878
879    fn start_watchdog(&self) {
880        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
881            super::watchdog::start(self.clone());
882        }
883    }
884
885    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
886        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
887    }
888
889    #[cfg(test)]
890    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
891        self.task_for_session(task_id, session_id)
892            .map(|task| task.paths.json.clone())
893    }
894
895    #[cfg(test)]
896    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
897        self.task_for_session(task_id, session_id)
898            .map(|task| task.paths.exit.clone())
899    }
900
901    /// Generate a `bgb-{16hex}` slug that is unique against live tasks and queued completions.
902    fn generate_unique_task_id(&self) -> Result<String, String> {
903        for _ in 0..32 {
904            let candidate = random_slug();
905            let tasks = self
906                .inner
907                .tasks
908                .lock()
909                .map_err(|_| "background task registry lock poisoned".to_string())?;
910            if tasks.contains_key(&candidate) {
911                continue;
912            }
913            let completions = self
914                .inner
915                .completions
916                .lock()
917                .map_err(|_| "background completions lock poisoned".to_string())?;
918            if completions
919                .iter()
920                .any(|completion| completion.task_id == candidate)
921            {
922                continue;
923            }
924            return Ok(candidate);
925        }
926        Err("failed to allocate unique background task id after 32 attempts".to_string())
927    }
928}
929
930impl Default for BgTaskRegistry {
931    fn default() -> Self {
932        Self::new(Arc::new(Mutex::new(None)))
933    }
934}
935
936impl BgTask {
937    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
938        let state = self
939            .state
940            .lock()
941            .unwrap_or_else(|poison| poison.into_inner());
942        self.snapshot_locked(&state, preview_bytes)
943    }
944
945    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
946        let metadata = &state.metadata;
947        let duration_ms = metadata.duration_ms.or_else(|| {
948            metadata
949                .status
950                .is_terminal()
951                .then(|| self.started.elapsed().as_millis() as u64)
952        });
953        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
954        BgTaskSnapshot {
955            info: BgTaskInfo {
956                task_id: self.task_id.clone(),
957                status: metadata.status.clone(),
958                command: metadata.command.clone(),
959                started_at: metadata.started_at,
960                duration_ms,
961            },
962            exit_code: metadata.exit_code,
963            child_pid: metadata.child_pid,
964            workdir: metadata.workdir.display().to_string(),
965            output_preview,
966            output_truncated,
967            output_path: state
968                .buffer
969                .output_path()
970                .map(|path| path.display().to_string()),
971            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
972        }
973    }
974
975    pub(crate) fn is_running(&self) -> bool {
976        self.state
977            .lock()
978            .map(|state| state.metadata.status == BgTaskStatus::Running)
979            .unwrap_or(false)
980    }
981
982    fn mark_terminal_now(&self) {
983        if let Ok(mut terminal_at) = self.terminal_at.lock() {
984            if terminal_at.is_none() {
985                *terminal_at = Some(Instant::now());
986            }
987        }
988    }
989
990    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
991        let mut state = self
992            .state
993            .lock()
994            .map_err(|_| "background task lock poisoned".to_string())?;
995        let updated = update_task(&self.paths.json, |metadata| {
996            metadata.completion_delivered = delivered;
997        })
998        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
999        state.metadata = updated;
1000        Ok(())
1001    }
1002}
1003
1004fn terminal_metadata_from_marker(
1005    mut metadata: PersistedTask,
1006    marker: ExitMarker,
1007    reason: Option<String>,
1008) -> PersistedTask {
1009    match marker {
1010        ExitMarker::Code(code) => {
1011            let status = if code == 0 {
1012                BgTaskStatus::Completed
1013            } else {
1014                BgTaskStatus::Failed
1015            };
1016            metadata.mark_terminal(status, Some(code), reason);
1017        }
1018        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1019    }
1020    metadata
1021}
1022
1023#[cfg(unix)]
1024fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1025    let mut cmd = Command::new("/bin/sh");
1026    cmd.arg("-c")
1027        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1028        .arg("/bin/sh")
1029        .arg(command)
1030        .arg(exit_path);
1031    unsafe {
1032        cmd.pre_exec(|| {
1033            if libc::setsid() == -1 {
1034                return Err(std::io::Error::last_os_error());
1035            }
1036            Ok(())
1037        });
1038    }
1039    cmd
1040}
1041
/// Build the `Command` that runs `command` under the given Windows `shell`
/// through a wrapper script written next to the task's other files.
///
/// The wrapper body comes from `shell.wrapper_script(command, exit_path)`. It
/// is written to `<paths.dir>/<stem of paths.json>.<ext>`, with the extension
/// chosen per shell (`ps1`, `bat`, or `sh`). The returned command carries the
/// shell-specific arguments and `creation_flags` only. Working directory,
/// environment, and stdio are left for the caller to configure.
///
/// # Errors
///
/// Returns an error string if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    // Write the wrapper to a file alongside the other task files and hand the
    // shell that file path as a single argument. This sidesteps Windows
    // command-line quoting entirely: Rust std-lib argument quoting, the cmd /C
    // parser, and the PowerShell -Command parser all interact badly with the
    // quotes embedded in the wrapper body.
    //
    // The wrapper body's internal quotes never reach a command line. The shell
    // reads them from disk under script-file syntax rules, not command-line
    // parser rules. The wrapper path itself is one argument. Its file name
    // reuses the stem of the task JSON file (presumably the space-free
    // `bgb-XXXXXXXX` task id). The directory part may contain spaces, but it is
    // still passed as a single argument and quoted by the standard library's
    // Windows argument encoding.
    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) receive the wrapper path as a script
        // argument (see the match below), so the extension is purely
        // cosmetic; `.sh` matches what an operator would expect when grepping
        // the spill directory.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // -File runs the script with no quoting issues. `-NoLogo`,
            // `-NoProfile`, etc. apply to the host before the file runs.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // `cmd /V:ON /D /C "<bat-file-path>"`: invoking a .bat file via
            // /C is well-defined. The file's contents are read line-by-line by
            // cmd's batch processor, NOT re-interpreted by the /C parser. This
            // avoids the "filename syntax incorrect" errors that came from
            // having complex compound commands on the cmd line.
            cmd.args(["/V:ON", "/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // git-bash and other POSIX shells run the wrapper as
            // `<binary> <wrapper-path>` (the wrapper is just a shell script).
            // No special flags are needed; per the original design notes, the
            // `trap` and atomic exit-marker rename in `wrapper_script` are
            // POSIX-standard.
            cmd.arg(&wrapper_path);
        }
    }

    // Win32 process creation flags. The caller decides whether to include
    // CREATE_BREAKAWAY_FROM_JOB. See `spawn_detached_child` for the
    // breakaway-then-fallback strategy.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1124
1125/// Spawn a detached background bash child process.
1126///
1127/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
1128/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
1129/// cmd.exe) and retries with the next candidate when the previous one
1130/// fails to spawn with `NotFound` — the same runtime safety net the
1131/// foreground bash path has, so issue #27 callers landing on cmd.exe
1132/// fallback can also use background bash. The wrapper script is
1133/// regenerated per attempt because PowerShell wrappers embed the shell
1134/// binary by name; the stdout/stderr capture handles are also reopened
1135/// per attempt because `Command::spawn()` consumes them.
1136///
1137/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
1138/// return immediately without retry — they indicate a problem with the
1139/// resolved shell that retrying with a different shell won't fix.
1140fn spawn_detached_child(
1141    command: &str,
1142    paths: &TaskPaths,
1143    workdir: &Path,
1144    env: &HashMap<String, String>,
1145) -> Result<std::process::Child, String> {
1146    #[cfg(not(windows))]
1147    {
1148        let stdout = create_capture_file(&paths.stdout)
1149            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1150        let stderr = create_capture_file(&paths.stderr)
1151            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1152        detached_shell_command(command, &paths.exit)
1153            .current_dir(workdir)
1154            .envs(env)
1155            .stdin(Stdio::null())
1156            .stdout(Stdio::from(stdout))
1157            .stderr(Stdio::from(stderr))
1158            .spawn()
1159            .map_err(|e| format!("failed to spawn background bash command: {e}"))
1160    }
1161    #[cfg(windows)]
1162    {
1163        use crate::windows_shell::shell_candidates;
1164        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
1165        // legacy foreground bash spawn path. v0.20 routes ALL bash through
1166        // this background spawn helper, including foreground tool calls
1167        // where the model writes PowerShell-syntax (`$var = ...`,
1168        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
1169        // The earlier v0.18-era cmd-first override worked around a
1170        // PowerShell detached-output bug; that bug is fixed at the
1171        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
1172        // see flag block below), so we no longer need to misroute PS
1173        // commands through cmd.
1174        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
1175        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
1176        // first (so the bg child outlives the AFT process when AFT is killed),
1177        // then fall back without it for environments where the parent is in a
1178        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
1179        // runners (GitHub Actions windows-2022) and some MDM-managed corp
1180        // environments hit this — `CreateProcess` returns Access Denied (5).
1181        // Without breakaway, the child still runs detached but will be torn
1182        // down with the parent if the parent process group is signaled.
1183        //
1184        // We use CREATE_NO_WINDOW (no visible console window, but the
1185        // child still has a hidden console) rather than DETACHED_PROCESS
1186        // (no console at all). PowerShell-based wrappers that perform
1187        // file I/O via [System.IO.File] need a console handle to flush
1188        // stdout/stderr correctly even when redirected — under
1189        // DETACHED_PROCESS, pwsh sometimes silently exits before
1190        // executing later script statements (the Move-Item that writes
1191        // the exit marker never runs), leaving the bg task forever
1192        // marked Failed: process exited without exit marker. cmd.exe
1193        // wrappers tolerate DETACHED_PROCESS, but switching to
1194        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
1195        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
1196        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
1197        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
1198        let with_breakaway =
1199            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
1200        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
1201        let mut last_error: Option<String> = None;
1202        for (idx, shell) in candidates.iter().enumerate() {
1203            // Per-shell, try with breakaway first. If the process is in a
1204            // restrictive job, the breakaway flag triggers Access Denied
1205            // (os error 5). Retry once without breakaway.
1206            for &flags in &[with_breakaway, without_breakaway] {
1207                // Re-open capture handles per attempt; spawn() consumes them.
1208                let stdout = create_capture_file(&paths.stdout)
1209                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1210                let stderr = create_capture_file(&paths.stderr)
1211                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1212                let mut cmd =
1213                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
1214                cmd.current_dir(workdir)
1215                    .envs(env)
1216                    .stdin(Stdio::null())
1217                    .stdout(Stdio::from(stdout))
1218                    .stderr(Stdio::from(stderr));
1219                match cmd.spawn() {
1220                    Ok(child) => {
1221                        if idx > 0 {
1222                            log::warn!(
1223                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1224                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
1225                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1226                                shell.binary(),
1227                                idx
1228                            );
1229                        }
1230                        if flags == without_breakaway {
1231                            log::warn!(
1232                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
1233                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
1234                                 Spawned without breakaway; the bg task will be torn down if the \
1235                                 AFT process group is killed."
1236                            );
1237                        }
1238                        return Ok(child);
1239                    }
1240                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1241                        log::warn!(
1242                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
1243                            shell.binary()
1244                        );
1245                        last_error = Some(format!("{}: {e}", shell.binary()));
1246                        // Skip the without-breakaway retry for NotFound — the
1247                        // binary itself is missing, breakaway flag is irrelevant.
1248                        break;
1249                    }
1250                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
1251                        // Access Denied during breakaway — retry without it.
1252                        log::warn!(
1253                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
1254                             Access Denied — retrying {} without breakaway",
1255                            shell.binary()
1256                        );
1257                        last_error = Some(format!("{}: {e}", shell.binary()));
1258                        continue;
1259                    }
1260                    Err(e) => {
1261                        return Err(format!(
1262                            "failed to spawn background bash command via {}: {e}",
1263                            shell.binary()
1264                        ));
1265                    }
1266                }
1267            }
1268        }
1269        Err(format!(
1270            "failed to spawn background bash command: no Windows shell could be spawned. \
1271             Last error: {}. PATH-probed candidates: {:?}",
1272            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1273            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1274        ))
1275    }
1276}
1277
1278fn random_slug() -> String {
1279    let mut bytes = [0u8; 4];
1280    // getrandom is a transitive dependency; use it directly for OS entropy.
1281    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1282        // Extremely unlikely fallback: time + pid mix.
1283        let t = SystemTime::now()
1284            .duration_since(UNIX_EPOCH)
1285            .map(|d| d.subsec_nanos())
1286            .unwrap_or(0);
1287        let p = std::process::id();
1288        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1289    });
1290    // `bgb-` + 8 lowercase hex chars — compact, OS-entropy backed.
1291    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1292    format!("bgb-{hex}")
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297    use std::collections::HashMap;
1298    #[cfg(windows)]
1299    use std::fs;
1300    use std::sync::{Arc, Mutex};
1301    use std::time::Duration;
1302    #[cfg(windows)]
1303    use std::time::Instant;
1304
1305    use super::*;
1306
1307    #[cfg(unix)]
1308    const QUICK_SUCCESS_COMMAND: &str = "true";
1309    #[cfg(windows)]
1310    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1311
1312    #[cfg(unix)]
1313    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1314    #[cfg(windows)]
1315    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1316
1317    #[test]
1318    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1319        let registry = BgTaskRegistry::default();
1320        let dir = tempfile::tempdir().unwrap();
1321        let task_id = registry
1322            .spawn(
1323                QUICK_SUCCESS_COMMAND,
1324                "session".to_string(),
1325                dir.path().to_path_buf(),
1326                HashMap::new(),
1327                Some(Duration::from_secs(30)),
1328                dir.path().to_path_buf(),
1329                10,
1330                true,
1331            )
1332            .unwrap();
1333        registry
1334            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1335            .unwrap();
1336
1337        registry.cleanup_finished(Duration::ZERO);
1338
1339        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1340    }
1341
1342    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1343    /// watchdog path (reap_child) marks a task Failed when the child
1344    /// has exited but no exit marker was written. Before this fix the
1345    /// task would remain `Running` until timeout, even though the
1346    /// process was definitely dead.
1347    ///
1348    /// Cross-platform: uses a quick-exiting command that does NOT go
1349    /// through the wrapper script (we manually clear the exit marker
1350    /// after spawn to simulate the wrapper crashing before write).
1351    #[test]
1352    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1353        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1354        let dir = tempfile::tempdir().unwrap();
1355        let task_id = registry
1356            .spawn(
1357                QUICK_SUCCESS_COMMAND,
1358                "session".to_string(),
1359                dir.path().to_path_buf(),
1360                HashMap::new(),
1361                Some(Duration::from_secs(30)),
1362                dir.path().to_path_buf(),
1363                10,
1364                true,
1365            )
1366            .unwrap();
1367
1368        let task = registry.task_for_session(&task_id, "session").unwrap();
1369
1370        // Wait for the child to actually exit and the wrapper to either
1371        // write the marker or fail. Then nuke the marker to simulate
1372        // wrapper crash before write. Poll up to 5s; this is plenty for a
1373        // `true`/`cmd /c exit 0` invocation.
1374        let started = Instant::now();
1375        loop {
1376            let exited = {
1377                let mut state = task.state.lock().unwrap();
1378                if let Some(child) = state.child.as_mut() {
1379                    matches!(child.try_wait(), Ok(Some(_)))
1380                } else {
1381                    true
1382                }
1383            };
1384            if exited {
1385                break;
1386            }
1387            assert!(
1388                started.elapsed() < Duration::from_secs(5),
1389                "child should exit quickly"
1390            );
1391            std::thread::sleep(Duration::from_millis(20));
1392        }
1393
1394        // Wrapper likely wrote the marker by now; remove it to simulate
1395        // a wrapper crash that exited before persisting the exit code.
1396        let _ = std::fs::remove_file(&task.paths.exit);
1397
1398        // Sanity: task is still Running per metadata (replay/poll hasn't
1399        // observed the missing marker yet).
1400        assert!(
1401            task.is_running(),
1402            "precondition: metadata.status == Running"
1403        );
1404        assert!(
1405            !task.paths.exit.exists(),
1406            "precondition: exit marker absent"
1407        );
1408
1409        // Invoke the watchdog's reap_child directly. The fix should mark
1410        // the task Failed with the documented reason string, instead of
1411        // just dropping the child handle and leaving status=Running.
1412        registry.reap_child(&task);
1413
1414        let state = task.state.lock().unwrap();
1415        assert!(
1416            state.metadata.status.is_terminal(),
1417            "reap_child must transition to terminal when PID dead and no marker. \
1418             Got status={:?}",
1419            state.metadata.status
1420        );
1421        assert_eq!(
1422            state.metadata.status,
1423            BgTaskStatus::Failed,
1424            "must specifically be Failed (not Killed): status={:?}",
1425            state.metadata.status
1426        );
1427        assert_eq!(
1428            state.metadata.status_reason.as_deref(),
1429            Some("process exited without exit marker"),
1430            "reason must match replay path's wording: {:?}",
1431            state.metadata.status_reason
1432        );
1433        assert!(
1434            state.child.is_none(),
1435            "child handle must be released after reap"
1436        );
1437        assert!(state.detached, "task must be marked detached after reap");
1438    }
1439
1440    /// Companion to the above: when the exit marker DOES exist on disk
1441    /// at reap_child time (race window — wrapper finished writing
1442    /// between try_wait and the marker check), reap_child must NOT mark
1443    /// the task Failed. Instead it leaves status=Running and lets the
1444    /// next poll_task() cycle finalize via the marker.
1445    #[test]
1446    fn reap_child_preserves_running_when_exit_marker_exists() {
1447        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1448        let dir = tempfile::tempdir().unwrap();
1449        let task_id = registry
1450            .spawn(
1451                QUICK_SUCCESS_COMMAND,
1452                "session".to_string(),
1453                dir.path().to_path_buf(),
1454                HashMap::new(),
1455                Some(Duration::from_secs(30)),
1456                dir.path().to_path_buf(),
1457                10,
1458                true,
1459            )
1460            .unwrap();
1461
1462        let task = registry.task_for_session(&task_id, "session").unwrap();
1463
1464        // Wait for child to exit AND for the marker to land. Both happen
1465        // shortly after the wrapper finishes — but we want both observed.
1466        let started = Instant::now();
1467        loop {
1468            let exited = {
1469                let mut state = task.state.lock().unwrap();
1470                if let Some(child) = state.child.as_mut() {
1471                    matches!(child.try_wait(), Ok(Some(_)))
1472                } else {
1473                    true
1474                }
1475            };
1476            if exited && task.paths.exit.exists() {
1477                break;
1478            }
1479            assert!(
1480                started.elapsed() < Duration::from_secs(5),
1481                "child should exit and write marker quickly"
1482            );
1483            std::thread::sleep(Duration::from_millis(20));
1484        }
1485
1486        // reap_child sees: child exited, marker exists. It should:
1487        //  - drop state.child / set state.detached = true
1488        //  - NOT change status (poll_task will finalize via marker next tick)
1489        registry.reap_child(&task);
1490
1491        let state = task.state.lock().unwrap();
1492        assert!(
1493            state.child.is_none(),
1494            "child handle still released even when marker exists"
1495        );
1496        assert!(
1497            state.detached,
1498            "task still marked detached even when marker exists"
1499        );
1500        // Status remains Running because reap_child defers to poll_task
1501        // when a marker exists. It would be wrong for reap to record the
1502        // marker outcome (poll_task does that with proper exit-code
1503        // parsing).
1504        assert_eq!(
1505            state.metadata.status,
1506            BgTaskStatus::Running,
1507            "reap_child must defer to poll_task when marker exists"
1508        );
1509    }
1510
1511    #[test]
1512    fn cleanup_finished_keeps_running_tasks() {
1513        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1514        let dir = tempfile::tempdir().unwrap();
1515        let task_id = registry
1516            .spawn(
1517                LONG_RUNNING_COMMAND,
1518                "session".to_string(),
1519                dir.path().to_path_buf(),
1520                HashMap::new(),
1521                Some(Duration::from_secs(30)),
1522                dir.path().to_path_buf(),
1523                10,
1524                true,
1525            )
1526            .unwrap();
1527
1528        registry.cleanup_finished(Duration::ZERO);
1529
1530        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1531        let _ = registry.kill(&task_id, "session");
1532    }
1533
1534    #[cfg(windows)]
1535    fn wait_for_file(path: &Path) -> String {
1536        let started = Instant::now();
1537        loop {
1538            if path.exists() {
1539                return fs::read_to_string(path).expect("read file");
1540            }
1541            assert!(
1542                started.elapsed() < Duration::from_secs(30),
1543                "timed out waiting for {}",
1544                path.display()
1545            );
1546            std::thread::sleep(Duration::from_millis(100));
1547        }
1548    }
1549
1550    #[cfg(windows)]
1551    fn spawn_windows_registry_command(
1552        command: &str,
1553    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
1554        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1555        let dir = tempfile::tempdir().unwrap();
1556        let task_id = registry
1557            .spawn(
1558                command,
1559                "session".to_string(),
1560                dir.path().to_path_buf(),
1561                HashMap::new(),
1562                Some(Duration::from_secs(30)),
1563                dir.path().to_path_buf(),
1564                10,
1565            )
1566            .unwrap();
1567        (registry, dir, task_id)
1568    }
1569
1570    #[cfg(windows)]
1571    #[test]
1572    fn windows_spawn_writes_exit_marker_for_zero_exit() {
1573        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
1574        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1575
1576        let content = wait_for_file(&exit_path);
1577
1578        assert_eq!(content.trim(), "0");
1579    }
1580
1581    #[cfg(windows)]
1582    #[test]
1583    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
1584        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
1585        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1586
1587        let content = wait_for_file(&exit_path);
1588
1589        assert_eq!(content.trim(), "42");
1590    }
1591
1592    #[cfg(windows)]
1593    #[test]
1594    fn windows_spawn_captures_stdout_to_disk() {
1595        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
1596        let task = registry.task_for_session(&task_id, "session").unwrap();
1597        let stdout_path = task.paths.stdout.clone();
1598        let exit_path = task.paths.exit.clone();
1599
1600        let _ = wait_for_file(&exit_path);
1601        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
1602
1603        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
1604    }
1605
1606    #[cfg(windows)]
1607    #[test]
1608    fn windows_spawn_uses_pwsh_when_available() {
1609        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
1610        // (We intentionally pass None for shell_env to keep this test
1611        // independent of the runner's actual env.)
1612        let candidates = crate::windows_shell::shell_candidates_with(
1613            |binary| match binary {
1614                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
1615                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
1616                _ => None,
1617            },
1618            || None,
1619        );
1620        let shell = candidates.first().expect("at least one candidate").clone();
1621        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
1622        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
1623    }
1624
1625    /// Issue #27 Oracle review P1: cmd wrapper MUST use `!ERRORLEVEL!` (not
1626    /// `%ERRORLEVEL%`) to capture the user command's run-time exit code.
1627    /// `%VAR%` is parse-time-expanded by cmd, so a wrapper using
1628    /// `%ERRORLEVEL%` would record a stale value (typically 0 from cmd's
1629    /// startup) regardless of what the user command returned. `!VAR!`
1630    /// requires delayed expansion, which `WindowsShell::bg_command` enables
1631    /// via `/V:ON`.
1632    #[cfg(windows)]
1633    #[test]
1634    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
1635        let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
1636        let script =
1637            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
1638
1639        // MUST use !ERRORLEVEL! (delayed expansion), NOT %ERRORLEVEL%
1640        // (parse-time expansion). The latter would record a stale exit
1641        // code regardless of what the user command actually returned.
1642        assert!(
1643            script.contains("& echo !ERRORLEVEL! >"),
1644            "wrapper must use delayed expansion: {script}"
1645        );
1646        assert!(
1647            !script.contains("%ERRORLEVEL%"),
1648            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
1649        );
1650        assert!(script.contains("& move /Y"));
1651        // move output should be redirected to nul to avoid polluting the
1652        // user's captured stdout with "1 file(s) moved." lines.
1653        assert!(
1654            script.contains("> nul"),
1655            "wrapper must redirect move output to nul: {script}"
1656        );
1657        assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
1658        assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
1659    }
1660
1661    /// Issue #27 Oracle review P1: `bg_command()` for Cmd MUST prepend
1662    /// `/V:ON` to enable delayed expansion AND `/S` to use simple-quote
1663    /// parsing (so cmd's /C parser doesn't mangle the wrapper's internal
1664    /// quotes from `cmd_quote`).
1665    #[cfg(windows)]
1666    #[test]
1667    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
1668        use crate::windows_shell::WindowsShell;
1669        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
1670        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1671        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1672        assert_eq!(
1673            args_strs,
1674            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
1675            "Cmd::bg_command must prepend /V:ON /D /S /C"
1676        );
1677    }
1678
1679    /// PowerShell variants don't need `/V:ON`-style flags; their args
1680    /// are the same for foreground (`command()`) and background
1681    /// (`bg_command()`).
1682    #[cfg(windows)]
1683    #[test]
1684    fn windows_shell_pwsh_bg_command_uses_standard_args() {
1685        use crate::windows_shell::WindowsShell;
1686        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
1687        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1688        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1689        assert!(
1690            args_strs.contains(&"-Command"),
1691            "Pwsh::bg_command must use -Command: {args_strs:?}"
1692        );
1693        assert!(
1694            args_strs.contains(&"Get-Date"),
1695            "Pwsh::bg_command must include the user command body"
1696        );
1697    }
1698
1699    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
1700    /// **cmd.exe-specific** wrapper path captures the user command's
1701    /// run-time exit code correctly. The existing
1702    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
1703    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
1704    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
1705    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
1706    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
1707    ///
1708    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
1709    /// force the cmd-wrapper code path, then writes the exit marker and
1710    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
1711    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
1712    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
1713    /// regardless of what the user command returned.
1714    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
1715    /// the inline command-line wrapper helper that production code does
1716    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
1717    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
1718    /// produced silent failures on Windows 11 (move /Y could not parse
1719    /// path arguments through cmd's /C parser). The `bg_command` helper
1720    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
1721    /// the production spawn path goes through `detached_shell_command_for`
1722    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
1723    /// <bat-path>`.
1724    ///
1725    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
1726    /// covered live by the Windows e2e harness scenario 2d
1727    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
1728    /// exercises the real file-based wrapper end-to-end via the protocol.
1729    #[allow(dead_code)]
1730    #[cfg(any())] // disabled on all targets
1731    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
1732}