Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
30// Note: `resolve_windows_shell` is no longer imported at module scope —
31// production code in `spawn_detached_child` uses `shell_candidates()`
32// with retry instead, and the function remains in `windows_shell.rs`
33// for tests and as a future helper.
34
/// Default timeout for background bash tasks (30 minutes). Agents can
/// override it per call via the `timeout` parameter (milliseconds).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age (24 hours) after which a persisted `Running` task is considered
/// stale; replay then marks it `Killed` with reason `orphaned (>24h)`.
/// NOTE(review): presumably consulted by `running_metadata_is_stale` — confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);

/// Number of tail bytes captured into `BashCompletedFrame` and
/// `BgCompletion` records so the plugin can inline a preview into the
/// system-reminder. Roughly 3-4 lines of typical output (git status, test
/// summaries, exit messages): small enough that several completions in one
/// reminder stay well inside the model's context budget, large enough that
/// most successful runs need no follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Completion record for a background task that reached a terminal state.
///
/// Queued in the registry's completion queue; drained via
/// `drain_completions*` or read via `pending_completions_for_session`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Id of the finished task.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    /// Still used in memory to filter completions per session.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status of the task (copied from its metadata).
    pub status: BgTaskStatus,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// The command line the task ran.
    pub command: String,
    /// Tail of stdout+stderr (read with a cap of `BG_COMPLETION_PREVIEW_BYTES`,
    /// then passed through the compressor unless the task opted out), read once
    /// and cached so push-frame consumers and `bash_drain_completions` callers
    /// see the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// Serde `skip_serializing_if` predicate: true when the flag is unset.
fn is_false(flag: &bool) -> bool {
    !flag
}
76
/// Point-in-time view of a background task, as returned by `status`, `list`
/// and the kill paths.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Common task fields, flattened into the serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child, when known.
    pub child_pid: Option<u32>,
    /// Working directory the command runs in.
    pub workdir: String,
    /// Tail of the task output. For terminal tasks returned by `status`/`list`
    /// it has been passed through the compressor unless the task opted out.
    pub output_preview: String,
    /// True when `output_preview` does not contain the full output.
    pub output_truncated: bool,
    /// Presumably the stdout capture file path — TODO confirm against `snapshot`.
    pub output_path: Option<String>,
    /// Presumably the stderr capture file path — TODO confirm against `snapshot`.
    pub stderr_path: Option<String>,
}
89
/// Cheaply clonable handle to the background-task registry.
///
/// All clones share one `RegistryInner` through an `Arc`, so every clone sees
/// the same tasks, completion queue and configuration.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Every tracked task, keyed by task id (all sessions share this map).
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions of terminal tasks waiting to be drained.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender for push frames — presumably used to emit the completion and
    /// long-running frames imported above; confirm.
    pub(crate) progress_sender: SharedProgressSender,
    /// Whether the watchdog has been started. NOTE(review): managed by
    /// `start_watchdog`, which is not visible here.
    watchdog_started: AtomicBool,
    /// Set by `detach`; presumably tells the watchdog to stop — confirm.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled (initialized to `true`).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds (initialized to 600 000).
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
110
/// One tracked background task.
pub(crate) struct BgTask {
    /// Unique task id (also the registry map key).
    pub(crate) task_id: String,
    /// Owning session; lookups and completions are scoped by it.
    pub(crate) session_id: String,
    /// On-disk locations: task dir, metadata JSON, stdout/stderr capture
    /// files and exit marker.
    pub(crate) paths: TaskPaths,
    /// When this registry began tracking the task (at spawn or rehydration),
    /// not necessarily when the process itself started.
    pub(crate) started: Instant,
    /// Time of the last long-running reminder — presumably maintained by the
    /// watchdog; confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task was observed to become terminal; `cleanup_finished`
    /// ages terminal tasks out based on it.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Metadata, child handle and output buffer, guarded together.
    pub(crate) state: Mutex<BgTaskState>,
}
120
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task record.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned process. `None` for rehydrated tasks and once the
    /// child has been reaped, killed, finalized or detached.
    pub(crate) child: Option<Child>,
    /// True once this process no longer holds a child handle for the task.
    pub(crate) detached: bool,
    /// Reader over the stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
