Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use serde::Serialize;
12
13use crate::context::SharedProgressSender;
14use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
15
16#[cfg(unix)]
17use std::os::unix::process::CommandExt;
18#[cfg(windows)]
19use std::os::windows::process::CommandExt;
20
21use super::buffer::BgBuffer;
22use super::persistence::{
23    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
24    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
25    PersistedTask, TaskPaths,
26};
27use super::process::is_process_alive;
28#[cfg(unix)]
29use super::process::terminate_pgid;
30#[cfg(windows)]
31use super::process::terminate_pid;
32use super::{BgTaskInfo, BgTaskStatus};
33// Note: `resolve_windows_shell` is no longer imported at module scope —
34// production code in `spawn_detached_child` uses `shell_candidates()`
35// with retry instead, and the function remains in `windows_shell.rs`
36// for tests and as a future helper.
37
38/// Default timeout for background bash tasks: 30 minutes.
39/// Agents can override per-call via the `timeout` parameter (in ms).
40const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
41const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
42const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
43const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
44
45/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
46/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
47/// of typical command output (git status, test results, exit messages) — short
48/// enough that round-tripping multiple completions in one reminder stays well
49/// under the model's context budget but long enough that most successful runs
50/// don't need a follow-up `bash_status` call.
51const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
52
53#[derive(Debug, Clone, Serialize)]
54pub struct BgCompletion {
55    pub task_id: String,
56    /// Intentionally omitted from serialized completion payloads: push frames
57    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
58    #[serde(skip_serializing)]
59    pub session_id: String,
60    pub status: BgTaskStatus,
61    pub exit_code: Option<i32>,
62    pub command: String,
63    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
64    /// cached so push-frame consumers and `bash_drain_completions` callers see
65    /// the same preview without racing against later output rotation. Empty
66    /// when not captured (e.g., persisted task seen on startup before buffer
67    /// reattachment).
68    #[serde(default, skip_serializing_if = "String::is_empty")]
69    pub output_preview: String,
70    /// True when the captured tail is shorter than the actual output (because
71    /// rotation occurred or the output exceeds the preview cap). Plugins use
72    /// this to render a `…` prefix and signal that `bash_status` would return
73    /// more.
74    #[serde(default, skip_serializing_if = "is_false")]
75    pub output_truncated: bool,
76}
77
78fn is_false(v: &bool) -> bool {
79    !*v
80}
81
82#[derive(Debug, Clone, Serialize)]
83pub struct BgTaskSnapshot {
84    #[serde(flatten)]
85    pub info: BgTaskInfo,
86    pub exit_code: Option<i32>,
87    pub child_pid: Option<u32>,
88    pub workdir: String,
89    pub output_preview: String,
90    pub output_truncated: bool,
91    pub output_path: Option<String>,
92    pub stderr_path: Option<String>,
93}
94
95#[derive(Clone)]
96pub struct BgTaskRegistry {
97    pub(crate) inner: Arc<RegistryInner>,
98}
99
100pub(crate) struct RegistryInner {
101    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
102    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
103    pub(crate) progress_sender: SharedProgressSender,
104    watchdog_started: AtomicBool,
105    pub(crate) shutdown: AtomicBool,
106    pub(crate) long_running_reminder_enabled: AtomicBool,
107    pub(crate) long_running_reminder_interval_ms: AtomicU64,
108    persisted_gc_started: AtomicBool,
109    #[cfg(test)]
110    persisted_gc_runs: AtomicU64,
111    /// Output compression callback. Set by `AppContext` after construction.
112    /// Takes (command, raw_output) and returns compressed text. Called from
113    /// the watchdog thread when a task reaches a terminal state and from
114    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
115    /// uncompressed.
116    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
117}
118
119pub(crate) struct BgTask {
120    pub(crate) task_id: String,
121    pub(crate) session_id: String,
122    pub(crate) paths: TaskPaths,
123    pub(crate) started: Instant,
124    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
125    pub(crate) terminal_at: Mutex<Option<Instant>>,
126    pub(crate) state: Mutex<BgTaskState>,
127}
128
129pub(crate) struct BgTaskState {
130    pub(crate) metadata: PersistedTask,
131    pub(crate) child: Option<Child>,
132    pub(crate) detached: bool,
133    pub(crate) buffer: BgBuffer,
134}
135
136impl BgTaskRegistry {
137    pub fn new(progress_sender: SharedProgressSender) -> Self {
138        Self {
139            inner: Arc::new(RegistryInner {
140                tasks: Mutex::new(HashMap::new()),
141                completions: Mutex::new(VecDeque::new()),
142                progress_sender,
143                watchdog_started: AtomicBool::new(false),
144                shutdown: AtomicBool::new(false),
145                long_running_reminder_enabled: AtomicBool::new(true),
146                long_running_reminder_interval_ms: AtomicU64::new(600_000),
147                persisted_gc_started: AtomicBool::new(false),
148                #[cfg(test)]
149                persisted_gc_runs: AtomicU64::new(0),
150                compressor: Mutex::new(None),
151            }),
152        }
153    }
154
155    /// Install the output-compression callback. Called by `main.rs` after
156    /// `AppContext` is constructed so that snapshot/completion paths can
157    /// invoke `compress::compress_with_registry` without holding a context
158    /// reference. When called multiple times, the latest installation wins.
159    pub fn set_compressor<F>(&self, compressor: F)
160    where
161        F: Fn(&str, String) -> String + Send + Sync + 'static,
162    {
163        if let Ok(mut slot) = self.inner.compressor.lock() {
164            *slot = Some(Box::new(compressor));
165        }
166    }
167
168    /// Apply the installed compressor (if any) to `output`. Returns `output`
169    /// untouched when no compressor is installed.
170    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
171        let Ok(slot) = self.inner.compressor.lock() else {
172            return output;
173        };
174        match slot.as_ref() {
175            Some(compressor) => compressor(command, output),
176            None => output,
177        }
178    }
179
180    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
181        self.inner
182            .long_running_reminder_enabled
183            .store(enabled, Ordering::SeqCst);
184        self.inner
185            .long_running_reminder_interval_ms
186            .store(interval_ms, Ordering::SeqCst);
187    }
188
189    #[cfg(unix)]
190    #[allow(clippy::too_many_arguments)]
191    pub fn spawn(
192        &self,
193        command: &str,
194        session_id: String,
195        workdir: PathBuf,
196        env: HashMap<String, String>,
197        timeout: Option<Duration>,
198        storage_dir: PathBuf,
199        max_running: usize,
200        notify_on_completion: bool,
201        compressed: bool,
202        project_root: Option<PathBuf>,
203    ) -> Result<String, String> {
204        self.start_watchdog();
205
206        let running = self.running_count();
207        if running >= max_running {
208            return Err(format!(
209                "background bash task limit exceeded: {running} running (max {max_running})"
210            ));
211        }
212
213        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
214        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
215        let task_id = self.generate_unique_task_id()?;
216        let paths = task_paths(&storage_dir, &session_id, &task_id);
217        fs::create_dir_all(&paths.dir)
218            .map_err(|e| format!("failed to create background task dir: {e}"))?;
219
220        let mut metadata = PersistedTask::starting(
221            task_id.clone(),
222            session_id.clone(),
223            command.to_string(),
224            workdir.clone(),
225            project_root,
226            timeout_ms,
227            notify_on_completion,
228            compressed,
229        );
230        write_task(&paths.json, &metadata)
231            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
232
233        // Pre-create capture files so the watchdog/buffer can always
234        // open them for reading. The spawn helper opens its own handles
235        // per attempt because each `Command::spawn()` consumes them.
236        create_capture_file(&paths.stdout)
237            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
238        create_capture_file(&paths.stderr)
239            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
240
241        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
242            Ok(child) => child,
243            Err(error) => {
244                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
245                let _ = delete_task_bundle(&paths);
246                return Err(error);
247            }
248        };
249
250        let child_pid = child.id();
251        metadata.mark_running(child_pid, child_pid as i32);
252        write_task(&paths.json, &metadata)
253            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
254
255        let task = Arc::new(BgTask {
256            task_id: task_id.clone(),
257            session_id,
258            paths: paths.clone(),
259            started: Instant::now(),
260            last_reminder_at: Mutex::new(None),
261            terminal_at: Mutex::new(None),
262            state: Mutex::new(BgTaskState {
263                metadata,
264                child: Some(child),
265                detached: false,
266                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
267            }),
268        });
269
270        self.inner
271            .tasks
272            .lock()
273            .map_err(|_| "background task registry lock poisoned".to_string())?
274            .insert(task_id.clone(), task);
275
276        Ok(task_id)
277    }
278
279    #[cfg(windows)]
280    #[allow(clippy::too_many_arguments)]
281    pub fn spawn(
282        &self,
283        command: &str,
284        session_id: String,
285        workdir: PathBuf,
286        env: HashMap<String, String>,
287        timeout: Option<Duration>,
288        storage_dir: PathBuf,
289        max_running: usize,
290        notify_on_completion: bool,
291        compressed: bool,
292        project_root: Option<PathBuf>,
293    ) -> Result<String, String> {
294        self.start_watchdog();
295
296        let running = self.running_count();
297        if running >= max_running {
298            return Err(format!(
299                "background bash task limit exceeded: {running} running (max {max_running})"
300            ));
301        }
302
303        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
304        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
305        let task_id = self.generate_unique_task_id()?;
306        let paths = task_paths(&storage_dir, &session_id, &task_id);
307        fs::create_dir_all(&paths.dir)
308            .map_err(|e| format!("failed to create background task dir: {e}"))?;
309
310        let mut metadata = PersistedTask::starting(
311            task_id.clone(),
312            session_id.clone(),
313            command.to_string(),
314            workdir.clone(),
315            project_root,
316            timeout_ms,
317            notify_on_completion,
318            compressed,
319        );
320        write_task(&paths.json, &metadata)
321            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
322
323        // Capture files are pre-created so the watchdog/buffer can always
324        // open them for reading even if the child hasn't written anything
325        // yet. The spawn helper opens its own handles per attempt because
326        // each `Command::spawn()` consumes them, and on Windows we may
327        // retry across multiple shell candidates if the first one fails.
328        create_capture_file(&paths.stdout)
329            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
330        create_capture_file(&paths.stderr)
331            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
332
333        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
334            Ok(child) => child,
335            Err(error) => {
336                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
337                let _ = delete_task_bundle(&paths);
338                return Err(error);
339            }
340        };
341
342        let child_pid = child.id();
343        metadata.status = BgTaskStatus::Running;
344        metadata.child_pid = Some(child_pid);
345        metadata.pgid = None;
346        write_task(&paths.json, &metadata)
347            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
348
349        let task = Arc::new(BgTask {
350            task_id: task_id.clone(),
351            session_id,
352            paths: paths.clone(),
353            started: Instant::now(),
354            last_reminder_at: Mutex::new(None),
355            terminal_at: Mutex::new(None),
356            state: Mutex::new(BgTaskState {
357                metadata,
358                child: Some(child),
359                detached: false,
360                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
361            }),
362        });
363
364        self.inner
365            .tasks
366            .lock()
367            .map_err(|_| "background task registry lock poisoned".to_string())?
368            .insert(task_id.clone(), task);
369
370        Ok(task_id)
371    }
372
373    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
374        self.replay_session_inner(storage_dir, session_id, None)
375    }
376
377    pub fn replay_session_for_project(
378        &self,
379        storage_dir: &Path,
380        session_id: &str,
381        project_root: &Path,
382    ) -> Result<(), String> {
383        self.replay_session_inner(storage_dir, session_id, Some(project_root))
384    }
385
386    fn replay_session_inner(
387        &self,
388        storage_dir: &Path,
389        session_id: &str,
390        project_root: Option<&Path>,
391    ) -> Result<(), String> {
392        self.start_watchdog();
393        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
394            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
395                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
396            }
397        }
398        let dir = session_tasks_dir(storage_dir, session_id);
399        if !dir.exists() {
400            return Ok(());
401        }
402
403        let canonical_project = project_root.map(canonicalized_path);
404        let entries = fs::read_dir(&dir)
405            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
406        for entry in entries.flatten() {
407            let path = entry.path();
408            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
409                continue;
410            }
411            let Ok(mut metadata) = read_task(&path) else {
412                continue;
413            };
414            if metadata.session_id != session_id {
415                continue;
416            }
417            if let Some(canonical_project) = canonical_project.as_deref() {
418                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
419                if metadata_project.as_deref() != Some(canonical_project) {
420                    continue;
421                }
422            }
423
424            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
425            match metadata.status {
426                BgTaskStatus::Starting => {
427                    metadata.mark_terminal(
428                        BgTaskStatus::Failed,
429                        None,
430                        Some("spawn aborted".to_string()),
431                    );
432                    let _ = write_task(&paths.json, &metadata);
433                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
434                }
435                BgTaskStatus::Running | BgTaskStatus::Killing => {
436                    if self.running_metadata_is_stale(&metadata) {
437                        metadata.mark_terminal(
438                            BgTaskStatus::Killed,
439                            None,
440                            Some("orphaned (>24h)".to_string()),
441                        );
442                        if !paths.exit.exists() {
443                            let _ = write_kill_marker_if_absent(&paths.exit);
444                        }
445                        let _ = write_task(&paths.json, &metadata);
446                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
447                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
448                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
449                            "recovered from inconsistent killing state on replay".to_string()
450                        });
451                        if reason.is_some() {
452                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
453                            metadata.task_id);
454                        }
455                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
456                        let _ = write_task(&paths.json, &metadata);
457                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
458                    } else if metadata.status == BgTaskStatus::Killing {
459                        if !paths.exit.exists() {
460                            let _ = write_kill_marker_if_absent(&paths.exit);
461                        }
462                        metadata.mark_terminal(
463                            BgTaskStatus::Killed,
464                            None,
465                            Some("recovered from inconsistent killing state on replay".to_string()),
466                        );
467                        let _ = write_task(&paths.json, &metadata);
468                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
469                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
470                        metadata.mark_terminal(
471                            BgTaskStatus::Failed,
472                            None,
473                            Some("process exited without exit marker".to_string()),
474                        );
475                        let _ = write_task(&paths.json, &metadata);
476                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
477                    } else {
478                        self.insert_rehydrated_task(metadata, paths, true)?;
479                    }
480                }
481                _ if metadata.status.is_terminal() => {
482                    // Borrow `paths` for the completion enqueue BEFORE
483                    // `insert_rehydrated_task` consumes it. The completion
484                    // helper only reads from `paths` (stdout/stderr/exit) to
485                    // reconstruct a tail preview, so it must see the same
486                    // paths the rehydrated task will own.
487                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
488                    self.insert_rehydrated_task(metadata, paths, true)?;
489                }
490                _ => {}
491            }
492        }
493
494        Ok(())
495    }
496
497    pub fn status(
498        &self,
499        task_id: &str,
500        session_id: &str,
501        project_root: Option<&Path>,
502        storage_dir: Option<&Path>,
503        preview_bytes: usize,
504    ) -> Option<BgTaskSnapshot> {
505        let mut task = self.task_for_session(task_id, session_id);
506        if task.is_none() {
507            if let Some(storage_dir) = storage_dir {
508                let _ = self.replay_session(storage_dir, session_id);
509                task = self.task_for_session(task_id, session_id);
510            }
511        }
512        let Some(task) = task else {
513            return self.status_relaxed(
514                task_id,
515                session_id,
516                project_root?,
517                storage_dir?,
518                preview_bytes,
519            );
520        };
521        let _ = self.poll_task(&task);
522        let mut snapshot = task.snapshot(preview_bytes);
523        self.maybe_compress_snapshot(&task, &mut snapshot);
524        Some(snapshot)
525    }
526
527    fn status_relaxed_task(
528        &self,
529        task_id: &str,
530        project_root: &Path,
531        storage_dir: &Path,
532    ) -> Option<Arc<BgTask>> {
533        let canonical_project = canonicalized_path(project_root);
534        let root = storage_dir.join("bash-tasks");
535        let entries = fs::read_dir(&root).ok()?;
536        for entry in entries.flatten() {
537            let dir = entry.path();
538            if !dir.is_dir() {
539                continue;
540            }
541            let path = dir.join(format!("{task_id}.json"));
542            if !path.exists() {
543                continue;
544            }
545            let Ok(metadata) = read_task(&path) else {
546                continue;
547            };
548            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
549            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
550                continue;
551            }
552            if let Some(task) = self.task(task_id) {
553                let matches_project = task
554                    .state
555                    .lock()
556                    .map(|state| {
557                        state
558                            .metadata
559                            .project_root
560                            .as_deref()
561                            .map(canonicalized_path)
562                            .as_deref()
563                            == Some(canonical_project.as_path())
564                    })
565                    .unwrap_or(false);
566                return matches_project.then_some(task);
567            }
568            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
569            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
570                return None;
571            }
572            return self.task(task_id);
573        }
574        None
575    }
576
577    pub(super) fn status_relaxed(
578        &self,
579        task_id: &str,
580        _session_id: &str,
581        project_root: &Path,
582        storage_dir: &Path,
583        preview_bytes: usize,
584    ) -> Option<BgTaskSnapshot> {
585        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
586        let _ = self.poll_task(&task);
587        let mut snapshot = task.snapshot(preview_bytes);
588        self.maybe_compress_snapshot(&task, &mut snapshot);
589        Some(snapshot)
590    }
591
592    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
593        #[cfg(test)]
594        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
595
596        let mut deleted = 0usize;
597
598        let root = storage_dir.join("bash-tasks");
599        if root.exists() {
600            let session_dirs = fs::read_dir(&root).map_err(|e| {
601                format!(
602                    "failed to read background task root {}: {e}",
603                    root.display()
604                )
605            })?;
606            for session_entry in session_dirs.flatten() {
607                let session_dir = session_entry.path();
608                if !session_dir.is_dir() {
609                    continue;
610                }
611                let task_entries = match fs::read_dir(&session_dir) {
612                    Ok(entries) => entries,
613                    Err(error) => {
614                        crate::slog_warn!(
615                            "failed to read background task session dir {}: {error}",
616                            session_dir.display()
617                        );
618                        continue;
619                    }
620                };
621                for task_entry in task_entries.flatten() {
622                    let json_path = task_entry.path();
623                    if json_path
624                        .extension()
625                        .and_then(|extension| extension.to_str())
626                        != Some("json")
627                    {
628                        continue;
629                    }
630                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
631                        continue;
632                    }
633                    let metadata = match read_task(&json_path) {
634                        Ok(metadata) => metadata,
635                        Err(error) => {
636                            crate::slog_warn!(
637                                "quarantining corrupt background task metadata {}: {error}",
638                                json_path.display()
639                            );
640                            quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)?;
641                            continue;
642                        }
643                    };
644                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
645                        continue;
646                    }
647                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
648                    match delete_task_bundle(&paths) {
649                        Ok(()) => {
650                            deleted += 1;
651                            log::debug!(
652                                "deleted persisted background task bundle {}",
653                                metadata.task_id
654                            );
655                        }
656                        Err(error) => {
657                            crate::slog_warn!(
658                                "failed to delete background task bundle {}: {error}",
659                                metadata.task_id
660                            );
661                            continue;
662                        }
663                    }
664                }
665            }
666        }
667        gc_quarantine(storage_dir);
668        Ok(deleted)
669    }
670
671    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
672        let tasks = self
673            .inner
674            .tasks
675            .lock()
676            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
677            .unwrap_or_default();
678        tasks
679            .into_iter()
680            .map(|task| {
681                let _ = self.poll_task(&task);
682                let mut snapshot = task.snapshot(preview_bytes);
683                self.maybe_compress_snapshot(&task, &mut snapshot);
684                snapshot
685            })
686            .collect()
687    }
688
689    /// Compress `output_preview` in place when the task is in a terminal
690    /// state. Live tail of running tasks stays raw so agents debugging
691    /// long-running bash see exactly what the process emitted, not a
692    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
693    /// field on `PersistedTask` short-circuits before the compress pipeline.
694    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
695        if !snapshot.info.status.is_terminal() {
696            return;
697        }
698        let compressed_flag = task
699            .state
700            .lock()
701            .map(|state| state.metadata.compressed)
702            .unwrap_or(true);
703        if !compressed_flag {
704            return;
705        }
706        let raw = std::mem::take(&mut snapshot.output_preview);
707        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
708    }
709
710    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
711        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
712    }
713
714    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
715        let task = self
716            .task_for_session(task_id, session_id)
717            .ok_or_else(|| format!("background task not found: {task_id}"))?;
718        let mut state = task
719            .state
720            .lock()
721            .map_err(|_| "background task lock poisoned".to_string())?;
722        let updated = update_task(&task.paths.json, |metadata| {
723            metadata.notify_on_completion = true;
724            metadata.completion_delivered = false;
725        })
726        .map_err(|e| format!("failed to promote background task: {e}"))?;
727        state.metadata = updated;
728        if state.metadata.status.is_terminal() {
729            state.buffer.enforce_terminal_cap();
730            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
731        }
732        Ok(true)
733    }
734
735    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
736        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
737            .map(|_| ())
738    }
739
740    pub fn cleanup_finished(&self, older_than: Duration) {
741        let cutoff = Instant::now().checked_sub(older_than);
742        let removable_paths: Vec<(String, TaskPaths)> =
743            if let Ok(mut tasks) = self.inner.tasks.lock() {
744                let removable = tasks
745                    .iter()
746                    .filter_map(|(task_id, task)| {
747                        let delivered_terminal = task
748                            .state
749                            .lock()
750                            .map(|state| {
751                                state.metadata.status.is_terminal()
752                                    && state.metadata.completion_delivered
753                            })
754                            .unwrap_or(false);
755                        if !delivered_terminal {
756                            return None;
757                        }
758
759                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
760                        let expired = match (terminal_at, cutoff) {
761                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
762                            (Some(_), None) => true,
763                            (None, _) => false,
764                        };
765                        expired.then(|| task_id.clone())
766                    })
767                    .collect::<Vec<_>>();
768
769                removable
770                    .into_iter()
771                    .filter_map(|task_id| {
772                        tasks
773                            .remove(&task_id)
774                            .map(|task| (task_id, task.paths.clone()))
775                    })
776                    .collect()
777            } else {
778                Vec::new()
779            };
780
781        for (task_id, paths) in removable_paths {
782            match delete_task_bundle(&paths) {
783                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
784                Err(error) => crate::slog_warn!(
785                    "failed to delete persisted background task bundle {task_id}: {error}"
786                ),
787            }
788        }
789    }
790
791    pub fn drain_completions(&self) -> Vec<BgCompletion> {
792        self.drain_completions_for_session(None)
793    }
794
795    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
796        let mut completions = match self.inner.completions.lock() {
797            Ok(completions) => completions,
798            Err(_) => return Vec::new(),
799        };
800
801        let drained = if let Some(session_id) = session_id {
802            let mut matched = Vec::new();
803            let mut retained = VecDeque::new();
804            while let Some(completion) = completions.pop_front() {
805                if completion.session_id == session_id {
806                    matched.push(completion);
807                } else {
808                    retained.push_back(completion);
809                }
810            }
811            *completions = retained;
812            matched
813        } else {
814            completions.drain(..).collect()
815        };
816        drop(completions);
817
818        for completion in &drained {
819            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
820                let _ = task.set_completion_delivered(true);
821            }
822        }
823
824        drained
825    }
826
827    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
828        self.inner
829            .completions
830            .lock()
831            .map(|completions| {
832                completions
833                    .iter()
834                    .filter(|completion| completion.session_id == session_id)
835                    .cloned()
836                    .collect()
837            })
838            .unwrap_or_default()
839    }
840
841    pub fn detach(&self) {
842        self.inner.shutdown.store(true, Ordering::SeqCst);
843        if let Ok(mut tasks) = self.inner.tasks.lock() {
844            for task in tasks.values() {
845                if let Ok(mut state) = task.state.lock() {
846                    state.child = None;
847                    state.detached = true;
848                }
849            }
850            tasks.clear();
851        }
852    }
853
854    pub fn shutdown(&self) {
855        let tasks = self
856            .inner
857            .tasks
858            .lock()
859            .map(|tasks| {
860                tasks
861                    .values()
862                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
863                    .collect::<Vec<_>>()
864            })
865            .unwrap_or_default();
866        for (task_id, session_id) in tasks {
867            let _ = self.kill(&task_id, &session_id);
868        }
869    }
870
871    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
872        let marker = match read_exit_marker(&task.paths.exit) {
873            Ok(Some(marker)) => marker,
874            Ok(None) => return Ok(()),
875            Err(error) => return Err(format!("failed to read exit marker: {error}")),
876        };
877        self.finalize_from_marker(task, marker, None)
878    }
879
880    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
881        let Ok(mut state) = task.state.lock() else {
882            return;
883        };
884        if let Some(child) = state.child.as_mut() {
885            if matches!(child.try_wait(), Ok(Some(_))) {
886                // Child has exited. If the wrapper successfully wrote an
887                // exit marker, the next `poll_task()` cycle will pick it up
888                // and finalize via `finalize_from_marker`. But if the
889                // wrapper crashed before writing the marker (e.g. SIGKILL,
890                // power loss, wrapper bug), the task would forever appear
891                // Running until `timeout_ms` expired — and if no timeout
892                // was set, until the 24h `running_metadata_is_stale` cutoff
893                // hit at the next aft restart. Same condition as the replay
894                // path's "PID dead but no marker" branch (see line 338).
895                //
896                // To avoid that hidden hang, mark the task Failed
897                // immediately with the same reason string used by replay,
898                // but only if the marker is genuinely absent. If a marker
899                // appeared on disk between try_wait() returning and now
900                // (race window), prefer the marker — let the next poll
901                // cycle finalize from it.
902                state.child = None;
903                state.detached = true;
904                self.fail_without_exit_marker_if_needed(task, &mut state);
905            }
906        } else if state.detached
907            && state
908                .metadata
909                .child_pid
910                .is_some_and(|pid| !is_process_alive(pid))
911        {
912            self.fail_without_exit_marker_if_needed(task, &mut state);
913        }
914    }
915
916    fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
917        if state.metadata.status.is_terminal() {
918            return;
919        }
920        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
921            return;
922        }
923        let updated = update_task(&task.paths.json, |metadata| {
924            metadata.mark_terminal(
925                BgTaskStatus::Failed,
926                None,
927                Some("process exited without exit marker".to_string()),
928            );
929        });
930        if let Ok(metadata) = updated {
931            state.metadata = metadata;
932            task.mark_terminal_now();
933            state.buffer.enforce_terminal_cap();
934            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
935        }
936    }
937
938    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
939        self.inner
940            .tasks
941            .lock()
942            .map(|tasks| {
943                tasks
944                    .values()
945                    .filter(|task| task.is_running())
946                    .cloned()
947                    .collect()
948            })
949            .unwrap_or_default()
950    }
951
952    fn insert_rehydrated_task(
953        &self,
954        metadata: PersistedTask,
955        paths: TaskPaths,
956        detached: bool,
957    ) -> Result<(), String> {
958        let task_id = metadata.task_id.clone();
959        let session_id = metadata.session_id.clone();
960        let started = started_instant_from_unix_millis(metadata.started_at);
961        let task = Arc::new(BgTask {
962            task_id: task_id.clone(),
963            session_id,
964            paths: paths.clone(),
965            started,
966            last_reminder_at: Mutex::new(None),
967            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
968            state: Mutex::new(BgTaskState {
969                metadata,
970                child: None,
971                detached,
972                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
973            }),
974        });
975        self.inner
976            .tasks
977            .lock()
978            .map_err(|_| "background task registry lock poisoned".to_string())?
979            .insert(task_id, task);
980        Ok(())
981    }
982
983    fn kill_with_status(
984        &self,
985        task_id: &str,
986        session_id: &str,
987        terminal_status: BgTaskStatus,
988    ) -> Result<BgTaskSnapshot, String> {
989        let task = self
990            .task_for_session(task_id, session_id)
991            .ok_or_else(|| format!("background task not found: {task_id}"))?;
992
993        {
994            let mut state = task
995                .state
996                .lock()
997                .map_err(|_| "background task lock poisoned".to_string())?;
998            if state.metadata.status.is_terminal() {
999                return Ok(task.snapshot_locked(&state, 5 * 1024));
1000            }
1001
1002            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1003                state.metadata =
1004                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1005                task.mark_terminal_now();
1006                state.child = None;
1007                state.detached = true;
1008                state.buffer.enforce_terminal_cap();
1009                write_task(&task.paths.json, &state.metadata)
1010                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1011                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1012                return Ok(task.snapshot_locked(&state, 5 * 1024));
1013            }
1014
1015            state.metadata.status = BgTaskStatus::Killing;
1016            write_task(&task.paths.json, &state.metadata)
1017                .map_err(|e| format!("failed to persist killing state: {e}"))?;
1018
1019            #[cfg(unix)]
1020            if let Some(pgid) = state.metadata.pgid {
1021                terminate_pgid(pgid, state.child.as_mut());
1022            }
1023            #[cfg(windows)]
1024            if let Some(child) = state.child.as_mut() {
1025                super::process::terminate_process(child);
1026            } else if let Some(pid) = state.metadata.child_pid {
1027                terminate_pid(pid);
1028            }
1029            if let Some(child) = state.child.as_mut() {
1030                let _ = child.wait();
1031            }
1032            state.child = None;
1033            state.detached = true;
1034
1035            if !task.paths.exit.exists() {
1036                write_kill_marker_if_absent(&task.paths.exit)
1037                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
1038            }
1039
1040            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1041                Some(124)
1042            } else {
1043                None
1044            };
1045            state
1046                .metadata
1047                .mark_terminal(terminal_status, exit_code, None);
1048            task.mark_terminal_now();
1049            write_task(&task.paths.json, &state.metadata)
1050                .map_err(|e| format!("failed to persist killed state: {e}"))?;
1051            state.buffer.enforce_terminal_cap();
1052            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1053        }
1054
1055        Ok(task.snapshot(5 * 1024))
1056    }
1057
1058    fn finalize_from_marker(
1059        &self,
1060        task: &Arc<BgTask>,
1061        marker: ExitMarker,
1062        reason: Option<String>,
1063    ) -> Result<(), String> {
1064        let mut state = task
1065            .state
1066            .lock()
1067            .map_err(|_| "background task lock poisoned".to_string())?;
1068        if state.metadata.status.is_terminal() {
1069            return Ok(());
1070        }
1071
1072        let updated = update_task(&task.paths.json, |metadata| {
1073            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1074            *metadata = new_metadata;
1075        })
1076        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1077        state.metadata = updated;
1078        task.mark_terminal_now();
1079        state.child = None;
1080        state.detached = true;
1081        state.buffer.enforce_terminal_cap();
1082        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1083        Ok(())
1084    }
1085
1086    fn enqueue_completion_if_needed(
1087        &self,
1088        metadata: &PersistedTask,
1089        paths: Option<&TaskPaths>,
1090        emit_frame: bool,
1091    ) {
1092        if metadata.status.is_terminal() && !metadata.completion_delivered {
1093            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1094        }
1095    }
1096
1097    fn enqueue_completion_locked(
1098        &self,
1099        metadata: &PersistedTask,
1100        buffer: Option<&BgBuffer>,
1101        emit_frame: bool,
1102    ) {
1103        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1104    }
1105
1106    fn enqueue_completion_from_parts(
1107        &self,
1108        metadata: &PersistedTask,
1109        buffer: Option<&BgBuffer>,
1110        paths: Option<&TaskPaths>,
1111        emit_frame: bool,
1112    ) {
1113        if !metadata.status.is_terminal() || metadata.completion_delivered {
1114            return;
1115        }
1116        // Read tail once at completion time and cache on the BgCompletion so
1117        // both the push-frame consumer (running session) and any later
1118        // `bash_drain_completions` poll (different session, restart) see the
1119        // same preview without racing against rotation.
1120        let (raw_preview, output_truncated) = match buffer {
1121            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1122            None => paths
1123                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1124                .unwrap_or_else(|| (String::new(), false)),
1125        };
1126        // Compress at completion time so push-frame consumers and later
1127        // `bash_drain_completions` poll-callers see the same compressed text.
1128        // Per-task `compressed: false` opts out; otherwise the compressor is
1129        // a no-op when `experimental.bash.compress=false`.
1130        let output_preview = if metadata.compressed {
1131            self.compress_output(&metadata.command, raw_preview)
1132        } else {
1133            raw_preview
1134        };
1135        let completion = BgCompletion {
1136            task_id: metadata.task_id.clone(),
1137            session_id: metadata.session_id.clone(),
1138            status: metadata.status.clone(),
1139            exit_code: metadata.exit_code,
1140            command: metadata.command.clone(),
1141            output_preview,
1142            output_truncated,
1143        };
1144        if let Ok(mut completions) = self.inner.completions.lock() {
1145            if completions
1146                .iter()
1147                .any(|completion| completion.task_id == metadata.task_id)
1148            {
1149                return;
1150            }
1151            completions.push_back(completion.clone());
1152        } else {
1153            return;
1154        }
1155
1156        if emit_frame {
1157            self.emit_bash_completed(completion);
1158        }
1159    }
1160
1161    fn emit_bash_completed(&self, completion: BgCompletion) {
1162        let Ok(progress_sender) = self
1163            .inner
1164            .progress_sender
1165            .lock()
1166            .map(|sender| sender.clone())
1167        else {
1168            return;
1169        };
1170        let Some(sender) = progress_sender.as_ref() else {
1171            return;
1172        };
1173        // Clone the callback out of the registry mutex before writing to stdout;
1174        // otherwise a blocked push-frame write could pin the mutex and starve
1175        // unrelated progress-sender updates.
1176        // Bg task transitions are discovered by the watchdog thread, so the
1177        // sender is shared behind a Mutex. It still uses the same stdout writer
1178        // closure as foreground progress frames, preserving the existing lock/
1179        // flush behavior in main.rs.
1180        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1181            completion.task_id,
1182            completion.session_id,
1183            completion.status,
1184            completion.exit_code,
1185            completion.command,
1186            completion.output_preview,
1187            completion.output_truncated,
1188        )));
1189    }
1190
1191    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1192        if !self
1193            .inner
1194            .long_running_reminder_enabled
1195            .load(Ordering::SeqCst)
1196        {
1197            return;
1198        }
1199        let interval_ms = self
1200            .inner
1201            .long_running_reminder_interval_ms
1202            .load(Ordering::SeqCst);
1203        if interval_ms == 0 {
1204            return;
1205        }
1206        let interval = Duration::from_millis(interval_ms);
1207        let now = Instant::now();
1208        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1209            return;
1210        };
1211        let since = last_reminder_at.unwrap_or(task.started);
1212        if now.duration_since(since) < interval {
1213            return;
1214        }
1215        let command = task
1216            .state
1217            .lock()
1218            .map(|state| state.metadata.command.clone())
1219            .unwrap_or_default();
1220        *last_reminder_at = Some(now);
1221        self.emit_bash_long_running(BashLongRunningFrame::new(
1222            task.task_id.clone(),
1223            task.session_id.clone(),
1224            command,
1225            task.started.elapsed().as_millis() as u64,
1226        ));
1227    }
1228
1229    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1230        let Ok(progress_sender) = self
1231            .inner
1232            .progress_sender
1233            .lock()
1234            .map(|sender| sender.clone())
1235        else {
1236            return;
1237        };
1238        if let Some(sender) = progress_sender.as_ref() {
1239            sender(PushFrame::BashLongRunning(frame));
1240        }
1241    }
1242
1243    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1244        self.inner
1245            .tasks
1246            .lock()
1247            .ok()
1248            .and_then(|tasks| tasks.get(task_id).cloned())
1249    }
1250
1251    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1252        self.task(task_id)
1253            .filter(|task| task.session_id == session_id)
1254    }
1255
1256    fn running_count(&self) -> usize {
1257        self.inner
1258            .tasks
1259            .lock()
1260            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1261            .unwrap_or(0)
1262    }
1263
1264    fn start_watchdog(&self) {
1265        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1266            super::watchdog::start(self.clone());
1267        }
1268    }
1269
1270    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1271        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1272    }
1273
1274    #[cfg(test)]
1275    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1276        self.task_for_session(task_id, session_id)
1277            .map(|task| task.paths.json.clone())
1278    }
1279
1280    #[cfg(test)]
1281    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1282        self.task_for_session(task_id, session_id)
1283            .map(|task| task.paths.exit.clone())
1284    }
1285
1286    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
1287    fn generate_unique_task_id(&self) -> Result<String, String> {
1288        for _ in 0..32 {
1289            let candidate = random_slug();
1290            let tasks = self
1291                .inner
1292                .tasks
1293                .lock()
1294                .map_err(|_| "background task registry lock poisoned".to_string())?;
1295            if tasks.contains_key(&candidate) {
1296                continue;
1297            }
1298            let completions = self
1299                .inner
1300                .completions
1301                .lock()
1302                .map_err(|_| "background completions lock poisoned".to_string())?;
1303            if completions
1304                .iter()
1305                .any(|completion| completion.task_id == candidate)
1306            {
1307                continue;
1308            }
1309            return Ok(candidate);
1310        }
1311        Err("failed to allocate unique background task id after 32 attempts".to_string())
1312    }
1313}
1314
1315impl Default for BgTaskRegistry {
1316    fn default() -> Self {
1317        Self::new(Arc::new(Mutex::new(None)))
1318    }
1319}
1320
1321fn modified_within(path: &Path, grace: Duration) -> bool {
1322    fs::metadata(path)
1323        .and_then(|metadata| metadata.modified())
1324        .ok()
1325        .and_then(|modified| SystemTime::now().duration_since(modified).ok())
1326        .map(|age| age < grace)
1327        .unwrap_or(false)
1328}
1329
1330fn canonicalized_path(path: &Path) -> PathBuf {
1331    fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
1332}
1333
1334fn started_instant_from_unix_millis(started_at: u64) -> Instant {
1335    let now_ms = SystemTime::now()
1336        .duration_since(UNIX_EPOCH)
1337        .ok()
1338        .map(|duration| duration.as_millis() as u64)
1339        .unwrap_or(started_at);
1340    let elapsed_ms = now_ms.saturating_sub(started_at);
1341    Instant::now()
1342        .checked_sub(Duration::from_millis(elapsed_ms))
1343        .unwrap_or_else(Instant::now)
1344}
1345
1346fn gc_quarantine(storage_dir: &Path) {
1347    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1348    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1349        return;
1350    };
1351    for session_entry in session_dirs.flatten() {
1352        let session_quarantine_dir = session_entry.path();
1353        if !session_quarantine_dir.is_dir() {
1354            continue;
1355        }
1356        let entries = match fs::read_dir(&session_quarantine_dir) {
1357            Ok(entries) => entries,
1358            Err(error) => {
1359                crate::slog_warn!(
1360                    "failed to read background task quarantine dir {}: {error}",
1361                    session_quarantine_dir.display()
1362                );
1363                continue;
1364            }
1365        };
1366        for entry in entries.flatten() {
1367            let path = entry.path();
1368            if modified_within(&path, QUARANTINE_GC_GRACE) {
1369                continue;
1370            }
1371            let result = if path.is_dir() {
1372                fs::remove_dir_all(&path)
1373            } else {
1374                fs::remove_file(&path)
1375            };
1376            match result {
1377                Ok(()) => log::debug!(
1378                    "deleted old background task quarantine entry {}",
1379                    path.display()
1380                ),
1381                Err(error) => crate::slog_warn!(
1382                    "failed to delete old background task quarantine entry {}: {error}",
1383                    path.display()
1384                ),
1385            }
1386        }
1387        let _ = fs::remove_dir(&session_quarantine_dir);
1388    }
1389    let _ = fs::remove_dir(&quarantine_root);
1390}
1391
1392fn quarantine_corrupt_task_json(
1393    storage_dir: &Path,
1394    session_dir: &Path,
1395    json_path: &Path,
1396) -> Result<(), String> {
1397    let session_hash = session_dir
1398        .file_name()
1399        .and_then(|name| name.to_str())
1400        .ok_or_else(|| {
1401            format!(
1402                "invalid background task session dir: {}",
1403                session_dir.display()
1404            )
1405        })?;
1406    let task_name = json_path
1407        .file_name()
1408        .and_then(|name| name.to_str())
1409        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1410    let unix_ts = SystemTime::now()
1411        .duration_since(UNIX_EPOCH)
1412        .map(|duration| duration.as_secs())
1413        .unwrap_or(0);
1414    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1415    fs::create_dir_all(&quarantine_dir).map_err(|e| {
1416        format!(
1417            "failed to create background task quarantine dir {}: {e}",
1418            quarantine_dir.display()
1419        )
1420    })?;
1421    let target = quarantine_dir.join(format!("{task_name}.corrupt-{unix_ts}"));
1422    fs::rename(json_path, &target).map_err(|e| {
1423        format!(
1424            "failed to quarantine corrupt background task metadata {} to {}: {e}",
1425            json_path.display(),
1426            target.display()
1427        )
1428    })?;
1429
1430    for sibling in task_sibling_paths(json_path) {
1431        if !sibling.exists() {
1432            continue;
1433        }
1434        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1435            crate::slog_warn!(
1436                "skipping background task sibling with invalid name during quarantine: {}",
1437                sibling.display()
1438            );
1439            continue;
1440        };
1441        let sibling_target = quarantine_dir.join(format!("{sibling_name}.corrupt-{unix_ts}"));
1442        if let Err(error) = fs::rename(&sibling, &sibling_target) {
1443            crate::slog_warn!(
1444                "failed to quarantine background task sibling {} to {}: {error}",
1445                sibling.display(),
1446                sibling_target.display()
1447            );
1448        }
1449    }
1450
1451    let _ = fs::remove_dir(session_dir);
1452    Ok(())
1453}
1454
1455fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
1456    let Some(parent) = json_path.parent() else {
1457        return Vec::new();
1458    };
1459    let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
1460        return Vec::new();
1461    };
1462    ["stdout", "stderr", "exit", "ps1", "bat", "sh"]
1463        .into_iter()
1464        .map(|extension| parent.join(format!("{stem}.{extension}")))
1465        .collect()
1466}
1467
1468fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1469    let stdout = fs::read(&paths.stdout).unwrap_or_default();
1470    let stderr = fs::read(&paths.stderr).unwrap_or_default();
1471    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1472    bytes.extend_from_slice(&stdout);
1473    bytes.extend_from_slice(&stderr);
1474    if bytes.len() <= max_bytes {
1475        return (String::from_utf8_lossy(&bytes).into_owned(), false);
1476    }
1477    let start = bytes.len().saturating_sub(max_bytes);
1478    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1479}
1480
1481impl BgTask {
1482    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1483        let state = self
1484            .state
1485            .lock()
1486            .unwrap_or_else(|poison| poison.into_inner());
1487        self.snapshot_locked(&state, preview_bytes)
1488    }
1489
1490    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1491        let metadata = &state.metadata;
1492        let duration_ms = metadata.duration_ms.or_else(|| {
1493            metadata
1494                .status
1495                .is_terminal()
1496                .then(|| self.started.elapsed().as_millis() as u64)
1497        });
1498        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1499        BgTaskSnapshot {
1500            info: BgTaskInfo {
1501                task_id: self.task_id.clone(),
1502                status: metadata.status.clone(),
1503                command: metadata.command.clone(),
1504                started_at: metadata.started_at,
1505                duration_ms,
1506            },
1507            exit_code: metadata.exit_code,
1508            child_pid: metadata.child_pid,
1509            workdir: metadata.workdir.display().to_string(),
1510            output_preview,
1511            output_truncated,
1512            output_path: state
1513                .buffer
1514                .output_path()
1515                .map(|path| path.display().to_string()),
1516            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1517        }
1518    }
1519
1520    pub(crate) fn is_running(&self) -> bool {
1521        self.state
1522            .lock()
1523            .map(|state| state.metadata.status == BgTaskStatus::Running)
1524            .unwrap_or(false)
1525    }
1526
1527    fn mark_terminal_now(&self) {
1528        if let Ok(mut terminal_at) = self.terminal_at.lock() {
1529            if terminal_at.is_none() {
1530                *terminal_at = Some(Instant::now());
1531            }
1532        }
1533    }
1534
1535    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1536        let mut state = self
1537            .state
1538            .lock()
1539            .map_err(|_| "background task lock poisoned".to_string())?;
1540        let updated = update_task(&self.paths.json, |metadata| {
1541            metadata.completion_delivered = delivered;
1542        })
1543        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1544        state.metadata = updated;
1545        Ok(())
1546    }
1547}
1548
1549fn terminal_metadata_from_marker(
1550    mut metadata: PersistedTask,
1551    marker: ExitMarker,
1552    reason: Option<String>,
1553) -> PersistedTask {
1554    match marker {
1555        ExitMarker::Code(code) => {
1556            let status = if code == 0 {
1557                BgTaskStatus::Completed
1558            } else {
1559                BgTaskStatus::Failed
1560            };
1561            metadata.mark_terminal(status, Some(code), reason);
1562        }
1563        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1564    }
1565    metadata
1566}
1567
1568#[cfg(unix)]
1569fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1570    let shell = resolve_posix_shell();
1571    let mut cmd = Command::new(&shell);
1572    cmd.arg("-c")
1573        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1574        .arg(&shell)
1575        .arg(command)
1576        .arg(exit_path);
1577    unsafe {
1578        cmd.pre_exec(|| {
1579            if libc::setsid() == -1 {
1580                return Err(std::io::Error::last_os_error());
1581            }
1582            Ok(())
1583        });
1584    }
1585    cmd
1586}
1587
1588#[cfg(unix)]
1589fn resolve_posix_shell() -> PathBuf {
1590    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1591    POSIX_SHELL
1592        .get_or_init(|| {
1593            std::env::var_os("BASH")
1594                .filter(|value| !value.is_empty())
1595                .map(PathBuf::from)
1596                .filter(|path| path.exists())
1597                .or_else(|| which::which("bash").ok())
1598                .or_else(|| which::which("zsh").ok())
1599                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1600        })
1601        .clone()
1602}
1603
1604#[cfg(windows)]
1605fn detached_shell_command_for(
1606    shell: crate::windows_shell::WindowsShell,
1607    command: &str,
1608    exit_path: &Path,
1609    paths: &TaskPaths,
1610    creation_flags: u32,
1611) -> Result<Command, String> {
1612    use crate::windows_shell::WindowsShell;
1613    // Write the wrapper to a temp file alongside the other task files,
1614    // then invoke the shell with the file path as a single clean
1615    // argument. This sidesteps the entire Windows command-line quoting
1616    // mess (Rust std-lib quoting + cmd /C parser + PowerShell -Command
1617    // parser all interacting with embedded quotes in the wrapper).
1618    //
1619    // Path arguments don't need quoting in the same problematic way
1620    // because: (1) we use no-space task IDs (bash-XXXXXXXX) so the path
1621    // contains no characters that need shell escaping; (2) the wrapper
1622    // body's internal quotes never reach the shell command line — the
1623    // shell reads them from disk by file syntax rules, not command-line
1624    // parser rules.
1625    let wrapper_body = shell.wrapper_script(command, exit_path);
1626    let wrapper_ext = match shell {
1627        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
1628        WindowsShell::Cmd => "bat",
1629        // POSIX shells (git-bash etc.) execute the wrapper through `-c`,
1630        // so the file extension is purely cosmetic; `.sh` matches what an
1631        // operator would expect when grepping the spill directory.
1632        WindowsShell::Posix(_) => "sh",
1633    };
1634    let wrapper_path = paths.dir.join(format!(
1635        "{}.{}",
1636        paths
1637            .json
1638            .file_stem()
1639            .and_then(|s| s.to_str())
1640            .unwrap_or("wrapper"),
1641        wrapper_ext
1642    ));
1643    fs::write(&wrapper_path, wrapper_body)
1644        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
1645
1646    let mut cmd = Command::new(shell.binary().as_ref());
1647    match shell {
1648        WindowsShell::Pwsh | WindowsShell::Powershell => {
1649            // -File runs the script with no quoting issues. `-NoLogo`,
1650            // `-NoProfile`, etc. apply to the host before the file runs.
1651            cmd.args([
1652                "-NoLogo",
1653                "-NoProfile",
1654                "-NonInteractive",
1655                "-ExecutionPolicy",
1656                "Bypass",
1657                "-File",
1658            ]);
1659            cmd.arg(&wrapper_path);
1660        }
1661        WindowsShell::Cmd => {
1662            // `cmd /D /C "<bat-file-path>"` — invoking a .bat
1663            // file via /C is well-defined; the file's contents are
1664            // read line-by-line by cmd's batch processor, NOT
1665            // re-interpreted by the /C parser. This avoids the
1666            // "filename syntax incorrect" errors that came from
1667            // having complex compound commands on the cmd line.
1668            cmd.args(["/D", "/C"]);
1669            cmd.arg(&wrapper_path);
1670        }
1671        WindowsShell::Posix(_) => {
1672            // git-bash and other POSIX shells run the wrapper script with
1673            // `<binary> <wrapper-path>` (the wrapper is just a shell
1674            // script). No special flags needed — the `trap` and atomic
1675            // exit-marker rename in `wrapper_script` are POSIX-standard.
1676            cmd.arg(&wrapper_path);
1677        }
1678    }
1679
1680    // Win32 process creation flags. Caller selects whether to include
1681    // CREATE_BREAKAWAY_FROM_JOB — see `detached_shell_command_for` callers
1682    // for the breakaway-fallback strategy.
1683    cmd.creation_flags(creation_flags);
1684    Ok(cmd)
1685}
1686
1687/// Spawn a detached background bash child process.
1688///
1689/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
1690/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
1691/// cmd.exe) and retries with the next candidate when the previous one
1692/// fails to spawn with `NotFound` — the same runtime safety net the
1693/// foreground bash path has, so issue #27 callers landing on cmd.exe
1694/// fallback can also use background bash. The wrapper script is
1695/// regenerated per attempt because PowerShell wrappers embed the shell
1696/// binary by name; the stdout/stderr capture handles are also reopened
1697/// per attempt because `Command::spawn()` consumes them.
1698///
1699/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
1700/// return immediately without retry — they indicate a problem with the
1701/// resolved shell that retrying with a different shell won't fix.
1702fn spawn_detached_child(
1703    command: &str,
1704    paths: &TaskPaths,
1705    workdir: &Path,
1706    env: &HashMap<String, String>,
1707) -> Result<std::process::Child, String> {
1708    #[cfg(not(windows))]
1709    {
1710        let stdout = create_capture_file(&paths.stdout)
1711            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1712        let stderr = create_capture_file(&paths.stderr)
1713            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1714        detached_shell_command(command, &paths.exit)
1715            .current_dir(workdir)
1716            .envs(env)
1717            .stdin(Stdio::null())
1718            .stdout(Stdio::from(stdout))
1719            .stderr(Stdio::from(stderr))
1720            .spawn()
1721            .map_err(|e| format!("failed to spawn background bash command: {e}"))
1722    }
1723    #[cfg(windows)]
1724    {
1725        use crate::windows_shell::shell_candidates;
1726        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
1727        // legacy foreground bash spawn path. v0.20 routes ALL bash through
1728        // this background spawn helper, including foreground tool calls
1729        // where the model writes PowerShell-syntax (`$var = ...`,
1730        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
1731        // The earlier v0.18-era cmd-first override worked around a
1732        // PowerShell detached-output bug; that bug is fixed at the
1733        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
1734        // see flag block below), so we no longer need to misroute PS
1735        // commands through cmd.
1736        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
1737        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
1738        // first (so the bg child outlives the AFT process when AFT is killed),
1739        // then fall back without it for environments where the parent is in a
1740        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
1741        // runners (GitHub Actions windows-2022) and some MDM-managed corp
1742        // environments hit this — `CreateProcess` returns Access Denied (5).
1743        // Without breakaway, the child still runs detached but will be torn
1744        // down with the parent if the parent process group is signaled.
1745        //
1746        // We use CREATE_NO_WINDOW (no visible console window, but the
1747        // child still has a hidden console) rather than DETACHED_PROCESS
1748        // (no console at all). PowerShell-based wrappers that perform
1749        // file I/O via [System.IO.File] need a console handle to flush
1750        // stdout/stderr correctly even when redirected — under
1751        // DETACHED_PROCESS, pwsh sometimes silently exits before
1752        // executing later script statements (the Move-Item that writes
1753        // the exit marker never runs), leaving the bg task forever
1754        // marked Failed: process exited without exit marker. cmd.exe
1755        // wrappers tolerate DETACHED_PROCESS, but switching to
1756        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
1757        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
1758        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
1759        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
1760        let with_breakaway =
1761            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
1762        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
1763        let mut last_error: Option<String> = None;
1764        for (idx, shell) in candidates.iter().enumerate() {
1765            // Per-shell, try with breakaway first. If the process is in a
1766            // restrictive job, the breakaway flag triggers Access Denied
1767            // (os error 5). Retry once without breakaway.
1768            for &flags in &[with_breakaway, without_breakaway] {
1769                // Re-open capture handles per attempt; spawn() consumes them.
1770                let stdout = create_capture_file(&paths.stdout)
1771                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1772                let stderr = create_capture_file(&paths.stderr)
1773                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1774                let mut cmd =
1775                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
1776                cmd.current_dir(workdir)
1777                    .envs(env)
1778                    .stdin(Stdio::null())
1779                    .stdout(Stdio::from(stdout))
1780                    .stderr(Stdio::from(stderr));
1781                match cmd.spawn() {
1782                    Ok(child) => {
1783                        if idx > 0 {
1784                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1785                             the cached PATH probe disagreed with runtime spawn — likely PATH \
1786                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1787                            shell.binary(),
1788                            idx);
1789                        }
1790                        if flags == without_breakaway {
1791                            crate::slog_warn!(
1792                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
1793                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
1794                             Spawned without breakaway; the bg task will be torn down if the \
1795                             AFT process group is killed."
1796                            );
1797                        }
1798                        return Ok(child);
1799                    }
1800                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1801                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
1802                        shell.binary());
1803                        last_error = Some(format!("{}: {e}", shell.binary()));
1804                        // Skip the without-breakaway retry for NotFound — the
1805                        // binary itself is missing, breakaway flag is irrelevant.
1806                        break;
1807                    }
1808                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
1809                        // Access Denied during breakaway — retry without it.
1810                        crate::slog_warn!(
1811                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
1812                         Access Denied — retrying {} without breakaway",
1813                            shell.binary()
1814                        );
1815                        last_error = Some(format!("{}: {e}", shell.binary()));
1816                        continue;
1817                    }
1818                    Err(e) => {
1819                        return Err(format!(
1820                            "failed to spawn background bash command via {}: {e}",
1821                            shell.binary()
1822                        ));
1823                    }
1824                }
1825            }
1826        }
1827        Err(format!(
1828            "failed to spawn background bash command: no Windows shell could be spawned. \
1829             Last error: {}. PATH-probed candidates: {:?}",
1830            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1831            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1832        ))
1833    }
1834}
1835
1836fn random_slug() -> String {
1837    let mut bytes = [0u8; 4];
1838    // getrandom is a transitive dependency; use it directly for OS entropy.
1839    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1840        // Extremely unlikely fallback: time + pid mix.
1841        let t = SystemTime::now()
1842            .duration_since(UNIX_EPOCH)
1843            .map(|d| d.subsec_nanos())
1844            .unwrap_or(0);
1845        let p = std::process::id();
1846        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1847    });
1848    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
1849    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1850    format!("bash-{hex}")
1851}
1852
1853#[cfg(test)]
1854mod tests {
1855    use std::collections::HashMap;
1856    #[cfg(windows)]
1857    use std::fs;
1858    use std::sync::{Arc, Mutex};
1859    use std::time::Duration;
1860    #[cfg(windows)]
1861    use std::time::Instant;
1862
1863    use super::*;
1864
1865    #[cfg(unix)]
1866    const QUICK_SUCCESS_COMMAND: &str = "true";
1867    #[cfg(windows)]
1868    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1869
1870    #[cfg(unix)]
1871    const LONG_RUNNING_COMMAND: &str = "sleep 5";
1872    #[cfg(windows)]
1873    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1874
1875    /// Spawn a child process that exits immediately and return it after
1876    /// it has terminated. Used by reap_child tests to simulate the
1877    /// "child exists and is dead" state when the watchdog has already
1878    /// nulled out the original child handle.
1879    fn spawn_dead_child() -> std::process::Child {
1880        #[cfg(unix)]
1881        let mut cmd = std::process::Command::new("true");
1882        #[cfg(windows)]
1883        let mut cmd = {
1884            let mut c = std::process::Command::new("cmd");
1885            c.args(["/c", "exit", "0"]);
1886            c
1887        };
1888        cmd.stdin(std::process::Stdio::null());
1889        cmd.stdout(std::process::Stdio::null());
1890        cmd.stderr(std::process::Stdio::null());
1891        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
1892        // Poll try_wait() until the child actually exits, instead of calling
1893        // wait() which closes the OS handle. On Windows, after wait()
1894        // closes the handle, subsequent try_wait() calls (which reap_child
1895        // depends on) return Err — the test was inadvertently giving
1896        // reap_child an unusable child handle. Polling try_wait() keeps the
1897        // handle open and observes natural exit, matching the production
1898        // shape where the watchdog discovers an exited child for the first
1899        // time.
1900        let started = Instant::now();
1901        loop {
1902            match child.try_wait() {
1903                Ok(Some(_)) => break,
1904                Ok(None) => {
1905                    if started.elapsed() > Duration::from_secs(5) {
1906                        panic!("dead-child stand-in did not exit within 5s");
1907                    }
1908                    std::thread::sleep(Duration::from_millis(10));
1909                }
1910                Err(error) => panic!("dead-child try_wait failed: {error}"),
1911            }
1912        }
1913        child
1914    }
1915
1916    #[test]
1917    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1918        let registry = BgTaskRegistry::default();
1919        let dir = tempfile::tempdir().unwrap();
1920        let task_id = registry
1921            .spawn(
1922                QUICK_SUCCESS_COMMAND,
1923                "session".to_string(),
1924                dir.path().to_path_buf(),
1925                HashMap::new(),
1926                Some(Duration::from_secs(30)),
1927                dir.path().to_path_buf(),
1928                10,
1929                true,
1930                false,
1931                Some(dir.path().to_path_buf()),
1932            )
1933            .unwrap();
1934        registry
1935            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1936            .unwrap();
1937        let completions = registry.drain_completions_for_session(Some("session"));
1938        assert_eq!(completions.len(), 1);
1939
1940        registry.cleanup_finished(Duration::ZERO);
1941
1942        assert!(registry.inner.tasks.lock().unwrap().is_empty());
1943    }
1944
1945    #[test]
1946    fn cleanup_finished_retains_undelivered_terminals() {
1947        let registry = BgTaskRegistry::default();
1948        let dir = tempfile::tempdir().unwrap();
1949        let task_id = registry
1950            .spawn(
1951                QUICK_SUCCESS_COMMAND,
1952                "session".to_string(),
1953                dir.path().to_path_buf(),
1954                HashMap::new(),
1955                Some(Duration::from_secs(30)),
1956                dir.path().to_path_buf(),
1957                10,
1958                true,
1959                false,
1960                Some(dir.path().to_path_buf()),
1961            )
1962            .unwrap();
1963        registry
1964            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1965            .unwrap();
1966
1967        registry.cleanup_finished(Duration::ZERO);
1968
1969        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1970    }
1971
1972    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
1973    /// watchdog path (reap_child) marks a task Failed when the child
1974    /// has exited but no exit marker was written. Before this fix the
1975    /// task would remain `Running` until timeout, even though the
1976    /// process was definitely dead.
1977    ///
1978    /// Cross-platform: uses a quick-exiting command that does NOT go
1979    /// through the wrapper script (we manually clear the exit marker
1980    /// after spawn to simulate the wrapper crashing before write).
1981    #[test]
1982    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1983        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1984        let dir = tempfile::tempdir().unwrap();
1985        let task_id = registry
1986            .spawn(
1987                QUICK_SUCCESS_COMMAND,
1988                "session".to_string(),
1989                dir.path().to_path_buf(),
1990                HashMap::new(),
1991                Some(Duration::from_secs(30)),
1992                dir.path().to_path_buf(),
1993                10,
1994                true,
1995                false,
1996                Some(dir.path().to_path_buf()),
1997            )
1998            .unwrap();
1999
2000        let task = registry.task_for_session(&task_id, "session").unwrap();
2001
2002        // Wait for the child to actually exit and the wrapper to either
2003        // write the marker or fail. Then nuke the marker to simulate
2004        // wrapper crash before write. Poll up to 5s; this is plenty for a
2005        // `true`/`cmd /c exit 0` invocation.
2006        let started = Instant::now();
2007        loop {
2008            let exited = {
2009                let mut state = task.state.lock().unwrap();
2010                if let Some(child) = state.child.as_mut() {
2011                    matches!(child.try_wait(), Ok(Some(_)))
2012                } else {
2013                    true
2014                }
2015            };
2016            if exited {
2017                break;
2018            }
2019            assert!(
2020                started.elapsed() < Duration::from_secs(5),
2021                "child should exit quickly"
2022            );
2023            std::thread::sleep(Duration::from_millis(20));
2024        }
2025
2026        // Stop the watchdog so it doesn't race with our manual reap_child.
2027        // On fast Windows runners the watchdog ticks (every 500ms) can
2028        // observe the child exit and reap it before this test's assertion
2029        // fires, leaving us with state.child = None and an already-terminal
2030        // status. We specifically want to test reap_child's logic when
2031        // invoked manually on a Running-but-actually-dead task, so we need
2032        // exclusive control over the reap path here.
2033        registry
2034            .inner
2035            .shutdown
2036            .store(true, std::sync::atomic::Ordering::SeqCst);
2037        // Give the watchdog at most one tick (500ms) to notice shutdown
2038        // before we touch task state. Without this, an in-flight watchdog
2039        // iteration could still race with our state setup below.
2040        std::thread::sleep(Duration::from_millis(550));
2041
2042        // Wrapper likely wrote the marker by now; remove it to simulate
2043        // a wrapper crash that exited before persisting the exit code.
2044        let _ = std::fs::remove_file(&task.paths.exit);
2045
2046        // The watchdog may have already reaped the child handle and
2047        // marked the task terminal before we got here. Reset both so
2048        // reap_child has the "Running task whose child just exited"
2049        // shape it's designed to handle. We don't restore state.child
2050        // (the underlying OS process is gone), but reap_child's
2051        // try_wait path won't be exercised; we're testing the
2052        // status-transition logic when state.child is set to a dead
2053        // child OR None and the marker is missing.
2054        //
2055        // CRITICAL on Windows: the watchdog ticks fast enough that the
2056        // JSON on disk may already say `Completed`. `update_task` (called
2057        // by `reap_child`) reads from disk, applies the closure, but
2058        // ROLLS BACK if the original on-disk state was already terminal
2059        // (see persistence.rs::update_task). So we must reset BOTH
2060        // in-memory metadata AND the JSON on disk to a Running state to
2061        // give reap_child the fresh shape it expects to operate on.
2062        {
2063            let mut state = task.state.lock().unwrap();
2064            state.metadata.status = BgTaskStatus::Running;
2065            state.metadata.status_reason = None;
2066            state.metadata.exit_code = None;
2067            state.metadata.finished_at = None;
2068            state.metadata.duration_ms = None;
2069            // Persist the reset state to disk so update_task's terminal
2070            // rollback guard sees a non-terminal starting point.
2071            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
2072                .expect("persist reset Running metadata for reap_child test");
2073            // If the watchdog already nulled state.child, we need to
2074            // simulate "child exists and is dead" so reap_child's
2075            // try_wait path runs. Spawn a quick-exit child as a stand-in.
2076            if state.child.is_none() {
2077                state.child = Some(spawn_dead_child());
2078            }
2079        }
2080        // Clear the terminal_at marker too so mark_terminal_now() can fire
2081        // again inside reap_child.
2082        *task.terminal_at.lock().unwrap() = None;
2083
2084        // Sanity: task is still Running per metadata (replay/poll hasn't
2085        // observed the missing marker yet).
2086        assert!(
2087            task.is_running(),
2088            "precondition: metadata.status == Running"
2089        );
2090        assert!(
2091            !task.paths.exit.exists(),
2092            "precondition: exit marker absent"
2093        );
2094
2095        // Invoke the watchdog's reap_child directly. The fix should mark
2096        // the task Failed with the documented reason string, instead of
2097        // just dropping the child handle and leaving status=Running.
2098        registry.reap_child(&task);
2099
2100        let state = task.state.lock().unwrap();
2101        assert!(
2102            state.metadata.status.is_terminal(),
2103            "reap_child must transition to terminal when PID dead and no marker. \
2104             Got status={:?}",
2105            state.metadata.status
2106        );
2107        assert_eq!(
2108            state.metadata.status,
2109            BgTaskStatus::Failed,
2110            "must specifically be Failed (not Killed): status={:?}",
2111            state.metadata.status
2112        );
2113        assert_eq!(
2114            state.metadata.status_reason.as_deref(),
2115            Some("process exited without exit marker"),
2116            "reason must match replay path's wording: {:?}",
2117            state.metadata.status_reason
2118        );
2119        assert!(
2120            state.child.is_none(),
2121            "child handle must be released after reap"
2122        );
2123        assert!(state.detached, "task must be marked detached after reap");
2124    }
2125
2126    /// Companion to the above: when the exit marker DOES exist on disk
2127    /// at reap_child time (race window — wrapper finished writing
2128    /// between try_wait and the marker check), reap_child must NOT mark
2129    /// the task Failed. Instead it leaves status=Running and lets the
2130    /// next poll_task() cycle finalize via the marker.
2131    #[test]
2132    fn reap_child_preserves_running_when_exit_marker_exists() {
2133        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2134        let dir = tempfile::tempdir().unwrap();
2135        let task_id = registry
2136            .spawn(
2137                QUICK_SUCCESS_COMMAND,
2138                "session".to_string(),
2139                dir.path().to_path_buf(),
2140                HashMap::new(),
2141                Some(Duration::from_secs(30)),
2142                dir.path().to_path_buf(),
2143                10,
2144                true,
2145                false,
2146                Some(dir.path().to_path_buf()),
2147            )
2148            .unwrap();
2149
2150        let task = registry.task_for_session(&task_id, "session").unwrap();
2151
2152        // Wait for child to exit AND for the marker to land. Both happen
2153        // shortly after the wrapper finishes — but we want both observed.
2154        let started = Instant::now();
2155        loop {
2156            let exited = {
2157                let mut state = task.state.lock().unwrap();
2158                if let Some(child) = state.child.as_mut() {
2159                    matches!(child.try_wait(), Ok(Some(_)))
2160                } else {
2161                    true
2162                }
2163            };
2164            if exited && task.paths.exit.exists() {
2165                break;
2166            }
2167            assert!(
2168                started.elapsed() < Duration::from_secs(5),
2169                "child should exit and write marker quickly"
2170            );
2171            std::thread::sleep(Duration::from_millis(20));
2172        }
2173
2174        // Stop the watchdog so it doesn't race with our manual reap_child.
2175        // On fast Windows runners the watchdog can call poll_task (which
2176        // finalizes via marker) before this test asserts the
2177        // "marker exists, status still Running" invariant. We want
2178        // exclusive control over the reap path.
2179        registry
2180            .inner
2181            .shutdown
2182            .store(true, std::sync::atomic::Ordering::SeqCst);
2183        std::thread::sleep(Duration::from_millis(550));
2184
2185        // If the watchdog already finalized the task before we stopped it,
2186        // restore the test setup: reset status to Running and ensure the
2187        // marker file is still on disk. We're testing reap_child's
2188        // behavior when called manually with both child-exited AND
2189        // marker-present, regardless of whether the watchdog beat us.
2190        {
2191            let mut state = task.state.lock().unwrap();
2192            state.metadata.status = BgTaskStatus::Running;
2193            state.metadata.status_reason = None;
2194            if state.child.is_none() {
2195                state.child = Some(spawn_dead_child());
2196            }
2197        }
2198        *task.terminal_at.lock().unwrap() = None;
2199        // Make sure the marker is still on disk (poll_task removes it on
2200        // finalization). Recreate it if needed.
2201        if !task.paths.exit.exists() {
2202            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
2203        }
2204
2205        // reap_child sees: child exited, marker exists. It should:
2206        //  - drop state.child / set state.detached = true
2207        //  - NOT change status (poll_task will finalize via marker next tick)
2208        registry.reap_child(&task);
2209
2210        let state = task.state.lock().unwrap();
2211        assert!(
2212            state.child.is_none(),
2213            "child handle still released even when marker exists"
2214        );
2215        assert!(
2216            state.detached,
2217            "task still marked detached even when marker exists"
2218        );
2219        // Status remains Running because reap_child defers to poll_task
2220        // when a marker exists. It would be wrong for reap to record the
2221        // marker outcome (poll_task does that with proper exit-code
2222        // parsing).
2223        assert_eq!(
2224            state.metadata.status,
2225            BgTaskStatus::Running,
2226            "reap_child must defer to poll_task when marker exists"
2227        );
2228    }
2229
2230    #[test]
2231    fn cleanup_finished_keeps_running_tasks() {
2232        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2233        let dir = tempfile::tempdir().unwrap();
2234        let task_id = registry
2235            .spawn(
2236                LONG_RUNNING_COMMAND,
2237                "session".to_string(),
2238                dir.path().to_path_buf(),
2239                HashMap::new(),
2240                Some(Duration::from_secs(30)),
2241                dir.path().to_path_buf(),
2242                10,
2243                true,
2244                false,
2245                Some(dir.path().to_path_buf()),
2246            )
2247            .unwrap();
2248
2249        registry.cleanup_finished(Duration::ZERO);
2250
2251        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2252        let _ = registry.kill(&task_id, "session");
2253    }
2254
2255    #[cfg(windows)]
2256    fn wait_for_file(path: &Path) -> String {
2257        let started = Instant::now();
2258        loop {
2259            if path.exists() {
2260                return fs::read_to_string(path).expect("read file");
2261            }
2262            assert!(
2263                started.elapsed() < Duration::from_secs(30),
2264                "timed out waiting for {}",
2265                path.display()
2266            );
2267            std::thread::sleep(Duration::from_millis(100));
2268        }
2269    }
2270
2271    #[cfg(windows)]
2272    fn spawn_windows_registry_command(
2273        command: &str,
2274    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2275        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2276        let dir = tempfile::tempdir().unwrap();
2277        let task_id = registry
2278            .spawn(
2279                command,
2280                "session".to_string(),
2281                dir.path().to_path_buf(),
2282                HashMap::new(),
2283                Some(Duration::from_secs(30)),
2284                dir.path().to_path_buf(),
2285                10,
2286                false,
2287                false,
2288                Some(dir.path().to_path_buf()),
2289            )
2290            .unwrap();
2291        (registry, dir, task_id)
2292    }
2293
2294    #[cfg(windows)]
2295    #[test]
2296    fn windows_spawn_writes_exit_marker_for_zero_exit() {
2297        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2298        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2299
2300        let content = wait_for_file(&exit_path);
2301
2302        assert_eq!(content.trim(), "0");
2303    }
2304
2305    #[cfg(windows)]
2306    #[test]
2307    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2308        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2309        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2310
2311        let content = wait_for_file(&exit_path);
2312
2313        assert_eq!(content.trim(), "42");
2314    }
2315
2316    #[cfg(windows)]
2317    #[test]
2318    fn windows_spawn_captures_stdout_to_disk() {
2319        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
2320        let task = registry.task_for_session(&task_id, "session").unwrap();
2321        let stdout_path = task.paths.stdout.clone();
2322        let exit_path = task.paths.exit.clone();
2323
2324        let _ = wait_for_file(&exit_path);
2325        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
2326
2327        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
2328    }
2329
2330    #[cfg(windows)]
2331    #[test]
2332    fn windows_spawn_uses_pwsh_when_available() {
2333        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
2334        // (We intentionally pass None for shell_env to keep this test
2335        // independent of the runner's actual env.)
2336        let candidates = crate::windows_shell::shell_candidates_with(
2337            |binary| match binary {
2338                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
2339                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
2340                _ => None,
2341            },
2342            || None,
2343        );
2344        let shell = candidates.first().expect("at least one candidate").clone();
2345        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
2346        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
2347    }
2348
2349    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
2350    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
2351    /// evaluation is the default for batch files; parse-time expansion only
2352    /// applies to compound `&`-chained inline commands). Capturing
2353    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
2354    /// command runs records the real run-time exit code.
2355    #[cfg(windows)]
2356    #[test]
2357    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
2358        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
2359        let script =
2360            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
2361
2362        // Batch wrapper: capture exit code into CODE on the line after the
2363        // user command, then write CODE to a temp marker file before
2364        // atomic-renaming it into place.
2365        assert!(
2366            script.contains("set CODE=%ERRORLEVEL%"),
2367            "wrapper must capture exit code into CODE: {script}"
2368        );
2369        assert!(
2370            script.contains("echo %CODE% >"),
2371            "wrapper must echo CODE to a temp marker file: {script}"
2372        );
2373        assert!(
2374            script.contains("move /Y"),
2375            "wrapper must use atomic move to write the marker: {script}"
2376        );
2377        // move output must be redirected to nul to avoid polluting the
2378        // user's captured stdout with "1 file(s) moved." lines.
2379        assert!(
2380            script.contains("> nul"),
2381            "wrapper must redirect move output to nul: {script}"
2382        );
2383        // exit /B %CODE% propagates the real exit code so wait() sees it.
2384        assert!(
2385            script.contains("exit /B %CODE%"),
2386            "wrapper must propagate the captured exit code: {script}"
2387        );
2388        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
2389        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
2390    }
2391
2392    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
2393    /// written to a `.bat` file where batch-line evaluation captures
2394    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
2395    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
2396    /// internal `"`-quoting from `cmd_quote`).
2397    #[cfg(windows)]
2398    #[test]
2399    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
2400        use crate::windows_shell::WindowsShell;
2401        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
2402        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2403        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2404        assert_eq!(
2405            args_strs,
2406            vec!["/D", "/S", "/C", "echo wrapped"],
2407            "Cmd::bg_command must prepend /D /S /C"
2408        );
2409    }
2410
2411    /// PowerShell variants don't need `/V:ON`-style flags; their args
2412    /// are the same for foreground (`command()`) and background
2413    /// (`bg_command()`).
2414    #[cfg(windows)]
2415    #[test]
2416    fn windows_shell_pwsh_bg_command_uses_standard_args() {
2417        use crate::windows_shell::WindowsShell;
2418        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
2419        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2420        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2421        assert!(
2422            args_strs.contains(&"-Command"),
2423            "Pwsh::bg_command must use -Command: {args_strs:?}"
2424        );
2425        assert!(
2426            args_strs.contains(&"Get-Date"),
2427            "Pwsh::bg_command must include the user command body"
2428        );
2429    }
2430
2431    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
2432    /// **cmd.exe-specific** wrapper path captures the user command's
2433    /// run-time exit code correctly. The existing
2434    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
2435    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
2436    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
2437    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
2438    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
2439    ///
2440    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
2441    /// force the cmd-wrapper code path, then writes the exit marker and
2442    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
2443    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
2444    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
2445    /// regardless of what the user command returned.
2446    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
2447    /// the inline command-line wrapper helper that production code does
2448    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
2449    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
2450    /// produced silent failures on Windows 11 (move /Y could not parse
2451    /// path arguments through cmd's /C parser). The `bg_command` helper
2452    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
2453    /// the production spawn path goes through `detached_shell_command_for`
2454    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
2455    /// <bat-path>`.
2456    ///
2457    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
2458    /// covered live by the Windows e2e harness scenario 2d
2459    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
2460    /// exercises the real file-based wrapper end-to-end via the protocol.
2461    #[allow(dead_code)]
2462    #[cfg(any())] // disabled on all targets
2463    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
2464}