127
128impl BgTaskRegistry {
129    pub fn new(progress_sender: SharedProgressSender) -> Self {
130        Self {
131            inner: Arc::new(RegistryInner {
132                tasks: Mutex::new(HashMap::new()),
133                completions: Mutex::new(VecDeque::new()),
134                progress_sender,
135                watchdog_started: AtomicBool::new(false),
136                shutdown: AtomicBool::new(false),
137                long_running_reminder_enabled: AtomicBool::new(true),
138                long_running_reminder_interval_ms: AtomicU64::new(600_000),
139                compressor: Mutex::new(None),
140            }),
141        }
142    }
143
144    /// Install the output-compression callback. Called by `main.rs` after
145    /// `AppContext` is constructed so that snapshot/completion paths can
146    /// invoke `compress::compress_with_registry` without holding a context
147    /// reference. When called multiple times, the latest installation wins.
148    pub fn set_compressor<F>(&self, compressor: F)
149    where
150        F: Fn(&str, String) -> String + Send + Sync + 'static,
151    {
152        if let Ok(mut slot) = self.inner.compressor.lock() {
153            *slot = Some(Box::new(compressor));
154        }
155    }
156
157    /// Apply the installed compressor (if any) to `output`. Returns `output`
158    /// untouched when no compressor is installed.
159    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
160        let Ok(slot) = self.inner.compressor.lock() else {
161            return output;
162        };
163        match slot.as_ref() {
164            Some(compressor) => compressor(command, output),
165            None => output,
166        }
167    }
168
169    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
170        self.inner
171            .long_running_reminder_enabled
172            .store(enabled, Ordering::SeqCst);
173        self.inner
174            .long_running_reminder_interval_ms
175            .store(interval_ms, Ordering::SeqCst);
176    }
177
178    #[cfg(unix)]
179    #[allow(clippy::too_many_arguments)]
180    pub fn spawn(
181        &self,
182        command: &str,
183        session_id: String,
184        workdir: PathBuf,
185        env: HashMap<String, String>,
186        timeout: Option<Duration>,
187        storage_dir: PathBuf,
188        max_running: usize,
189        notify_on_completion: bool,
190        compressed: bool,
191    ) -> Result<String, String> {
192        self.start_watchdog();
193
194        let running = self.running_count();
195        if running >= max_running {
196            return Err(format!(
197                "background bash task limit exceeded: {running} running (max {max_running})"
198            ));
199        }
200
201        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
202        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
203        let task_id = self.generate_unique_task_id()?;
204        let paths = task_paths(&storage_dir, &session_id, &task_id);
205        fs::create_dir_all(&paths.dir)
206            .map_err(|e| format!("failed to create background task dir: {e}"))?;
207
208        let mut metadata = PersistedTask::starting(
209            task_id.clone(),
210            session_id.clone(),
211            command.to_string(),
212            workdir.clone(),
213            timeout_ms,
214            notify_on_completion,
215            compressed,
216        );
217        write_task(&paths.json, &metadata)
218            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
219
220        // Pre-create capture files so the watchdog/buffer can always
221        // open them for reading. The spawn helper opens its own handles
222        // per attempt because each `Command::spawn()` consumes them.
223        create_capture_file(&paths.stdout)
224            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
225        create_capture_file(&paths.stderr)
226            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
227
228        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
229
230        let child_pid = child.id();
231        metadata.mark_running(child_pid, child_pid as i32);
232        write_task(&paths.json, &metadata)
233            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
234
235        let task = Arc::new(BgTask {
236            task_id: task_id.clone(),
237            session_id,
238            paths: paths.clone(),
239            started: Instant::now(),
240            last_reminder_at: Mutex::new(None),
241            terminal_at: Mutex::new(None),
242            state: Mutex::new(BgTaskState {
243                metadata,
244                child: Some(child),
245                detached: false,
246                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
247            }),
248        });
249
250        self.inner
251            .tasks
252            .lock()
253            .map_err(|_| "background task registry lock poisoned".to_string())?
254            .insert(task_id.clone(), task);
255
256        Ok(task_id)
257    }
258
259    #[cfg(windows)]
260    #[allow(clippy::too_many_arguments)]
261    pub fn spawn(
262        &self,
263        command: &str,
264        session_id: String,
265        workdir: PathBuf,
266        env: HashMap<String, String>,
267        timeout: Option<Duration>,
268        storage_dir: PathBuf,
269        max_running: usize,
270        notify_on_completion: bool,
271        compressed: bool,
272    ) -> Result<String, String> {
273        self.start_watchdog();
274
275        let running = self.running_count();
276        if running >= max_running {
277            return Err(format!(
278                "background bash task limit exceeded: {running} running (max {max_running})"
279            ));
280        }
281
282        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
283        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
284        let task_id = self.generate_unique_task_id()?;
285        let paths = task_paths(&storage_dir, &session_id, &task_id);
286        fs::create_dir_all(&paths.dir)
287            .map_err(|e| format!("failed to create background task dir: {e}"))?;
288
289        let mut metadata = PersistedTask::starting(
290            task_id.clone(),
291            session_id.clone(),
292            command.to_string(),
293            workdir.clone(),
294            timeout_ms,
295            notify_on_completion,
296            compressed,
297        );
298        write_task(&paths.json, &metadata)
299            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
300
301        // Capture files are pre-created so the watchdog/buffer can always
302        // open them for reading even if the child hasn't written anything
303        // yet. The spawn helper opens its own handles per attempt because
304        // each `Command::spawn()` consumes them, and on Windows we may
305        // retry across multiple shell candidates if the first one fails.
306        create_capture_file(&paths.stdout)
307            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
308        create_capture_file(&paths.stderr)
309            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
310
311        let child = spawn_detached_child(command, &paths, &workdir, &env)?;
312
313        let child_pid = child.id();
314        metadata.status = BgTaskStatus::Running;
315        metadata.child_pid = Some(child_pid);
316        metadata.pgid = None;
317        write_task(&paths.json, &metadata)
318            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
319
320        let task = Arc::new(BgTask {
321            task_id: task_id.clone(),
322            session_id,
323            paths: paths.clone(),
324            started: Instant::now(),
325            last_reminder_at: Mutex::new(None),
326            terminal_at: Mutex::new(None),
327            state: Mutex::new(BgTaskState {
328                metadata,
329                child: Some(child),
330                detached: false,
331                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
332            }),
333        });
334
335        self.inner
336            .tasks
337            .lock()
338            .map_err(|_| "background task registry lock poisoned".to_string())?
339            .insert(task_id.clone(), task);
340
341        Ok(task_id)
342    }
343
344    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
345        self.start_watchdog();
346        let dir = session_tasks_dir(storage_dir, session_id);
347        if !dir.exists() {
348            return Ok(());
349        }
350
351        let entries = fs::read_dir(&dir)
352            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
353        for entry in entries.flatten() {
354            let path = entry.path();
355            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
356                continue;
357            }
358            let Ok(mut metadata) = read_task(&path) else {
359                continue;
360            };
361            if metadata.session_id != session_id {
362                continue;
363            }
364
365            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
366            match metadata.status {
367                BgTaskStatus::Starting => {
368                    metadata.mark_terminal(
369                        BgTaskStatus::Failed,
370                        None,
371                        Some("spawn aborted".to_string()),
372                    );
373                    let _ = write_task(&paths.json, &metadata);
374                    self.enqueue_completion_if_needed(&metadata, false);
375                }
376                BgTaskStatus::Running => {
377                    if self.running_metadata_is_stale(&metadata) {
378                        metadata.mark_terminal(
379                            BgTaskStatus::Killed,
380                            None,
381                            Some("orphaned (>24h)".to_string()),
382                        );
383                        if !paths.exit.exists() {
384                            let _ = write_kill_marker_if_absent(&paths.exit);
385                        }
386                        let _ = write_task(&paths.json, &metadata);
387                        self.enqueue_completion_if_needed(&metadata, false);
388                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
389                        metadata = terminal_metadata_from_marker(metadata, marker, None);
390                        let _ = write_task(&paths.json, &metadata);
391                        self.enqueue_completion_if_needed(&metadata, false);
392                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
393                        metadata.mark_terminal(
394                            BgTaskStatus::Failed,
395                            None,
396                            Some("process exited without exit marker".to_string()),
397                        );
398                        let _ = write_task(&paths.json, &metadata);
399                        self.enqueue_completion_if_needed(&metadata, false);
400                    } else {
401                        self.insert_rehydrated_task(metadata, paths, true)?;
402                    }
403                }
404                _ if metadata.status.is_terminal() => {
405                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
406                    self.enqueue_completion_if_needed(&metadata, false);
407                }
408                _ => {}
409            }
410        }
411
412        Ok(())
413    }
414
415    pub fn status(
416        &self,
417        task_id: &str,
418        session_id: &str,
419        preview_bytes: usize,
420    ) -> Option<BgTaskSnapshot> {
421        let task = self.task_for_session(task_id, session_id)?;
422        let _ = self.poll_task(&task);
423        let mut snapshot = task.snapshot(preview_bytes);
424        self.maybe_compress_snapshot(&task, &mut snapshot);
425        Some(snapshot)
426    }
427
428    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
429        let tasks = self
430            .inner
431            .tasks
432            .lock()
433            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
434            .unwrap_or_default();
435        tasks
436            .into_iter()
437            .map(|task| {
438                let _ = self.poll_task(&task);
439                let mut snapshot = task.snapshot(preview_bytes);
440                self.maybe_compress_snapshot(&task, &mut snapshot);
441                snapshot
442            })
443            .collect()
444    }
445
446    /// Compress `output_preview` in place when the task is in a terminal
447    /// state. Live tail of running tasks stays raw so agents debugging
448    /// long-running bash see exactly what the process emitted, not a
449    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
450    /// field on `PersistedTask` short-circuits before the compress pipeline.
451    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
452        if !snapshot.info.status.is_terminal() {
453            return;
454        }
455        let compressed_flag = task
456            .state
457            .lock()
458            .map(|state| state.metadata.compressed)
459            .unwrap_or(true);
460        if !compressed_flag {
461            return;
462        }
463        let raw = std::mem::take(&mut snapshot.output_preview);
464        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
465    }
466
467    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
468        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
469    }
470
471    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
472        let task = self
473            .task_for_session(task_id, session_id)
474            .ok_or_else(|| format!("background task not found: {task_id}"))?;
475        let mut state = task
476            .state
477            .lock()
478            .map_err(|_| "background task lock poisoned".to_string())?;
479        let updated = update_task(&task.paths.json, |metadata| {
480            metadata.notify_on_completion = true;
481            metadata.completion_delivered = false;
482        })
483        .map_err(|e| format!("failed to promote background task: {e}"))?;
484        state.metadata = updated;
485        if state.metadata.status.is_terminal() {
486            state.buffer.enforce_terminal_cap();
487            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
488        }
489        Ok(true)
490    }
491
492    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
493        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
494            .map(|_| ())
495    }
496
497    pub fn cleanup_finished(&self, older_than: Duration) {
498        let cutoff = Instant::now().checked_sub(older_than);
499        if let Ok(mut tasks) = self.inner.tasks.lock() {
500            tasks.retain(|_, task| {
501                let is_terminal = task
502                    .state
503                    .lock()
504                    .map(|state| state.metadata.status.is_terminal())
505                    .unwrap_or(false);
506                if !is_terminal {
507                    return true;
508                }
509
510                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
511                match (terminal_at, cutoff) {
512                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
513                    (Some(_), None) => false,
514                    (None, _) => true,
515                }
516            });
517        }
518    }
519
520    pub fn drain_completions(&self) -> Vec<BgCompletion> {
521        self.drain_completions_for_session(None)
522    }
523
524    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
525        let mut completions = match self.inner.completions.lock() {
526            Ok(completions) => completions,
527            Err(_) => return Vec::new(),
528        };
529
530        let drained = if let Some(session_id) = session_id {
531            let mut matched = Vec::new();
532            let mut retained = VecDeque::new();
533            while let Some(completion) = completions.pop_front() {
534                if completion.session_id == session_id {
535                    matched.push(completion);
536                } else {
537                    retained.push_back(completion);
538                }
539            }
540            *completions = retained;
541            matched
542        } else {
543            completions.drain(..).collect()
544        };
545        drop(completions);
546
547        for completion in &drained {
548            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
549                let _ = task.set_completion_delivered(true);
550            }
551        }
552
553        drained
554    }
555
556    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
557        self.inner
558            .completions
559            .lock()
560            .map(|completions| {
561                completions
562                    .iter()
563                    .filter(|completion| completion.session_id == session_id)
564                    .cloned()
565                    .collect()
566            })
567            .unwrap_or_default()
568    }
569
570    pub fn detach(&self) {
571        self.inner.shutdown.store(true, Ordering::SeqCst);
572        if let Ok(mut tasks) = self.inner.tasks.lock() {
573            for task in tasks.values() {
574                if let Ok(mut state) = task.state.lock() {
575                    state.child = None;
576                    state.detached = true;
577                }
578            }
579            tasks.clear();
580        }
581    }
582
583    pub fn shutdown(&self) {
584        let tasks = self
585            .inner
586            .tasks
587            .lock()
588            .map(|tasks| {
589                tasks
590                    .values()
591                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
592                    .collect::<Vec<_>>()
593            })
594            .unwrap_or_default();
595        for (task_id, session_id) in tasks {
596            let _ = self.kill(&task_id, &session_id);
597        }
598    }
599
600    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
601        let marker = match read_exit_marker(&task.paths.exit) {
602            Ok(Some(marker)) => marker,
603            Ok(None) => return Ok(()),
604            Err(error) => return Err(format!("failed to read exit marker: {error}")),
605        };
606        self.finalize_from_marker(task, marker, None)
607    }
608
    /// Reap `task`'s child handle if the process has exited.
    ///
    /// Non-blocking (`try_wait`). On exit the child handle is dropped and the
    /// task marked detached. If the task is not yet terminal and no exit
    /// marker exists, the task is persisted as `Failed` ("process exited
    /// without exit marker") and its completion is enqueued. Silently returns
    /// if the task lock is poisoned.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        if let Some(child) = state.child.as_mut() {
            if matches!(child.try_wait(), Ok(Some(_))) {
                // Child has exited. If the wrapper successfully wrote an
                // exit marker, the next `poll_task()` cycle will pick it up
                // and finalize via `finalize_from_marker`. But if the
                // wrapper crashed before writing the marker (e.g. SIGKILL,
                // power loss, wrapper bug), the task would forever appear
                // Running until `timeout_ms` expired — and if no timeout
                // was set, until the 24h `running_metadata_is_stale` cutoff
                // hit at the next aft restart. Same condition as the
                // "PID dead but no marker" branch in `replay_session`.
                //
                // To avoid that hidden hang, mark the task Failed
                // immediately with the same reason string used by replay,
                // but only if the marker is genuinely absent. If a marker
                // appeared on disk between try_wait() returning and now
                // (race window), prefer the marker — let the next poll
                // cycle finalize from it.
                state.child = None;
                state.detached = true;
                if state.metadata.status.is_terminal() {
                    return;
                }
                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
                    return;
                }
                let updated = update_task(&task.paths.json, |metadata| {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                });
                // NOTE(review): if `update_task` fails, the in-memory status
                // stays non-terminal with no child handle left to reap.
                if let Ok(metadata) = updated {
                    state.metadata = metadata;
                    task.mark_terminal_now();
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
            }
        }
    }
655
656    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
657        self.inner
658            .tasks
659            .lock()
660            .map(|tasks| {
661                tasks
662                    .values()
663                    .filter(|task| task.is_running())
664                    .cloned()
665                    .collect()
666            })
667            .unwrap_or_default()
668    }
669
670    fn insert_rehydrated_task(
671        &self,
672        metadata: PersistedTask,
673        paths: TaskPaths,
674        detached: bool,
675    ) -> Result<(), String> {
676        let task_id = metadata.task_id.clone();
677        let session_id = metadata.session_id.clone();
678        let task = Arc::new(BgTask {
679            task_id: task_id.clone(),
680            session_id,
681            paths: paths.clone(),
682            started: Instant::now(),
683            last_reminder_at: Mutex::new(None),
684            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
685            state: Mutex::new(BgTaskState {
686                metadata,
687                child: None,
688                detached,
689                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
690            }),
691        });
692        self.inner
693            .tasks
694            .lock()
695            .map_err(|_| "background task registry lock poisoned".to_string())?
696            .insert(task_id, task);
697        Ok(())
698    }
699
    /// Terminate `task_id` and record `terminal_status` as its final state.
    ///
    /// Returns the task's snapshot with a 5 KiB preview. If the task is
    /// already terminal, returns its current snapshot without doing anything.
    /// Otherwise the steps are:
    /// 1. persist status `Killing`;
    /// 2. signal the process (Unix: its process group; Windows: the child
    ///    handle, or the recorded PID when no handle is held);
    /// 3. wait on the child handle if one is held, and drop it;
    /// 4. write a kill marker if no exit marker exists;
    /// 5. mark the task terminal (exit code 124 for `TimedOut`, else none),
    ///    persist it and enqueue its completion.
    ///
    /// # Errors
    /// Unknown task, poisoned task lock, or any metadata/marker write failure.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // Scope the state guard so it is released before the final snapshot
        // is taken (which presumably re-locks the task state).
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            // NOTE(review): on Unix, if `pgid` were `None` while a child handle
            // is held, no signal is sent and the `wait()` below would block.
            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            // Leave a marker so later polls/replays see the task as finished.
            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            // 124 mirrors the exit status conventionally used for timeouts.
            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        Ok(task.snapshot(5 * 1024))
    }
761
762    fn finalize_from_marker(
763        &self,
764        task: &Arc<BgTask>,
765        marker: ExitMarker,
766        reason: Option<String>,
767    ) -> Result<(), String> {
768        let mut state = task
769            .state
770            .lock()
771            .map_err(|_| "background task lock poisoned".to_string())?;
772        if state.metadata.status.is_terminal() {
773            return Ok(());
774        }
775
776        let updated = update_task(&task.paths.json, |metadata| {
777            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
778            *metadata = new_metadata;
779        })
780        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
781        state.metadata = updated;
782        task.mark_terminal_now();
783        state.child = None;
784        state.detached = true;
785        state.buffer.enforce_terminal_cap();
786        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
787        Ok(())
788    }
789
790    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
791        if metadata.status.is_terminal() && !metadata.completion_delivered {
792            self.enqueue_completion_locked(metadata, None, emit_frame);
793        }
794    }
795
796    fn enqueue_completion_locked(
797        &self,
798        metadata: &PersistedTask,
799        buffer: Option<&BgBuffer>,
800        emit_frame: bool,
801    ) {
802        if !metadata.status.is_terminal() || metadata.completion_delivered {
803            return;
804        }
805        // Read tail once at completion time and cache on the BgCompletion so
806        // both the push-frame consumer (running session) and any later
807        // `bash_drain_completions` poll (different session, restart) see the
808        // same preview without racing against rotation.
809        let (raw_preview, output_truncated) = match buffer {
810            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
811            None => (String::new(), false),
812        };
813        // Compress at completion time so push-frame consumers and later
814        // `bash_drain_completions` poll-callers see the same compressed text.
815        // Per-task `compressed: false` opts out; otherwise the compressor is
816        // a no-op when `experimental.bash.compress=false`.
817        let output_preview = if metadata.compressed {
818            self.compress_output(&metadata.command, raw_preview)
819        } else {
820            raw_preview
821        };
822        let completion = BgCompletion {
823            task_id: metadata.task_id.clone(),
824            session_id: metadata.session_id.clone(),
825            status: metadata.status.clone(),
826            exit_code: metadata.exit_code,
827            command: metadata.command.clone(),
828            output_preview,
829            output_truncated,
830        };
831        if let Ok(mut completions) = self.inner.completions.lock() {
832            if completions
833                .iter()
834                .any(|completion| completion.task_id == metadata.task_id)
835            {
836                return;
837            }
838            completions.push_back(completion.clone());
839        } else {
840            return;
841        }
842
843        if emit_frame {
844            self.emit_bash_completed(completion);
845        }
846    }
847
848    fn emit_bash_completed(&self, completion: BgCompletion) {
849        let Ok(progress_sender) = self
850            .inner
851            .progress_sender
852            .lock()
853            .map(|sender| sender.clone())
854        else {
855            return;
856        };
857        let Some(sender) = progress_sender.as_ref() else {
858            return;
859        };
860        // Clone the callback out of the registry mutex before writing to stdout;
861        // otherwise a blocked push-frame write could pin the mutex and starve
862        // unrelated progress-sender updates.
863        // Bg task transitions are discovered by the watchdog thread, so the
864        // sender is shared behind a Mutex. It still uses the same stdout writer
865        // closure as foreground progress frames, preserving the existing lock/
866        // flush behavior in main.rs.
867        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
868            completion.task_id,
869            completion.session_id,
870            completion.status,
871            completion.exit_code,
872            completion.command,
873            completion.output_preview,
874            completion.output_truncated,
875        )));
876    }
877
878    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
879        if !self
880            .inner
881            .long_running_reminder_enabled
882            .load(Ordering::SeqCst)
883        {
884            return;
885        }
886        let interval_ms = self
887            .inner
888            .long_running_reminder_interval_ms
889            .load(Ordering::SeqCst);
890        if interval_ms == 0 {
891            return;
892        }
893        let interval = Duration::from_millis(interval_ms);
894        let now = Instant::now();
895        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
896            return;
897        };
898        let since = last_reminder_at.unwrap_or(task.started);
899        if now.duration_since(since) < interval {
900            return;
901        }
902        let command = task
903            .state
904            .lock()
905            .map(|state| state.metadata.command.clone())
906            .unwrap_or_default();
907        *last_reminder_at = Some(now);
908        self.emit_bash_long_running(BashLongRunningFrame::new(
909            task.task_id.clone(),
910            task.session_id.clone(),
911            command,
912            task.started.elapsed().as_millis() as u64,
913        ));
914    }
915
916    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
917        let Ok(progress_sender) = self
918            .inner
919            .progress_sender
920            .lock()
921            .map(|sender| sender.clone())
922        else {
923            return;
924        };
925        if let Some(sender) = progress_sender.as_ref() {
926            sender(PushFrame::BashLongRunning(frame));
927        }
928    }
929
930    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
931        self.inner
932            .tasks
933            .lock()
934            .ok()
935            .and_then(|tasks| tasks.get(task_id).cloned())
936    }
937
938    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
939        self.task(task_id)
940            .filter(|task| task.session_id == session_id)
941    }
942
943    fn running_count(&self) -> usize {
944        self.inner
945            .tasks
946            .lock()
947            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
948            .unwrap_or(0)
949    }
950
951    fn start_watchdog(&self) {
952        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
953            super::watchdog::start(self.clone());
954        }
955    }
956
957    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
958        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
959    }
960
961    #[cfg(test)]
962    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
963        self.task_for_session(task_id, session_id)
964            .map(|task| task.paths.json.clone())
965    }
966
967    #[cfg(test)]
968    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
969        self.task_for_session(task_id, session_id)
970            .map(|task| task.paths.exit.clone())
971    }
972
973    /// Generate a `bgb-{16hex}` slug that is unique against live tasks and queued completions.
974    fn generate_unique_task_id(&self) -> Result<String, String> {
975        for _ in 0..32 {
976            let candidate = random_slug();
977            let tasks = self
978                .inner
979                .tasks
980                .lock()
981                .map_err(|_| "background task registry lock poisoned".to_string())?;
982            if tasks.contains_key(&candidate) {
983                continue;
984            }
985            let completions = self
986                .inner
987                .completions
988                .lock()
989                .map_err(|_| "background completions lock poisoned".to_string())?;
990            if completions
991                .iter()
992                .any(|completion| completion.task_id == candidate)
993            {
994                continue;
995            }
996            return Ok(candidate);
997        }
998        Err("failed to allocate unique background task id after 32 attempts".to_string())
999    }
1000}
1001
1002impl Default for BgTaskRegistry {
1003    fn default() -> Self {
1004        Self::new(Arc::new(Mutex::new(None)))
1005    }
1006}
1007
1008impl BgTask {
1009    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1010        let state = self
1011            .state
1012            .lock()
1013            .unwrap_or_else(|poison| poison.into_inner());
1014        self.snapshot_locked(&state, preview_bytes)
1015    }
1016
1017    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1018        let metadata = &state.metadata;
1019        let duration_ms = metadata.duration_ms.or_else(|| {
1020            metadata
1021                .status
1022                .is_terminal()
1023                .then(|| self.started.elapsed().as_millis() as u64)
1024        });
1025        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1026        BgTaskSnapshot {
1027            info: BgTaskInfo {
1028                task_id: self.task_id.clone(),
1029                status: metadata.status.clone(),
1030                command: metadata.command.clone(),
1031                started_at: metadata.started_at,
1032                duration_ms,
1033            },
1034            exit_code: metadata.exit_code,
1035            child_pid: metadata.child_pid,
1036            workdir: metadata.workdir.display().to_string(),
1037            output_preview,
1038            output_truncated,
1039            output_path: state
1040                .buffer
1041                .output_path()
1042                .map(|path| path.display().to_string()),
1043            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1044        }
1045    }
1046
1047    pub(crate) fn is_running(&self) -> bool {
1048        self.state
1049            .lock()
1050            .map(|state| state.metadata.status == BgTaskStatus::Running)
1051            .unwrap_or(false)
1052    }
1053
1054    fn mark_terminal_now(&self) {
1055        if let Ok(mut terminal_at) = self.terminal_at.lock() {
1056            if terminal_at.is_none() {
1057                *terminal_at = Some(Instant::now());
1058            }
1059        }
1060    }
1061
1062    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1063        let mut state = self
1064            .state
1065            .lock()
1066            .map_err(|_| "background task lock poisoned".to_string())?;
1067        let updated = update_task(&self.paths.json, |metadata| {
1068            metadata.completion_delivered = delivered;
1069        })
1070        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1071        state.metadata = updated;
1072        Ok(())
1073    }
1074}
1075
1076fn terminal_metadata_from_marker(
1077    mut metadata: PersistedTask,
1078    marker: ExitMarker,
1079    reason: Option<String>,
1080) -> PersistedTask {
1081    match marker {
1082        ExitMarker::Code(code) => {
1083            let status = if code == 0 {
1084                BgTaskStatus::Completed
1085            } else {
1086                BgTaskStatus::Failed
1087            };
1088            metadata.mark_terminal(status, Some(code), reason);
1089        }
1090        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1091    }
1092    metadata
1093}
1094
1095#[cfg(unix)]
1096fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1097    let mut cmd = Command::new("/bin/sh");
1098    cmd.arg("-c")
1099        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1100        .arg("/bin/sh")
1101        .arg(command)
1102        .arg(exit_path);
1103    unsafe {
1104        cmd.pre_exec(|| {
1105            if libc::setsid() == -1 {
1106                return Err(std::io::Error::last_os_error());
1107            }
1108            Ok(())
1109        });
1110    }
1111    cmd
1112}
1113
/// Build (but do not spawn) the Windows command that runs `command` through
/// `shell` as a background task.
///
/// The shell-specific wrapper body from `shell.wrapper_script` is written to
/// `<paths.dir>/<json-stem>.<ext>` (`ps1`, `bat` or `sh`). The shell is then
/// launched with that file path as a single argument. The wrapper's embedded
/// quotes are therefore read from disk under script-file rules, never
/// re-parsed by the command-line layers: Rust's argument quoting, the
/// `cmd /C` parser, and PowerShell `-Command` parsing. The path itself needs
/// no escaping, because task ids are space-free `bgb-XXXXXXXX` slugs.
///
/// `creation_flags` is applied verbatim. `spawn_detached_child` decides
/// whether to include `CREATE_BREAKAWAY_FROM_JOB`.
///
/// Errors if the wrapper file cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // PowerShell host flags go before `-File`. `-File` runs the script
    // without `-Command` quoting rules.
    const POWERSHELL_ARGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    // `cmd /V:ON /D /C <file.bat>`: the batch processor reads the file line
    // by line, so complex compound commands never hit the `/C` parser.
    const CMD_ARGS: &[&str] = &["/V:ON", "/D", "/C"];
    // POSIX shells (git-bash etc.) take the script path as their only
    // argument. No `-c` is used, so the `.sh` extension is purely cosmetic.
    const POSIX_ARGS: &[&str] = &[];

    let wrapper_body = shell.wrapper_script(command, exit_path);
    let (extension, leading_args) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_ARGS),
        WindowsShell::Cmd => ("bat", CMD_ARGS),
        WindowsShell::Posix(_) => ("sh", POSIX_ARGS),
    };

    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{extension}"));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args)
        .arg(&wrapper_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
1196
/// Spawn a detached background bash child process.
///
/// The child runs in `workdir` with stdin set to null. `env` is added on top
/// of the inherited environment. stdout and stderr are redirected into the
/// task's capture files (`paths.stdout` / `paths.stderr`).
///
/// On Unix this is a single spawn of the `/bin/sh` wrapper built by
/// `detached_shell_command`.
///
/// On Windows it walks the candidates returned by `shell_candidates()`. The
/// priority note in the body gives the order as pwsh → powershell →
/// git-bash → cmd. This is the same runtime safety net the foreground bash
/// path has, so issue #27 callers that land on the cmd.exe fallback can also
/// use background bash. For each candidate:
///
/// 1. Spawn with `CREATE_BREAKAWAY_FROM_JOB`.
/// 2. If that fails with Access Denied (os error 5), which typically means a
///    restrictive Job Object, retry once without breakaway.
/// 3. If spawning fails with `NotFound`, move on to the next candidate.
///
/// The wrapper script is rewritten on every attempt, because each shell gets
/// its own wrapper body and extension. The stdout/stderr capture handles are
/// reopened on every attempt, because `Command::spawn()` consumes them.
///
/// # Errors
///
/// Returns a descriptive `String` when:
/// - a capture file cannot be opened;
/// - (Windows) a wrapper script cannot be written;
/// - spawning fails with any error other than `NotFound` or breakaway Access
///   Denied, such as PermissionDenied or OutOfMemory. These return immediately,
///   since retrying with a different shell won't fix them;
/// - (Windows) no candidate could be spawned at all.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
        // legacy foreground bash spawn path. v0.20 routes ALL bash through
        // this background spawn helper, including foreground tool calls
        // where the model writes PowerShell-syntax (`$var = ...`,
        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
        // The earlier v0.18-era cmd-first override worked around a
        // PowerShell detached-output bug; that bug is fixed at the
        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
        // see flag block below), so we no longer need to misroute PS
        // commands through cmd.
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
        // first (so the bg child outlives the AFT process when AFT is killed),
        // then fall back without it for environments where the parent is in a
        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
        // runners (GitHub Actions windows-2022) and some MDM-managed corp
        // environments hit this — `CreateProcess` returns Access Denied (5).
        // Without breakaway, the child still runs detached but will be torn
        // down with the parent if the parent process group is signaled.
        //
        // We use CREATE_NO_WINDOW (no visible console window, but the
        // child still has a hidden console) rather than DETACHED_PROCESS
        // (no console at all). PowerShell-based wrappers that perform
        // file I/O via [System.IO.File] need a console handle to flush
        // stdout/stderr correctly even when redirected — under
        // DETACHED_PROCESS, pwsh sometimes silently exits before
        // executing later script statements (the Move-Item that writes
        // the exit marker never runs), leaving the bg task forever
        // marked Failed: process exited without exit marker. cmd.exe
        // wrappers tolerate DETACHED_PROCESS, but switching to
        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent recoverable spawn error, reported if every candidate fails.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per-shell, try with breakaway first. If the process is in a
            // restrictive job, the breakaway flag triggers Access Denied
            // (os error 5). Retry once without breakaway.
            for &flags in &[with_breakaway, without_breakaway] {
                // Re-open capture handles per attempt; spawn() consumes them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // idx > 0 means every earlier candidate hit NotFound.
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        // Skip the without-breakaway retry for NotFound — the
                        // binary itself is missing, breakaway flag is irrelevant.
                        break;
                    }
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        // Access Denied during breakaway — retry without it.
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure (including Access Denied on the
                    // no-breakaway retry) is not fixable by switching shells.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1349
1350fn random_slug() -> String {
1351    let mut bytes = [0u8; 4];
1352    // getrandom is a transitive dependency; use it directly for OS entropy.
1353    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1354        // Extremely unlikely fallback: time + pid mix.
1355        let t = SystemTime::now()
1356            .duration_since(UNIX_EPOCH)
1357            .map(|d| d.subsec_nanos())
1358            .unwrap_or(0);
1359        let p = std::process::id();
1360        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1361    });
1362    // `bgb-` + 8 lowercase hex chars — compact, OS-entropy backed.
1363    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1364    format!("bgb-{hex}")
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369    use std::collections::HashMap;
1370    #[cfg(windows)]
1371    use std::fs;
1372    use std::sync::{Arc, Mutex};
1373    use std::time::Duration;
1374    #[cfg(windows)]
1375    use std::time::Instant;
1376
1377    use super::*;
1378
1379    #[cfg(unix)]
1380    const QUICK_SUCCESS_COMMAND: &str = "true";
1381    #[cfg(windows)]
1382    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1383
1384    #[cfg(unix)]
1385    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1386    #[cfg(windows)]
1387    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1388
1389    #[test]
1390    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1391        let registry = BgTaskRegistry::default();
1392        let dir = tempfile::tempdir().unwrap();
1393        let task_id = registry
1394            .spawn(
1395                QUICK_SUCCESS_COMMAND,
1396                "session".to_string(),
1397                dir.path().to_path_buf(),
1398                HashMap::new(),
1399                Some(Duration::from_secs(30)),
1400                dir.path().to_path_buf(),
1401                10,
1402                true,
1403                false,
1404            )
1405            .unwrap();
1406        registry
1407            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1408            .unwrap();
1409
1410        registry.cleanup_finished(Duration::ZERO);
1411
1412        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1413    }
1414
1415    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1416    /// watchdog path (reap_child) marks a task Failed when the child
1417    /// has exited but no exit marker was written. Before this fix the
1418    /// task would remain `Running` until timeout, even though the
1419    /// process was definitely dead.
1420    ///
1421    /// Cross-platform: uses a quick-exiting command that does NOT go
1422    /// through the wrapper script (we manually clear the exit marker
1423    /// after spawn to simulate the wrapper crashing before write).
1424    #[test]
1425    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1426        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1427        let dir = tempfile::tempdir().unwrap();
1428        let task_id = registry
1429            .spawn(
1430                QUICK_SUCCESS_COMMAND,
1431                "session".to_string(),
1432                dir.path().to_path_buf(),
1433                HashMap::new(),
1434                Some(Duration::from_secs(30)),
1435                dir.path().to_path_buf(),
1436                10,
1437                true,
1438                false,
1439            )
1440            .unwrap();
1441
1442        let task = registry.task_for_session(&task_id, "session").unwrap();
1443
1444        // Wait for the child to actually exit and the wrapper to either
1445        // write the marker or fail. Then nuke the marker to simulate
1446        // wrapper crash before write. Poll up to 5s; this is plenty for a
1447        // `true`/`cmd /c exit 0` invocation.
1448        let started = Instant::now();
1449        loop {
1450            let exited = {
1451                let mut state = task.state.lock().unwrap();
1452                if let Some(child) = state.child.as_mut() {
1453                    matches!(child.try_wait(), Ok(Some(_)))
1454                } else {
1455                    true
1456                }
1457            };
1458            if exited {
1459                break;
1460            }
1461            assert!(
1462                started.elapsed() < Duration::from_secs(5),
1463                "child should exit quickly"
1464            );
1465            std::thread::sleep(Duration::from_millis(20));
1466        }
1467
1468        // Wrapper likely wrote the marker by now; remove it to simulate
1469        // a wrapper crash that exited before persisting the exit code.
1470        let _ = std::fs::remove_file(&task.paths.exit);
1471
1472        // Sanity: task is still Running per metadata (replay/poll hasn't
1473        // observed the missing marker yet).
1474        assert!(
1475            task.is_running(),
1476            "precondition: metadata.status == Running"
1477        );
1478        assert!(
1479            !task.paths.exit.exists(),
1480            "precondition: exit marker absent"
1481        );
1482
1483        // Invoke the watchdog's reap_child directly. The fix should mark
1484        // the task Failed with the documented reason string, instead of
1485        // just dropping the child handle and leaving status=Running.
1486        registry.reap_child(&task);
1487
1488        let state = task.state.lock().unwrap();
1489        assert!(
1490            state.metadata.status.is_terminal(),
1491            "reap_child must transition to terminal when PID dead and no marker. \
1492             Got status={:?}",
1493            state.metadata.status
1494        );
1495        assert_eq!(
1496            state.metadata.status,
1497            BgTaskStatus::Failed,
1498            "must specifically be Failed (not Killed): status={:?}",
1499            state.metadata.status
1500        );
1501        assert_eq!(
1502            state.metadata.status_reason.as_deref(),
1503            Some("process exited without exit marker"),
1504            "reason must match replay path's wording: {:?}",
1505            state.metadata.status_reason
1506        );
1507        assert!(
1508            state.child.is_none(),
1509            "child handle must be released after reap"
1510        );
1511        assert!(state.detached, "task must be marked detached after reap");
1512    }
1513
/// Counterpart to the no-marker reap test above. Here the exit marker is
/// already on disk when `reap_child` runs (the wrapper can finish writing it
/// between `try_wait` observing the exit and the marker check).
///
/// In that case `reap_child` must still release the child handle and mark the
/// task detached. It must NOT mark the task Failed: the status stays `Running`
/// and the next `poll_task()` cycle finalizes it from the marker.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir,
            10,
            true,
            false,
        )
        .unwrap();
    let task = registry.task_for_session(&task_id, "session").unwrap();

    // True once try_wait reports the child gone, or when no child handle is
    // held any more.
    let child_exited = || {
        let mut state = task.state.lock().unwrap();
        match state.child.as_mut() {
            Some(child) => matches!(child.try_wait(), Ok(Some(_))),
            None => true,
        }
    };

    // Both the process exit and the marker file must be observed before
    // reaping; otherwise this would exercise the no-marker path instead.
    let started = Instant::now();
    while !(child_exited() && task.paths.exit.exists()) {
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        state.child.is_none(),
        "child handle still released even when marker exists"
    );
    assert!(
        state.detached,
        "task still marked detached even when marker exists"
    );
    // Turning the marker into a final status (with exit-code parsing) is
    // poll_task's job, so reap_child must leave the status untouched.
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
1585
/// `cleanup_finished` must not evict a task whose command is still running,
/// even when called with `Duration::ZERO`.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    // Sample first, kill second, assert last. Asserting before the kill
    // would leave the long-running child alive whenever the check fails,
    // because the panic skips the cleanup. The tasks-map guard is a
    // temporary, released at the end of this statement, before `kill` runs.
    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    // Best-effort cleanup; the outcome of the kill is not what is under test.
    let _ = registry.kill(&task_id, "session");
    assert!(
        still_tracked,
        "cleanup_finished must not evict a Running task"
    );
}
1609
/// Polls every 100 ms until `path` exists, then returns its contents as text.
///
/// # Panics
/// Panics if the file has not appeared within 30 seconds, or if it cannot be
/// read into a `String` once it exists.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    let began = Instant::now();
    while !path.exists() {
        assert!(
            began.elapsed() < Duration::from_secs(30),
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(Duration::from_millis(100));
    }
    fs::read_to_string(path).expect("read file")
}
1625
/// Spawns `command` as a background task for session `"session"` in a
/// brand-new registry. A fresh temp dir is passed as both path arguments of
/// `spawn`, with a 30-second timeout.
///
/// Returns the registry, the temp dir, and the new task id. Callers must keep
/// the `TempDir` alive while they still need the task's files, because
/// dropping it removes the directory.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let task_id = registry
        .spawn(
            command,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root,
            10,
            false,
            false,
        )
        .unwrap();
    (registry, dir, task_id)
}
1647
/// A background `cmd /c exit 0` must leave an exit marker whose trimmed
/// contents are exactly `0`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker = wait_for_file(&registry.task_exit_path(&task_id, "session").unwrap());
    assert_eq!(marker.trim(), "0");
}
1658
/// A background `cmd /c exit 42` must leave an exit marker whose trimmed
/// contents are exactly `42`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker = wait_for_file(&registry.task_exit_path(&task_id, "session").unwrap());
    assert_eq!(marker.trim(), "42");
}
1669
/// Output of a background command must end up in the task's stdout capture
/// file. The test waits for the exit marker before reading that file.
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&task_id, "session").unwrap();

    wait_for_file(&task.paths.exit);
    let stdout = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
1683
/// `shell_candidates_with` must rank `pwsh.exe` first when both PowerShell
/// binaries resolve and there is no `$SHELL` value. Both lookups are stubbed,
/// so the result does not depend on the runner's real PATH or environment.
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    use crate::windows_shell::{shell_candidates_with, WindowsShell};

    let candidates = shell_candidates_with(
        // Fake PATH resolver: only the two PowerShell binaries "exist".
        |binary| match binary {
            "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
            "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
            _ => None,
        },
        // `$SHELL` probe finds nothing.
        || None,
    );
    let first = candidates
        .first()
        .cloned()
        .expect("at least one candidate");
    assert_eq!(first, WindowsShell::Pwsh);
    assert_eq!(first.binary().as_ref(), "pwsh.exe");
}
1702
/// Issue #27 Oracle review P1: the cmd wrapper must read the user command's
/// exit code with delayed expansion (`!ERRORLEVEL!`), never with
/// `%ERRORLEVEL%`.
///
/// cmd expands `%VAR%` when the line is parsed, so a wrapper built on it would
/// record a stale value (typically 0) whatever the user command returned.
/// `!VAR!` needs delayed expansion, which `WindowsShell::bg_command` turns on
/// with `/V:ON`.
///
/// The wrapper also stages the marker in a `.tmp` file, moves it into place
/// with `move /Y`, and sends `move`'s own output to `nul`.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    use crate::windows_shell::WindowsShell;

    let marker_path = Path::new(r"C:\Temp\bgb-test.exit");
    let script = WindowsShell::Cmd.wrapper_script("cmd /c exit 42", marker_path);

    // Run-time (delayed) expansion: required.
    assert!(
        script.contains("& echo !ERRORLEVEL! >"),
        "wrapper must use delayed expansion: {script}"
    );
    // Parse-time expansion: forbidden.
    assert!(
        !script.contains("%ERRORLEVEL%"),
        "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
    );

    // Staged marker is moved into place. The "1 file(s) moved." line must
    // not leak into the user's captured stdout.
    assert!(script.contains("& move /Y"));
    assert!(
        script.contains("> nul"),
        "wrapper must redirect move output to nul: {script}"
    );
    assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
}
1738
/// Issue #27 Oracle review P1: `WindowsShell::Cmd.bg_command()` must launch
/// cmd with exactly `/V:ON /D /S /C <command>`:
/// - `/V:ON` enables delayed expansion, so wrappers can read `!ERRORLEVEL!`.
/// - `/D` skips registry AutoRun commands (per cmd's documentation).
/// - `/S` selects simple quote handling, so the `/C` parser does not mangle
///   the wrapper's internal quotes from `cmd_quote`.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
    use crate::windows_shell::WindowsShell;

    let command = WindowsShell::Cmd.bg_command("echo wrapped");
    let utf8_args: Vec<&str> = command
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();
    assert_eq!(
        utf8_args,
        vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /V:ON /D /S /C"
    );
}
1756
/// PowerShell needs no cmd-style switches such as `/V:ON`. This test only
/// checks that `WindowsShell::Pwsh.bg_command()` passes `-Command` and
/// includes the user's command text among its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;

    let command = WindowsShell::Pwsh.bg_command("Get-Date");
    let args_strs: Vec<&str> = command
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();
    assert!(
        args_strs.contains(&"-Command"),
        "Pwsh::bg_command must use -Command: {args_strs:?}"
    );
    assert!(
        args_strs.contains(&"Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
1776
/// **Disabled on every target.** `#[cfg(any())]` never matches, so this
/// item is compiled out. It is kept only as a record of a retired test.
///
/// What it used to check (Issue #27 Oracle review P1 + P2 test gap):
/// - It spawned through `WindowsShell::Cmd.bg_command()` to force the
///   cmd.exe-specific wrapper path.
/// - It asserted that the exit marker for `cmd /c exit 42` contained `42`,
///   not `0`, proving the wrapper used run-time `!ERRORLEVEL!` rather than
///   parse-time `%ERRORLEVEL%`.
///
/// Why it was needed: `windows_spawn_writes_exit_marker_for_nonzero_exit`
/// alone would pass even with a broken `%ERRORLEVEL%` wrapper when pwsh.exe
/// or powershell.exe is on PATH. In that case the outer wrapper is
/// PowerShell, and its `$LASTEXITCODE` reports the inner exit code correctly.
///
/// Why it was retired: v0.19.4 switched background bash to a file-based
/// wrapper (`<task>.bat` / `<task>.ps1`). Inline command-line quoting caused
/// silent failures on Windows 11, because `move /Y` could not parse path
/// arguments through cmd's `/C` parser. Production now spawns via
/// `detached_shell_command_for`, which writes the wrapper to disk and runs
/// `cmd /V:ON /D /C <bat-path>`. The inline `bg_command` helper is kept only
/// for parity with the shape of `WindowsShell::Cmd.command()`.
///
/// The `!ERRORLEVEL!` behaviour is now covered live by Windows e2e harness
/// scenario 2d (`bg bash records non-zero exit code (cmd /c exit 42)`), which
/// drives the real file-based wrapper end-to-end through the protocol.
///
/// NOTE(review): `#[allow(dead_code)]` was dropped because `cfg(any())`
/// strips the item before lints run. Consider deleting this stub once the
/// history above no longer needs to live in code.
#[cfg(any())] // disabled on all targets
fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
1810}