Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use rusqlite::Connection;
12use serde::Serialize;
13
14use crate::context::SharedProgressSender;
15use crate::harness::Harness;
16use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
17
18#[cfg(unix)]
19use std::os::unix::process::CommandExt;
20#[cfg(windows)]
21use std::os::windows::process::CommandExt;
22
23use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
24use super::persistence::{
25    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
26    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
27    PersistedTask, TaskPaths,
28};
29use super::process::is_process_alive;
30#[cfg(unix)]
31use super::process::terminate_pgid;
32#[cfg(windows)]
33use super::process::terminate_pid;
34use super::{BgTaskInfo, BgTaskStatus};
35// Note: `resolve_windows_shell` is no longer imported at module scope —
36// production code in `spawn_detached_child` uses `shell_candidates()`
37// with retry instead, and the function remains in `windows_shell.rs`
38// for tests and as a future helper.
39
/// Seconds in one minute; building block for the durations below.
const SECS_PER_MINUTE: u64 = 60;
/// Seconds in one day; building block for the day-scale durations below.
const SECS_PER_DAY: u64 = 24 * 60 * SECS_PER_MINUTE;

/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * SECS_PER_MINUTE);
/// One day. Age threshold after which a task still recorded as running is
/// treated as orphaned on replay (reported as "orphaned (>24h)"); presumably
/// consulted by `running_metadata_is_stale` — confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(SECS_PER_DAY);
/// One day. Grace period for persisted task bundles; presumably used by
/// `maybe_gc_persisted` — confirm.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(SECS_PER_DAY);
/// Thirty days. Grace period for quarantined task metadata; presumably used
/// by the persisted-task GC — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * SECS_PER_DAY);

/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
/// of typical command output (git status, test results, exit messages) — short
/// enough that round-tripping multiple completions in one reminder stays well
/// under the model's context budget but long enough that most successful runs
/// don't need a follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream byte cap (128 KiB) above which token counting is skipped and
/// `BgCompletion::tokens_skipped` is reported instead of token counts.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
55
/// Notification that a background task reached a terminal state.
///
/// Records are queued in `RegistryInner::completions` and serialized into
/// completion payloads (`BashCompletedFrame` pushes and
/// `bash_drain_completions` results).
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion refers to.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status the task ended in.
    pub status: BgTaskStatus,
    /// Exit code of the process, when one is known.
    pub exit_code: Option<i32>,
    /// Command line the task ran.
    pub command: String,
    /// Tail of stdout+stderr (at most `BG_COMPLETION_PREVIEW_BYTES`, i.e. 300
    /// bytes) at completion time, read once and cached so push-frame
    /// consumers and `bash_drain_completions` callers see the same preview
    /// without racing against later output rotation. Empty when not captured
    /// (e.g., persisted task seen on startup before buffer reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
91
/// Serde `skip_serializing_if` predicate: returns `true` when the flag is
/// unset, so `false` booleans are left out of serialized payloads.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
95
/// Point-in-time view of a background task, as returned by
/// `BgTaskRegistry::status`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Common task fields, flattened into the top level of the serialized
    /// object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code of the process, when known.
    pub exit_code: Option<i32>,
    /// OS process id of the task's child, when recorded.
    pub child_pid: Option<u32>,
    /// Working directory the command was launched in, as a string.
    pub workdir: String,
    /// Tail of the captured output. Presumably bounded by the `preview_bytes`
    /// passed to `status`, and may be rewritten by `maybe_compress_snapshot` —
    /// confirm.
    pub output_preview: String,
    /// True when `output_preview` does not contain the whole output.
    pub output_truncated: bool,
    /// Presumably the stdout capture file path, when exposed — confirm.
    pub output_path: Option<String>,
    /// Presumably the stderr capture file path, when exposed — confirm.
    pub stderr_path: Option<String>,
}
108
/// Registry of background bash tasks.
///
/// A cheap-to-clone handle: every clone shares the same `RegistryInner`
/// through an `Arc`, so state changes are visible to all holders.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
113
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Registered tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records awaiting delivery (presumably drained by
    /// `bash_drain_completions` — confirm).
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender for push frames. NOTE(review): presumably carries the
    /// `BashCompletedFrame` / `BashLongRunningFrame` pushes — confirm.
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably flipped once `start_watchdog` has launched the watchdog
    /// thread — confirm.
    watchdog_started: AtomicBool,
    /// Shutdown flag; presumably polled by the watchdog — confirm.
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder toggle (default `true` in `new`); written by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds (default 600_000, i.e. 10 minutes,
    /// in `new`); written by `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set by the first replay so the persisted-task GC runs at most once
    /// per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter, presumably of persisted-GC passes — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional shared SQLite connection. When set, task metadata writes are
    /// mirrored into it (best-effort) and replay queries it before disk.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used when building and querying DB task rows. While
    /// unset, DB mirroring is skipped with a warning and DB replay is
    /// unavailable.
    pub(crate) db_harness: RwLock<Option<String>>,
}
134
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Unique task id; also the key in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (directory, metadata JSON,
    /// stdout/stderr capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// Monotonic timestamp captured when this entry was built (for spawned
    /// tasks, just after the child started).
    pub(crate) started: Instant,
    /// Presumably when the last long-running reminder was pushed — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Presumably when the task was first observed terminal — confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable state (metadata, child handle, output buffer) behind a lock.
    pub(crate) state: Mutex<BgTaskState>,
}
144
/// Lock-protected, mutable part of a `BgTask`.
pub(crate) struct BgTaskState {
    /// Record that is persisted to the task's JSON file (and mirrored to the
    /// DB when one is configured).
    pub(crate) metadata: PersistedTask,
    /// Handle to the process; `Some` for tasks spawned by this registry.
    /// Presumably `None` for rehydrated tasks — confirm.
    pub(crate) child: Option<Child>,
    /// `false` for freshly spawned tasks. NOTE(review): presumably `true` when
    /// the task is tracked without a `Child` handle — confirm.
    pub(crate) detached: bool,
    /// Buffer built over the stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
151
152impl BgTaskRegistry {
153    pub fn new(progress_sender: SharedProgressSender) -> Self {
154        Self {
155            inner: Arc::new(RegistryInner {
156                tasks: Mutex::new(HashMap::new()),
157                completions: Mutex::new(VecDeque::new()),
158                progress_sender,
159                watchdog_started: AtomicBool::new(false),
160                shutdown: AtomicBool::new(false),
161                long_running_reminder_enabled: AtomicBool::new(true),
162                long_running_reminder_interval_ms: AtomicU64::new(600_000),
163                persisted_gc_started: AtomicBool::new(false),
164                #[cfg(test)]
165                persisted_gc_runs: AtomicU64::new(0),
166                compressor: Mutex::new(None),
167                db_pool: RwLock::new(None),
168                db_harness: RwLock::new(None),
169            }),
170        }
171    }
172
173    pub fn set_harness(&self, harness: Harness) {
174        if let Ok(mut slot) = self.inner.db_harness.write() {
175            *slot = Some(harness.as_str().to_string());
176        }
177    }
178
179    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
180        if let Ok(mut slot) = self.inner.db_pool.write() {
181            *slot = Some(conn);
182        }
183    }
184
185    pub fn clear_db_pool(&self) {
186        if let Ok(mut slot) = self.inner.db_pool.write() {
187            *slot = None;
188        }
189    }
190
191    /// Install the output-compression callback. Called by `main.rs` after
192    /// `AppContext` is constructed so that snapshot/completion paths can
193    /// invoke `compress::compress_with_registry` without holding a context
194    /// reference. When called multiple times, the latest installation wins.
195    pub fn set_compressor<F>(&self, compressor: F)
196    where
197        F: Fn(&str, String) -> String + Send + Sync + 'static,
198    {
199        if let Ok(mut slot) = self.inner.compressor.lock() {
200            *slot = Some(Box::new(compressor));
201        }
202    }
203
204    /// Apply the installed compressor (if any) to `output`. Returns `output`
205    /// untouched when no compressor is installed.
206    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
207        let Ok(slot) = self.inner.compressor.lock() else {
208            return output;
209        };
210        match slot.as_ref() {
211            Some(compressor) => compressor(command, output),
212            None => output,
213        }
214    }
215
216    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
217        write_task(&paths.json, metadata)?;
218        self.dual_write_task(paths, metadata);
219        Ok(())
220    }
221
222    fn update_task_metadata<F>(
223        &self,
224        paths: &TaskPaths,
225        update: F,
226    ) -> std::io::Result<PersistedTask>
227    where
228        F: FnOnce(&mut PersistedTask),
229    {
230        let metadata = update_task(&paths.json, update)?;
231        self.dual_write_task(paths, &metadata);
232        Ok(metadata)
233    }
234
235    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
236        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
237        let Some(pool) = pool else {
238            return;
239        };
240        let harness = self
241            .inner
242            .db_harness
243            .read()
244            .ok()
245            .and_then(|slot| slot.clone());
246        let Some(harness) = harness else {
247            crate::slog_warn!(
248                "dual-write bash_task to DB skipped for {}: harness not configured",
249                metadata.task_id
250            );
251            return;
252        };
253        let row = match metadata.to_bash_task_row(&harness, paths) {
254            Ok(row) => row,
255            Err(error) => {
256                crate::slog_warn!(
257                    "dual-write bash_task to DB failed for {}: {}",
258                    metadata.task_id,
259                    error
260                );
261                return;
262            }
263        };
264        let conn = match pool.lock() {
265            Ok(conn) => conn,
266            Err(_) => {
267                crate::slog_warn!(
268                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
269                    metadata.task_id
270                );
271                return;
272            }
273        };
274        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
275            crate::slog_warn!(
276                "dual-write bash_task to DB failed for {}: {}",
277                metadata.task_id,
278                error
279            );
280        }
281    }
282
283    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
284        self.inner
285            .long_running_reminder_enabled
286            .store(enabled, Ordering::SeqCst);
287        self.inner
288            .long_running_reminder_interval_ms
289            .store(interval_ms, Ordering::SeqCst);
290    }
291
292    #[cfg(unix)]
293    #[allow(clippy::too_many_arguments)]
294    pub fn spawn(
295        &self,
296        command: &str,
297        session_id: String,
298        workdir: PathBuf,
299        env: HashMap<String, String>,
300        timeout: Option<Duration>,
301        storage_dir: PathBuf,
302        max_running: usize,
303        notify_on_completion: bool,
304        compressed: bool,
305        project_root: Option<PathBuf>,
306    ) -> Result<String, String> {
307        self.start_watchdog();
308
309        let running = self.running_count();
310        if running >= max_running {
311            return Err(format!(
312                "background bash task limit exceeded: {running} running (max {max_running})"
313            ));
314        }
315
316        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
317        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
318        let task_id = self.generate_unique_task_id()?;
319        let paths = task_paths(&storage_dir, &session_id, &task_id);
320        fs::create_dir_all(&paths.dir)
321            .map_err(|e| format!("failed to create background task dir: {e}"))?;
322
323        let mut metadata = PersistedTask::starting(
324            task_id.clone(),
325            session_id.clone(),
326            command.to_string(),
327            workdir.clone(),
328            project_root,
329            timeout_ms,
330            notify_on_completion,
331            compressed,
332        );
333        self.persist_task(&paths, &metadata)
334            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
335
336        // Pre-create capture files so the watchdog/buffer can always
337        // open them for reading. The spawn helper opens its own handles
338        // per attempt because each `Command::spawn()` consumes them.
339        create_capture_file(&paths.stdout)
340            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
341        create_capture_file(&paths.stderr)
342            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
343
344        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
345            Ok(child) => child,
346            Err(error) => {
347                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
348                let _ = delete_task_bundle(&paths);
349                return Err(error);
350            }
351        };
352
353        let child_pid = child.id();
354        metadata.mark_running(child_pid, child_pid as i32);
355        self.persist_task(&paths, &metadata)
356            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
357
358        let task = Arc::new(BgTask {
359            task_id: task_id.clone(),
360            session_id,
361            paths: paths.clone(),
362            started: Instant::now(),
363            last_reminder_at: Mutex::new(None),
364            terminal_at: Mutex::new(None),
365            state: Mutex::new(BgTaskState {
366                metadata,
367                child: Some(child),
368                detached: false,
369                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
370            }),
371        });
372
373        self.inner
374            .tasks
375            .lock()
376            .map_err(|_| "background task registry lock poisoned".to_string())?
377            .insert(task_id.clone(), task);
378
379        Ok(task_id)
380    }
381
382    #[cfg(windows)]
383    #[allow(clippy::too_many_arguments)]
384    pub fn spawn(
385        &self,
386        command: &str,
387        session_id: String,
388        workdir: PathBuf,
389        env: HashMap<String, String>,
390        timeout: Option<Duration>,
391        storage_dir: PathBuf,
392        max_running: usize,
393        notify_on_completion: bool,
394        compressed: bool,
395        project_root: Option<PathBuf>,
396    ) -> Result<String, String> {
397        self.start_watchdog();
398
399        let running = self.running_count();
400        if running >= max_running {
401            return Err(format!(
402                "background bash task limit exceeded: {running} running (max {max_running})"
403            ));
404        }
405
406        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
407        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
408        let task_id = self.generate_unique_task_id()?;
409        let paths = task_paths(&storage_dir, &session_id, &task_id);
410        fs::create_dir_all(&paths.dir)
411            .map_err(|e| format!("failed to create background task dir: {e}"))?;
412
413        let mut metadata = PersistedTask::starting(
414            task_id.clone(),
415            session_id.clone(),
416            command.to_string(),
417            workdir.clone(),
418            project_root,
419            timeout_ms,
420            notify_on_completion,
421            compressed,
422        );
423        self.persist_task(&paths, &metadata)
424            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
425
426        // Capture files are pre-created so the watchdog/buffer can always
427        // open them for reading even if the child hasn't written anything
428        // yet. The spawn helper opens its own handles per attempt because
429        // each `Command::spawn()` consumes them, and on Windows we may
430        // retry across multiple shell candidates if the first one fails.
431        create_capture_file(&paths.stdout)
432            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
433        create_capture_file(&paths.stderr)
434            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
435
436        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
437            Ok(child) => child,
438            Err(error) => {
439                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
440                let _ = delete_task_bundle(&paths);
441                return Err(error);
442            }
443        };
444
445        let child_pid = child.id();
446        metadata.status = BgTaskStatus::Running;
447        metadata.child_pid = Some(child_pid);
448        metadata.pgid = None;
449        self.persist_task(&paths, &metadata)
450            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
451
452        let task = Arc::new(BgTask {
453            task_id: task_id.clone(),
454            session_id,
455            paths: paths.clone(),
456            started: Instant::now(),
457            last_reminder_at: Mutex::new(None),
458            terminal_at: Mutex::new(None),
459            state: Mutex::new(BgTaskState {
460                metadata,
461                child: Some(child),
462                detached: false,
463                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
464            }),
465        });
466
467        self.inner
468            .tasks
469            .lock()
470            .map_err(|_| "background task registry lock poisoned".to_string())?
471            .insert(task_id.clone(), task);
472
473        Ok(task_id)
474    }
475
476    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
477        self.replay_session_inner(storage_dir, session_id, None)
478    }
479
480    pub fn replay_session_for_project(
481        &self,
482        storage_dir: &Path,
483        session_id: &str,
484        project_root: &Path,
485    ) -> Result<(), String> {
486        self.replay_session_inner(storage_dir, session_id, Some(project_root))
487    }
488
489    fn replay_session_inner(
490        &self,
491        storage_dir: &Path,
492        session_id: &str,
493        project_root: Option<&Path>,
494    ) -> Result<(), String> {
495        self.start_watchdog();
496        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
497            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
498                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
499            }
500        }
501
502        let canonical_project = project_root.map(canonicalized_path);
503        let tasks = match self.replay_session_from_db(session_id) {
504            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
505            Some(Ok(_)) => {
506                crate::slog_info!(
507                    "bash task replay DB miss for session {}; falling back to disk",
508                    session_id
509                );
510                self.replay_session_from_disk(storage_dir, session_id)?
511            }
512            Some(Err(error)) => {
513                crate::slog_warn!(
514                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
515                    session_id,
516                    error
517                );
518                self.replay_session_from_disk(storage_dir, session_id)?
519            }
520            None => {
521                crate::slog_info!(
522                    "bash task replay DB unavailable for session {}; falling back to disk",
523                    session_id
524                );
525                self.replay_session_from_disk(storage_dir, session_id)?
526            }
527        };
528
529        for mut metadata in tasks {
530            if metadata.session_id != session_id {
531                continue;
532            }
533            if let Some(canonical_project) = canonical_project.as_deref() {
534                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
535                if metadata_project.as_deref() != Some(canonical_project) {
536                    continue;
537                }
538            }
539
540            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
541            match metadata.status {
542                BgTaskStatus::Starting => {
543                    metadata.mark_terminal(
544                        BgTaskStatus::Failed,
545                        None,
546                        Some("spawn aborted".to_string()),
547                    );
548                    let _ = self.persist_task(&paths, &metadata);
549                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
550                    self.insert_rehydrated_task(metadata, paths, true)?;
551                }
552                BgTaskStatus::Running | BgTaskStatus::Killing => {
553                    if self.running_metadata_is_stale(&metadata) {
554                        metadata.mark_terminal(
555                            BgTaskStatus::Killed,
556                            None,
557                            Some("orphaned (>24h)".to_string()),
558                        );
559                        if !paths.exit.exists() {
560                            let _ = write_kill_marker_if_absent(&paths.exit);
561                        }
562                        let _ = self.persist_task(&paths, &metadata);
563                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
564                        self.insert_rehydrated_task(metadata, paths, true)?;
565                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
566                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
567                            "recovered from inconsistent killing state on replay".to_string()
568                        });
569                        if reason.is_some() {
570                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
571                            metadata.task_id);
572                        }
573                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
574                        let _ = self.persist_task(&paths, &metadata);
575                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
576                        self.insert_rehydrated_task(metadata, paths, true)?;
577                    } else if metadata.status == BgTaskStatus::Killing {
578                        if !paths.exit.exists() {
579                            let _ = write_kill_marker_if_absent(&paths.exit);
580                        }
581                        metadata.mark_terminal(
582                            BgTaskStatus::Killed,
583                            None,
584                            Some("recovered from inconsistent killing state on replay".to_string()),
585                        );
586                        let _ = self.persist_task(&paths, &metadata);
587                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
588                        self.insert_rehydrated_task(metadata, paths, true)?;
589                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
590                        metadata.mark_terminal(
591                            BgTaskStatus::Failed,
592                            None,
593                            Some("process exited without exit marker".to_string()),
594                        );
595                        let _ = self.persist_task(&paths, &metadata);
596                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
597                        self.insert_rehydrated_task(metadata, paths, true)?;
598                    } else {
599                        self.insert_rehydrated_task(metadata, paths, true)?;
600                    }
601                }
602                _ if metadata.status.is_terminal() => {
603                    // Borrow `paths` for the completion enqueue BEFORE
604                    // `insert_rehydrated_task` consumes it. The completion
605                    // helper only reads from `paths` (stdout/stderr/exit) to
606                    // reconstruct a tail preview, so it must see the same
607                    // paths the rehydrated task will own.
608                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
609                    self.insert_rehydrated_task(metadata, paths, true)?;
610                }
611                _ => {}
612            }
613        }
614
615        Ok(())
616    }
617
618    fn replay_session_from_db(
619        &self,
620        session_id: &str,
621    ) -> Option<Result<Vec<PersistedTask>, String>> {
622        let pool = self
623            .inner
624            .db_pool
625            .read()
626            .ok()
627            .and_then(|slot| slot.clone())?;
628        let harness = self
629            .inner
630            .db_harness
631            .read()
632            .ok()
633            .and_then(|slot| slot.clone())?;
634        let conn = match pool.lock() {
635            Ok(conn) => conn,
636            Err(_) => return Some(Err("db mutex poisoned".to_string())),
637        };
638        Some(
639            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
640                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
641                .map_err(|error| error.to_string()),
642        )
643    }
644
645    fn replay_session_from_disk(
646        &self,
647        storage_dir: &Path,
648        session_id: &str,
649    ) -> Result<Vec<PersistedTask>, String> {
650        let dir = session_tasks_dir(storage_dir, session_id);
651        if !dir.exists() {
652            return Ok(Vec::new());
653        }
654
655        let entries = fs::read_dir(&dir)
656            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
657        let mut tasks = Vec::new();
658        for entry in entries.flatten() {
659            let path = entry.path();
660            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
661                continue;
662            }
663            match read_task(&path) {
664                Ok(metadata) => tasks.push(metadata),
665                Err(error) => {
666                    crate::slog_warn!(
667                        "quarantining invalid background task metadata {} during replay: {error}",
668                        path.display()
669                    );
670                    if let Err(quarantine_error) =
671                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
672                    {
673                        crate::slog_warn!(
674                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
675                            path.display()
676                        );
677                    }
678                }
679            }
680        }
681        Ok(tasks)
682    }
683
684    pub fn status(
685        &self,
686        task_id: &str,
687        session_id: &str,
688        project_root: Option<&Path>,
689        storage_dir: Option<&Path>,
690        preview_bytes: usize,
691    ) -> Option<BgTaskSnapshot> {
692        let mut task = self.task_for_session(task_id, session_id);
693        if task.is_none() {
694            if let Some(storage_dir) = storage_dir {
695                let _ = self.replay_session(storage_dir, session_id);
696                task = self.task_for_session(task_id, session_id);
697            }
698        }
699        let Some(task) = task else {
700            return self.status_relaxed(
701                task_id,
702                session_id,
703                project_root?,
704                storage_dir?,
705                preview_bytes,
706            );
707        };
708        let _ = self.poll_task(&task);
709        let mut snapshot = task.snapshot(preview_bytes);
710        self.maybe_compress_snapshot(&task, &mut snapshot);
711        Some(snapshot)
712    }
713
714    fn status_relaxed_task(
715        &self,
716        task_id: &str,
717        project_root: &Path,
718        storage_dir: &Path,
719    ) -> Option<Arc<BgTask>> {
720        let canonical_project = canonicalized_path(project_root);
721        match self.lookup_relaxed_task_from_db(task_id, project_root) {
722            Some(Ok(Some(metadata))) => {
723                if let Some(task) = self.task(task_id) {
724                    let matches_project = task
725                        .state
726                        .lock()
727                        .map(|state| {
728                            state
729                                .metadata
730                                .project_root
731                                .as_deref()
732                                .map(canonicalized_path)
733                                .as_deref()
734                                == Some(canonical_project.as_path())
735                        })
736                        .unwrap_or(false);
737                    return matches_project.then_some(task);
738                }
739                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
740                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
741                    return None;
742                }
743                return self.task(task_id);
744            }
745            Some(Ok(None)) => {
746                crate::slog_info!(
747                    "bash task relaxed DB miss for {}; falling back to disk",
748                    task_id
749                );
750            }
751            Some(Err(error)) => {
752                crate::slog_warn!(
753                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
754                    task_id,
755                    error
756                );
757            }
758            None => {
759                crate::slog_info!(
760                    "bash task relaxed DB unavailable for {}; falling back to disk",
761                    task_id
762                );
763            }
764        }
765        let root = storage_dir.join("bash-tasks");
766        let entries = fs::read_dir(&root).ok()?;
767        for entry in entries.flatten() {
768            let dir = entry.path();
769            if !dir.is_dir() {
770                continue;
771            }
772            let path = dir.join(format!("{task_id}.json"));
773            if !path.exists() {
774                continue;
775            }
776            let metadata = match read_task(&path) {
777                Ok(metadata) => metadata,
778                Err(error) => {
779                    crate::slog_warn!(
780                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
781                        path.display()
782                    );
783                    if let Err(quarantine_error) =
784                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
785                    {
786                        crate::slog_warn!(
787                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
788                            path.display()
789                        );
790                    }
791                    continue;
792                }
793            };
794            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
795            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
796                continue;
797            }
798            if let Some(task) = self.task(task_id) {
799                let matches_project = task
800                    .state
801                    .lock()
802                    .map(|state| {
803                        state
804                            .metadata
805                            .project_root
806                            .as_deref()
807                            .map(canonicalized_path)
808                            .as_deref()
809                            == Some(canonical_project.as_path())
810                    })
811                    .unwrap_or(false);
812                return matches_project.then_some(task);
813            }
814            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
815            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
816                return None;
817            }
818            return self.task(task_id);
819        }
820        None
821    }
822
823    fn lookup_relaxed_task_from_db(
824        &self,
825        task_id: &str,
826        project_root: &Path,
827    ) -> Option<Result<Option<PersistedTask>, String>> {
828        let pool = self
829            .inner
830            .db_pool
831            .read()
832            .ok()
833            .and_then(|slot| slot.clone())?;
834        let harness = self
835            .inner
836            .db_harness
837            .read()
838            .ok()
839            .and_then(|slot| slot.clone())?;
840        let conn = match pool.lock() {
841            Ok(conn) => conn,
842            Err(_) => return Some(Err("db mutex poisoned".to_string())),
843        };
844        let project_key = crate::search_index::project_cache_key(project_root);
845        Some(
846            crate::db::bash_tasks::find_bash_task_for_project(
847                &conn,
848                &harness,
849                &project_key,
850                task_id,
851            )
852            .map(|row| row.map(PersistedTask::from))
853            .map_err(|error| error.to_string()),
854        )
855    }
856
857    pub(super) fn status_relaxed(
858        &self,
859        task_id: &str,
860        _session_id: &str,
861        project_root: &Path,
862        storage_dir: &Path,
863        preview_bytes: usize,
864    ) -> Option<BgTaskSnapshot> {
865        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
866        let _ = self.poll_task(&task);
867        let mut snapshot = task.snapshot(preview_bytes);
868        self.maybe_compress_snapshot(&task, &mut snapshot);
869        Some(snapshot)
870    }
871
872    pub fn kill_relaxed(
873        &self,
874        task_id: &str,
875        project_root: &Path,
876        storage_dir: &Path,
877    ) -> Result<BgTaskSnapshot, String> {
878        let task = self
879            .status_relaxed_task(task_id, project_root, storage_dir)
880            .ok_or_else(|| format!("background task not found: {task_id}"))?;
881        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
882    }
883
884    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
885        #[cfg(test)]
886        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
887
888        let mut deleted = 0usize;
889
890        let root = storage_dir.join("bash-tasks");
891        if root.exists() {
892            let session_dirs = fs::read_dir(&root).map_err(|e| {
893                format!(
894                    "failed to read background task root {}: {e}",
895                    root.display()
896                )
897            })?;
898            for session_entry in session_dirs.flatten() {
899                let session_dir = session_entry.path();
900                if !session_dir.is_dir() {
901                    continue;
902                }
903                let task_entries = match fs::read_dir(&session_dir) {
904                    Ok(entries) => entries,
905                    Err(error) => {
906                        crate::slog_warn!(
907                            "failed to read background task session dir {}: {error}",
908                            session_dir.display()
909                        );
910                        continue;
911                    }
912                };
913                for task_entry in task_entries.flatten() {
914                    let json_path = task_entry.path();
915                    if json_path
916                        .extension()
917                        .and_then(|extension| extension.to_str())
918                        != Some("json")
919                    {
920                        continue;
921                    }
922                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
923                        continue;
924                    }
925                    let metadata = match read_task(&json_path) {
926                        Ok(metadata) => metadata,
927                        Err(error) => {
928                            crate::slog_warn!(
929                                "quarantining corrupt background task metadata {}: {error}",
930                                json_path.display()
931                            );
932                            quarantine_task_json(
933                                storage_dir,
934                                &session_dir,
935                                &json_path,
936                                QuarantineKind::Corrupt,
937                            )?;
938                            continue;
939                        }
940                    };
941                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
942                        continue;
943                    }
944                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
945                    match delete_task_bundle(&paths) {
946                        Ok(()) => {
947                            deleted += 1;
948                            log::debug!(
949                                "deleted persisted background task bundle {}",
950                                metadata.task_id
951                            );
952                        }
953                        Err(error) => {
954                            crate::slog_warn!(
955                                "failed to delete background task bundle {}: {error}",
956                                metadata.task_id
957                            );
958                            continue;
959                        }
960                    }
961                }
962            }
963        }
964        gc_quarantine(storage_dir);
965        Ok(deleted)
966    }
967
968    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
969        let tasks = self
970            .inner
971            .tasks
972            .lock()
973            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
974            .unwrap_or_default();
975        tasks
976            .into_iter()
977            .map(|task| {
978                let _ = self.poll_task(&task);
979                let mut snapshot = task.snapshot(preview_bytes);
980                self.maybe_compress_snapshot(&task, &mut snapshot);
981                snapshot
982            })
983            .collect()
984    }
985
986    /// Compress `output_preview` in place when the task is in a terminal
987    /// state. Live tail of running tasks stays raw so agents debugging
988    /// long-running bash see exactly what the process emitted, not a
989    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
990    /// field on `PersistedTask` short-circuits before the compress pipeline.
991    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
992        if !snapshot.info.status.is_terminal() {
993            return;
994        }
995        let compressed_flag = task
996            .state
997            .lock()
998            .map(|state| state.metadata.compressed)
999            .unwrap_or(true);
1000        if !compressed_flag {
1001            return;
1002        }
1003        let raw = std::mem::take(&mut snapshot.output_preview);
1004        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1005    }
1006
1007    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1008        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1009    }
1010
1011    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1012        let task = self
1013            .task_for_session(task_id, session_id)
1014            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1015        let mut state = task
1016            .state
1017            .lock()
1018            .map_err(|_| "background task lock poisoned".to_string())?;
1019        let updated = self
1020            .update_task_metadata(&task.paths, |metadata| {
1021                metadata.notify_on_completion = true;
1022                metadata.completion_delivered = false;
1023            })
1024            .map_err(|e| format!("failed to promote background task: {e}"))?;
1025        state.metadata = updated;
1026        if state.metadata.status.is_terminal() {
1027            state.buffer.enforce_terminal_cap();
1028            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1029        }
1030        Ok(true)
1031    }
1032
1033    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1034        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1035            .map(|_| ())
1036    }
1037
1038    pub fn cleanup_finished(&self, older_than: Duration) {
1039        let cutoff = Instant::now().checked_sub(older_than);
1040        let removable_paths: Vec<(String, TaskPaths)> =
1041            if let Ok(mut tasks) = self.inner.tasks.lock() {
1042                let removable = tasks
1043                    .iter()
1044                    .filter_map(|(task_id, task)| {
1045                        let delivered_terminal = task
1046                            .state
1047                            .lock()
1048                            .map(|state| {
1049                                state.metadata.status.is_terminal()
1050                                    && state.metadata.completion_delivered
1051                            })
1052                            .unwrap_or(false);
1053                        if !delivered_terminal {
1054                            return None;
1055                        }
1056
1057                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1058                        let expired = match (terminal_at, cutoff) {
1059                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1060                            (Some(_), None) => true,
1061                            (None, _) => false,
1062                        };
1063                        expired.then(|| task_id.clone())
1064                    })
1065                    .collect::<Vec<_>>();
1066
1067                removable
1068                    .into_iter()
1069                    .filter_map(|task_id| {
1070                        tasks
1071                            .remove(&task_id)
1072                            .map(|task| (task_id, task.paths.clone()))
1073                    })
1074                    .collect()
1075            } else {
1076                Vec::new()
1077            };
1078
1079        for (task_id, paths) in removable_paths {
1080            match delete_task_bundle(&paths) {
1081                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1082                Err(error) => crate::slog_warn!(
1083                    "failed to delete persisted background task bundle {task_id}: {error}"
1084                ),
1085            }
1086        }
1087    }
1088
1089    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1090        self.drain_completions_for_session(None)
1091    }
1092
1093    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1094        let completions = match self.inner.completions.lock() {
1095            Ok(completions) => completions,
1096            Err(_) => return Vec::new(),
1097        };
1098
1099        completions
1100            .iter()
1101            .filter(|completion| {
1102                session_id
1103                    .map(|session_id| completion.session_id == session_id)
1104                    .unwrap_or(true)
1105            })
1106            .cloned()
1107            .collect()
1108    }
1109
1110    pub fn ack_completions_for_session(
1111        &self,
1112        session_id: Option<&str>,
1113        task_ids: &[String],
1114    ) -> Vec<String> {
1115        if task_ids.is_empty() {
1116            return Vec::new();
1117        }
1118        let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1119        let mut completions = match self.inner.completions.lock() {
1120            Ok(completions) => completions,
1121            Err(_) => return Vec::new(),
1122        };
1123        let mut acked = Vec::new();
1124        completions.retain(|completion| {
1125            let session_matches = session_id
1126                .map(|session_id| completion.session_id == session_id)
1127                .unwrap_or(true);
1128            if session_matches && task_ids.contains(completion.task_id.as_str()) {
1129                acked.push((completion.task_id.clone(), completion.session_id.clone()));
1130                false
1131            } else {
1132                true
1133            }
1134        });
1135        drop(completions);
1136
1137        let mut delivered = Vec::new();
1138        for (task_id, completion_session_id) in acked {
1139            if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
1140                if task.set_completion_delivered(true, self).is_ok() {
1141                    delivered.push(task_id);
1142                }
1143            }
1144        }
1145
1146        delivered
1147    }
1148
1149    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1150        self.inner
1151            .completions
1152            .lock()
1153            .map(|completions| {
1154                completions
1155                    .iter()
1156                    .filter(|completion| completion.session_id == session_id)
1157                    .cloned()
1158                    .collect()
1159            })
1160            .unwrap_or_default()
1161    }
1162
1163    pub fn detach(&self) {
1164        self.inner.shutdown.store(true, Ordering::SeqCst);
1165        if let Ok(mut tasks) = self.inner.tasks.lock() {
1166            for task in tasks.values() {
1167                if let Ok(mut state) = task.state.lock() {
1168                    state.child = None;
1169                    state.detached = true;
1170                }
1171            }
1172            tasks.clear();
1173        }
1174    }
1175
1176    pub fn shutdown(&self) {
1177        let tasks = self
1178            .inner
1179            .tasks
1180            .lock()
1181            .map(|tasks| {
1182                tasks
1183                    .values()
1184                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1185                    .collect::<Vec<_>>()
1186            })
1187            .unwrap_or_default();
1188        for (task_id, session_id) in tasks {
1189            let _ = self.kill(&task_id, &session_id);
1190        }
1191    }
1192
1193    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1194        let marker = match read_exit_marker(&task.paths.exit) {
1195            Ok(Some(marker)) => marker,
1196            Ok(None) => return Ok(()),
1197            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1198        };
1199        self.finalize_from_marker(task, marker, None)
1200    }
1201
1202    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1203        let Ok(mut state) = task.state.lock() else {
1204            return;
1205        };
1206        if let Some(child) = state.child.as_mut() {
1207            if matches!(child.try_wait(), Ok(Some(_))) {
1208                // Child has exited. If the wrapper successfully wrote an
1209                // exit marker, the next `poll_task()` cycle will pick it up
1210                // and finalize via `finalize_from_marker`. But if the
1211                // wrapper crashed before writing the marker (e.g. SIGKILL,
1212                // power loss, wrapper bug), the task would forever appear
1213                // Running until `timeout_ms` expired — and if no timeout
1214                // was set, until the 24h `running_metadata_is_stale` cutoff
1215                // hit at the next aft restart. Same condition as the replay
1216                // path's "PID dead but no marker" branch (see line 338).
1217                //
1218                // To avoid that hidden hang, mark the task Failed
1219                // immediately with the same reason string used by replay,
1220                // but only if the marker is genuinely absent. If a marker
1221                // appeared on disk between try_wait() returning and now
1222                // (race window), prefer the marker — let the next poll
1223                // cycle finalize from it.
1224                state.child = None;
1225                state.detached = true;
1226                self.fail_without_exit_marker_if_needed(task, &mut state);
1227            }
1228        } else if state.detached
1229            && state
1230                .metadata
1231                .child_pid
1232                .is_some_and(|pid| !is_process_alive(pid))
1233        {
1234            self.fail_without_exit_marker_if_needed(task, &mut state);
1235        }
1236    }
1237
1238    fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1239        if state.metadata.status.is_terminal() {
1240            return;
1241        }
1242        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1243            return;
1244        }
1245        let updated = self.update_task_metadata(&task.paths, |metadata| {
1246            metadata.mark_terminal(
1247                BgTaskStatus::Failed,
1248                None,
1249                Some("process exited without exit marker".to_string()),
1250            );
1251        });
1252        if let Ok(metadata) = updated {
1253            state.metadata = metadata;
1254            task.mark_terminal_now();
1255            state.buffer.enforce_terminal_cap();
1256            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1257        }
1258    }
1259
1260    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1261        self.inner
1262            .tasks
1263            .lock()
1264            .map(|tasks| {
1265                tasks
1266                    .values()
1267                    .filter(|task| task.is_running())
1268                    .cloned()
1269                    .collect()
1270            })
1271            .unwrap_or_default()
1272    }
1273
1274    fn insert_rehydrated_task(
1275        &self,
1276        metadata: PersistedTask,
1277        paths: TaskPaths,
1278        detached: bool,
1279    ) -> Result<(), String> {
1280        let task_id = metadata.task_id.clone();
1281        let session_id = metadata.session_id.clone();
1282        let started = started_instant_from_unix_millis(metadata.started_at);
1283        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1284        let task = Arc::new(BgTask {
1285            task_id: task_id.clone(),
1286            session_id,
1287            paths: paths.clone(),
1288            started,
1289            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1290            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1291            state: Mutex::new(BgTaskState {
1292                metadata,
1293                child: None,
1294                detached,
1295                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1296            }),
1297        });
1298        self.inner
1299            .tasks
1300            .lock()
1301            .map_err(|_| "background task registry lock poisoned".to_string())?
1302            .insert(task_id, task);
1303        Ok(())
1304    }
1305
1306    fn kill_with_status(
1307        &self,
1308        task_id: &str,
1309        session_id: &str,
1310        terminal_status: BgTaskStatus,
1311    ) -> Result<BgTaskSnapshot, String> {
1312        let task = self
1313            .task_for_session(task_id, session_id)
1314            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1315
1316        {
1317            let mut state = task
1318                .state
1319                .lock()
1320                .map_err(|_| "background task lock poisoned".to_string())?;
1321            if state.metadata.status.is_terminal() {
1322                return Ok(task.snapshot_locked(&state, 5 * 1024));
1323            }
1324
1325            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1326                state.metadata =
1327                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1328                task.mark_terminal_now();
1329                state.child = None;
1330                state.detached = true;
1331                state.buffer.enforce_terminal_cap();
1332                self.persist_task(&task.paths, &state.metadata)
1333                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1334                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1335                return Ok(task.snapshot_locked(&state, 5 * 1024));
1336            }
1337
1338            state.metadata.status = BgTaskStatus::Killing;
1339            self.persist_task(&task.paths, &state.metadata)
1340                .map_err(|e| format!("failed to persist killing state: {e}"))?;
1341
1342            #[cfg(unix)]
1343            if let Some(pgid) = state.metadata.pgid {
1344                terminate_pgid(pgid, state.child.as_mut());
1345            }
1346            #[cfg(windows)]
1347            if let Some(child) = state.child.as_mut() {
1348                super::process::terminate_process(child);
1349            } else if let Some(pid) = state.metadata.child_pid {
1350                terminate_pid(pid);
1351            }
1352            if let Some(child) = state.child.as_mut() {
1353                let _ = child.wait();
1354            }
1355            state.child = None;
1356            state.detached = true;
1357
1358            if !task.paths.exit.exists() {
1359                write_kill_marker_if_absent(&task.paths.exit)
1360                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
1361            }
1362
1363            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1364                Some(124)
1365            } else {
1366                None
1367            };
1368            state
1369                .metadata
1370                .mark_terminal(terminal_status, exit_code, None);
1371            task.mark_terminal_now();
1372            self.persist_task(&task.paths, &state.metadata)
1373                .map_err(|e| format!("failed to persist killed state: {e}"))?;
1374            state.buffer.enforce_terminal_cap();
1375            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1376        }
1377
1378        Ok(task.snapshot(5 * 1024))
1379    }
1380
1381    fn finalize_from_marker(
1382        &self,
1383        task: &Arc<BgTask>,
1384        marker: ExitMarker,
1385        reason: Option<String>,
1386    ) -> Result<(), String> {
1387        let mut state = task
1388            .state
1389            .lock()
1390            .map_err(|_| "background task lock poisoned".to_string())?;
1391        if state.metadata.status.is_terminal() {
1392            return Ok(());
1393        }
1394
1395        let updated = self
1396            .update_task_metadata(&task.paths, |metadata| {
1397                let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1398                *metadata = new_metadata;
1399            })
1400            .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1401        state.metadata = updated;
1402        task.mark_terminal_now();
1403        state.child = None;
1404        state.detached = true;
1405        state.buffer.enforce_terminal_cap();
1406        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1407        Ok(())
1408    }
1409
1410    fn enqueue_completion_if_needed(
1411        &self,
1412        metadata: &PersistedTask,
1413        paths: Option<&TaskPaths>,
1414        emit_frame: bool,
1415    ) {
1416        if metadata.status.is_terminal() && !metadata.completion_delivered {
1417            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1418        }
1419    }
1420
1421    fn enqueue_completion_locked(
1422        &self,
1423        metadata: &PersistedTask,
1424        buffer: Option<&BgBuffer>,
1425        emit_frame: bool,
1426    ) {
1427        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1428    }
1429
1430    fn enqueue_completion_from_parts(
1431        &self,
1432        metadata: &PersistedTask,
1433        buffer: Option<&BgBuffer>,
1434        paths: Option<&TaskPaths>,
1435        emit_frame: bool,
1436    ) {
1437        // Only the terminal-state guard prevents double-recording here. The
1438        // `completion_delivered` flag is NOT used to gate compression-event
1439        // recording, because `mark_terminal` flips `completion_delivered=true`
1440        // immediately for tasks with `notify_on_completion=false` (foreground
1441        // bash polled via `bash_status`, which is the common case). Pre-emptive
1442        // delivery flagging is correct for the push-frame queue (suppresses
1443        // duplicate user-visible notifications) but would silently skip the
1444        // database insert below. Compression event recording is idempotent at
1445        // the DB layer (unique on harness+session+task_id), so re-entry is
1446        // safe; the dedupe-by-queue check stays for the push frame side.
1447        if !metadata.status.is_terminal() {
1448            return;
1449        }
1450        // Read tail once at completion time and cache on the BgCompletion so
1451        // both the push-frame consumer (running session) and any later
1452        // `bash_drain_completions` poll (different session, restart) see the
1453        // same preview without racing against rotation.
1454        let (raw_preview, output_truncated) = match buffer {
1455            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1456            None => paths
1457                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1458                .unwrap_or_else(|| (String::new(), false)),
1459        };
1460        // Compress at completion time so push-frame consumers and later
1461        // `bash_drain_completions` poll-callers see the same compressed text.
1462        // Per-task `compressed: false` opts out; otherwise the compressor is
1463        // a no-op when `experimental.bash.compress=false`.
1464        let output_preview = if metadata.compressed {
1465            self.compress_output(&metadata.command, raw_preview)
1466        } else {
1467            raw_preview
1468        };
1469        let token_counts = self.completion_token_counts(metadata, buffer, paths);
1470        let completion = BgCompletion {
1471            task_id: metadata.task_id.clone(),
1472            session_id: metadata.session_id.clone(),
1473            status: metadata.status.clone(),
1474            exit_code: metadata.exit_code,
1475            command: metadata.command.clone(),
1476            output_preview,
1477            output_truncated,
1478            original_tokens: token_counts.original_tokens,
1479            compressed_tokens: token_counts.compressed_tokens,
1480            tokens_skipped: token_counts.tokens_skipped,
1481        };
1482
1483        // Record the compression event BEFORE the push-frame dedupe. Event
1484        // recording has its own idempotency at the DB layer (unique key on
1485        // harness+session+task_id), so it's safe to attempt for every
1486        // terminal-state finalize. Critically, this path runs even when
1487        // `completion_delivered=true` was pre-set by `mark_terminal` for
1488        // foreground bash (`notify_on_completion=false`) — which is the common
1489        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
1490        // after the dedupe guard and never fired for foreground tasks, which
1491        // meant compression accounting was effectively dead for >99% of
1492        // real-world bash usage.
1493        self.record_compression_event_if_applicable(metadata, &token_counts);
1494
1495        // Push-frame queue is gated on `completion_delivered` so foreground
1496        // bash with `notify_on_completion=false` does not leak a user-visible
1497        // completion notification. `mark_terminal` pre-sets
1498        // `completion_delivered=true` for those tasks; honoring it here keeps
1499        // the suppression invariant the test
1500        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
1501        // asserts. The compression-event recording above intentionally runs
1502        // before this gate so foreground bash still contributes to the
1503        // session/project aggregates.
1504        if metadata.completion_delivered {
1505            return;
1506        }
1507
1508        // Push-frame queue dedupe stays per-task to prevent duplicate
1509        // user-visible completion notifications.
1510        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
1511            if completions
1512                .iter()
1513                .any(|existing| existing.task_id == metadata.task_id)
1514            {
1515                false
1516            } else {
1517                completions.push_back(completion.clone());
1518                true
1519            }
1520        } else {
1521            false
1522        };
1523
1524        if pushed && emit_frame {
1525            self.emit_bash_completed(completion);
1526        }
1527    }
1528
1529    fn record_compression_event_if_applicable(
1530        &self,
1531        metadata: &PersistedTask,
1532        token_counts: &CompletionTokenCounts,
1533    ) {
1534        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
1535            token_counts.original_tokens,
1536            token_counts.compressed_tokens,
1537            token_counts.original_bytes,
1538            token_counts.compressed_bytes,
1539        ) {
1540            (
1541                Some(original_tokens),
1542                Some(compressed_tokens),
1543                Some(original_bytes),
1544                Some(compressed_bytes),
1545            ) => (
1546                original_tokens,
1547                compressed_tokens,
1548                original_bytes,
1549                compressed_bytes,
1550            ),
1551            _ => {
1552                crate::slog_warn!(
1553                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
1554                    metadata.task_id
1555                );
1556                return;
1557            }
1558        };
1559
1560        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
1561        let Some(pool) = pool else {
1562            crate::slog_warn!(
1563                "compression event skipped for {}: db_pool not initialized — was configure run?",
1564                metadata.task_id
1565            );
1566            return;
1567        };
1568        let harness = self
1569            .inner
1570            .db_harness
1571            .read()
1572            .ok()
1573            .and_then(|slot| slot.clone());
1574        let Some(harness) = harness else {
1575            crate::slog_warn!(
1576                "compression event insert skipped for {}: harness not configured",
1577                metadata.task_id
1578            );
1579            return;
1580        };
1581
1582        let project_root = metadata
1583            .project_root
1584            .as_deref()
1585            .unwrap_or(&metadata.workdir);
1586        let project_key = crate::search_index::project_cache_key(project_root);
1587        let row = crate::db::compression_events::CompressionEventRow {
1588            harness: &harness,
1589            session_id: Some(&metadata.session_id),
1590            project_key: &project_key,
1591            tool: "bash",
1592            task_id: Some(&metadata.task_id),
1593            command: Some(&metadata.command),
1594            compressor: if metadata.compressed {
1595                "registry"
1596            } else {
1597                "none"
1598            },
1599            original_bytes,
1600            compressed_bytes,
1601            original_tokens,
1602            compressed_tokens,
1603            created_at: unix_millis() as i64,
1604        };
1605
1606        let conn = match pool.lock() {
1607            Ok(conn) => conn,
1608            Err(_) => {
1609                crate::slog_warn!(
1610                    "compression event insert failed for {}: db mutex poisoned",
1611                    metadata.task_id
1612                );
1613                return;
1614            }
1615        };
1616        match crate::db::compression_events::insert_compression_event(&conn, &row) {
1617            Ok(_) => {
1618                crate::slog_info!(
1619                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
1620                    metadata.task_id,
1621                    project_key,
1622                    metadata.session_id,
1623                    original_tokens,
1624                    compressed_tokens
1625                );
1626            }
1627            Err(error) => {
1628                crate::slog_warn!(
1629                    "compression event insert failed for {}: {}",
1630                    metadata.task_id,
1631                    error
1632                );
1633            }
1634        }
1635    }
1636
1637    fn emit_bash_completed(&self, completion: BgCompletion) {
1638        let Ok(progress_sender) = self
1639            .inner
1640            .progress_sender
1641            .lock()
1642            .map(|sender| sender.clone())
1643        else {
1644            return;
1645        };
1646        let Some(sender) = progress_sender.as_ref() else {
1647            return;
1648        };
1649        // Clone the callback out of the registry mutex before writing to stdout;
1650        // otherwise a blocked push-frame write could pin the mutex and starve
1651        // unrelated progress-sender updates.
1652        // Bg task transitions are discovered by the watchdog thread, so the
1653        // sender is shared behind a Mutex. It still uses the same stdout writer
1654        // closure as foreground progress frames, preserving the existing lock/
1655        // flush behavior in main.rs.
1656        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1657            completion.task_id,
1658            completion.session_id,
1659            completion.status,
1660            completion.exit_code,
1661            completion.command,
1662            completion.output_preview,
1663            completion.output_truncated,
1664            completion.original_tokens,
1665            completion.compressed_tokens,
1666            completion.tokens_skipped,
1667        )));
1668    }
1669
1670    fn completion_token_counts(
1671        &self,
1672        metadata: &PersistedTask,
1673        buffer: Option<&BgBuffer>,
1674        paths: Option<&TaskPaths>,
1675    ) -> CompletionTokenCounts {
1676        let raw = match buffer {
1677            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
1678            None => paths
1679                .map(|paths| read_for_token_count_from_disk(paths, TOKENIZE_CAP_BYTES_PER_STREAM))
1680                .unwrap_or(TokenCountInput::Skipped),
1681        };
1682
1683        let TokenCountInput::Text(raw_output) = raw else {
1684            return CompletionTokenCounts::skipped();
1685        };
1686
1687        let original_tokens = token_count_u32(&raw_output);
1688        let original_bytes = raw_output.len() as i64;
1689        let compressed_output = if metadata.compressed {
1690            self.compress_output(&metadata.command, raw_output)
1691        } else {
1692            raw_output
1693        };
1694        let compressed_tokens = token_count_u32(&compressed_output);
1695        let compressed_bytes = compressed_output.len() as i64;
1696        CompletionTokenCounts {
1697            original_tokens: Some(original_tokens),
1698            compressed_tokens: Some(compressed_tokens),
1699            original_bytes: Some(original_bytes),
1700            compressed_bytes: Some(compressed_bytes),
1701            tokens_skipped: false,
1702        }
1703    }
1704
1705    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1706        if !self
1707            .inner
1708            .long_running_reminder_enabled
1709            .load(Ordering::SeqCst)
1710        {
1711            return;
1712        }
1713        let interval_ms = self
1714            .inner
1715            .long_running_reminder_interval_ms
1716            .load(Ordering::SeqCst);
1717        if interval_ms == 0 {
1718            return;
1719        }
1720        let interval = Duration::from_millis(interval_ms);
1721        let now = Instant::now();
1722        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1723            return;
1724        };
1725        let since = last_reminder_at.unwrap_or(task.started);
1726        if now.duration_since(since) < interval {
1727            return;
1728        }
1729        let command = task
1730            .state
1731            .lock()
1732            .map(|state| state.metadata.command.clone())
1733            .unwrap_or_default();
1734        *last_reminder_at = Some(now);
1735        self.emit_bash_long_running(BashLongRunningFrame::new(
1736            task.task_id.clone(),
1737            task.session_id.clone(),
1738            command,
1739            task.started.elapsed().as_millis() as u64,
1740        ));
1741    }
1742
1743    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1744        let Ok(progress_sender) = self
1745            .inner
1746            .progress_sender
1747            .lock()
1748            .map(|sender| sender.clone())
1749        else {
1750            return;
1751        };
1752        if let Some(sender) = progress_sender.as_ref() {
1753            sender(PushFrame::BashLongRunning(frame));
1754        }
1755    }
1756
1757    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1758        self.inner
1759            .tasks
1760            .lock()
1761            .ok()
1762            .and_then(|tasks| tasks.get(task_id).cloned())
1763    }
1764
1765    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1766        self.task(task_id)
1767            .filter(|task| task.session_id == session_id)
1768    }
1769
1770    fn running_count(&self) -> usize {
1771        self.inner
1772            .tasks
1773            .lock()
1774            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1775            .unwrap_or(0)
1776    }
1777
1778    fn start_watchdog(&self) {
1779        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1780            super::watchdog::start(self.clone());
1781        }
1782    }
1783
1784    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1785        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1786    }
1787
1788    #[cfg(test)]
1789    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1790        self.task_for_session(task_id, session_id)
1791            .map(|task| task.paths.json.clone())
1792    }
1793
1794    #[cfg(test)]
1795    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1796        self.task_for_session(task_id, session_id)
1797            .map(|task| task.paths.exit.clone())
1798    }
1799
1800    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
1801    fn generate_unique_task_id(&self) -> Result<String, String> {
1802        for _ in 0..32 {
1803            let candidate = random_slug();
1804            let tasks = self
1805                .inner
1806                .tasks
1807                .lock()
1808                .map_err(|_| "background task registry lock poisoned".to_string())?;
1809            if tasks.contains_key(&candidate) {
1810                continue;
1811            }
1812            let completions = self
1813                .inner
1814                .completions
1815                .lock()
1816                .map_err(|_| "background completions lock poisoned".to_string())?;
1817            if completions
1818                .iter()
1819                .any(|completion| completion.task_id == candidate)
1820            {
1821                continue;
1822            }
1823            return Ok(candidate);
1824        }
1825        Err("failed to allocate unique background task id after 32 attempts".to_string())
1826    }
1827}
1828
/// Token and byte measurements of a finished task's output, before and after
/// registry compression.
///
/// `completion_token_counts` fills all four measurements with `Some` and
/// clears `tokens_skipped`; `skipped()` leaves them all `None` and sets it.
struct CompletionTokenCounts {
    /// Tokens in the raw combined output.
    original_tokens: Option<u32>,
    /// Tokens after compression (equal to the raw count when compression is off).
    compressed_tokens: Option<u32>,
    /// Byte length of the raw combined output.
    original_bytes: Option<i64>,
    /// Byte length of the output after compression.
    compressed_bytes: Option<i64>,
    /// `true` when no output could be tokenized.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Counts for a task whose output could not be tokenized: no measurements,
    /// `tokens_skipped` set.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            original_bytes: None,
            compressed_bytes: None,
            original_tokens: None,
            compressed_tokens: None,
        }
    }
}
1848
1849fn token_count_u32(text: &str) -> u32 {
1850    aft_tokenizer::count_tokens(text)
1851        .try_into()
1852        .unwrap_or(u32::MAX)
1853}
1854
1855impl Default for BgTaskRegistry {
1856    fn default() -> Self {
1857        Self::new(Arc::new(Mutex::new(None)))
1858    }
1859}
1860
/// Whether `path` was modified less than `grace` ago.
///
/// Returns `false` when the metadata or mtime cannot be read. An mtime in the
/// future (clock skew, restored backups) counts as "just modified". Before,
/// the failing `duration_since` made such entries look expired, so GC deleted
/// them.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    let age = SystemTime::now()
        .duration_since(modified)
        .unwrap_or(Duration::ZERO);
    age < grace
}
1869
/// Canonicalize `path` with `fs::canonicalize`, falling back to the path as
/// given when that fails (for example, when it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1873
/// Convert a wall-clock start time (Unix milliseconds) into an `Instant`
/// that many milliseconds before now.
///
/// Future start times and an unreadable system clock both map to "now", as
/// does a subtraction that would underflow the monotonic clock.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
1885
1886fn gc_quarantine(storage_dir: &Path) {
1887    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1888    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1889        return;
1890    };
1891    for session_entry in session_dirs.flatten() {
1892        let session_quarantine_dir = session_entry.path();
1893        if !session_quarantine_dir.is_dir() {
1894            continue;
1895        }
1896        let entries = match fs::read_dir(&session_quarantine_dir) {
1897            Ok(entries) => entries,
1898            Err(error) => {
1899                crate::slog_warn!(
1900                    "failed to read background task quarantine dir {}: {error}",
1901                    session_quarantine_dir.display()
1902                );
1903                continue;
1904            }
1905        };
1906        for entry in entries.flatten() {
1907            let path = entry.path();
1908            if modified_within(&path, QUARANTINE_GC_GRACE) {
1909                continue;
1910            }
1911            let result = if path.is_dir() {
1912                fs::remove_dir_all(&path)
1913            } else {
1914                fs::remove_file(&path)
1915            };
1916            match result {
1917                Ok(()) => log::debug!(
1918                    "deleted old background task quarantine entry {}",
1919                    path.display()
1920                ),
1921                Err(error) => crate::slog_warn!(
1922                    "failed to delete old background task quarantine entry {}: {error}",
1923                    path.display()
1924                ),
1925            }
1926        }
1927        let _ = fs::remove_dir(&session_quarantine_dir);
1928    }
1929    let _ = fs::remove_dir(&quarantine_root);
1930}
1931
/// Reason a task's persisted metadata is moved into the quarantine directory.
///
/// The variant selects the renamed file's suffix in `quarantine_name`.
/// NOTE(review): presumably `Corrupt` = unparseable metadata and `Invalid` =
/// parsed but rejected; confirm against the callers of `quarantine_task_json`.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<ext>` (or `<file_name>.invalid.<unix_ts>`
    /// when the name has no extension).
    Invalid,
}
1936
1937fn quarantine_task_json(
1938    storage_dir: &Path,
1939    session_dir: &Path,
1940    json_path: &Path,
1941    kind: QuarantineKind,
1942) -> Result<(), String> {
1943    let session_hash = session_dir
1944        .file_name()
1945        .and_then(|name| name.to_str())
1946        .ok_or_else(|| {
1947            format!(
1948                "invalid background task session dir: {}",
1949                session_dir.display()
1950            )
1951        })?;
1952    let task_name = json_path
1953        .file_name()
1954        .and_then(|name| name.to_str())
1955        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1956    let unix_ts = SystemTime::now()
1957        .duration_since(UNIX_EPOCH)
1958        .map(|duration| duration.as_secs())
1959        .unwrap_or(0);
1960    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1961    fs::create_dir_all(&quarantine_dir).map_err(|e| {
1962        format!(
1963            "failed to create background task quarantine dir {}: {e}",
1964            quarantine_dir.display()
1965        )
1966    })?;
1967    let target_name = quarantine_name(task_name, unix_ts, &kind);
1968    let target = quarantine_dir.join(target_name);
1969    fs::rename(json_path, &target).map_err(|e| {
1970        format!(
1971            "failed to quarantine background task metadata {} to {}: {e}",
1972            json_path.display(),
1973            target.display()
1974        )
1975    })?;
1976
1977    for sibling in task_sibling_paths(json_path) {
1978        if !sibling.exists() {
1979            continue;
1980        }
1981        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1982            crate::slog_warn!(
1983                "skipping background task sibling with invalid name during quarantine: {}",
1984                sibling.display()
1985            );
1986            continue;
1987        };
1988        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
1989        if let Err(error) = fs::rename(&sibling, &sibling_target) {
1990            crate::slog_warn!(
1991                "failed to quarantine background task sibling {} to {}: {error}",
1992                sibling.display(),
1993                sibling_target.display()
1994            );
1995        }
1996    }
1997
1998    let _ = fs::remove_dir(session_dir);
1999    Ok(())
2000}
2001
2002fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2003    match kind {
2004        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2005        QuarantineKind::Invalid => {
2006            let path = Path::new(file_name);
2007            let stem = path.file_stem().and_then(|stem| stem.to_str());
2008            let extension = path.extension().and_then(|extension| extension.to_str());
2009            match (stem, extension) {
2010                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2011                _ => format!("{file_name}.invalid.{unix_ts}"),
2012            }
2013        }
2014    }
2015}
2016
/// Paths of the files that share `json_path`'s stem in the same directory:
/// capture streams, exit marker, and wrapper scripts.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];
    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (parent, stem) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|ext| dir.join(format!("{stem}.{ext}")))
            .collect(),
        _ => Vec::new(),
    }
}
2029
2030fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
2031    let stdout = fs::read(&paths.stdout).unwrap_or_default();
2032    let stderr = fs::read(&paths.stderr).unwrap_or_default();
2033    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2034    bytes.extend_from_slice(&stdout);
2035    bytes.extend_from_slice(&stderr);
2036    if bytes.len() <= max_bytes {
2037        return (String::from_utf8_lossy(&bytes).into_owned(), false);
2038    }
2039    let start = bytes.len().saturating_sub(max_bytes);
2040    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2041}
2042
2043fn read_for_token_count_from_disk(
2044    paths: &TaskPaths,
2045    max_bytes_per_stream: usize,
2046) -> TokenCountInput {
2047    // Read up to `max_bytes_per_stream` bytes per stream rather than
2048    // refusing to tokenize anything when the file exceeds the cap.
2049    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
2050    // (see comment there) — large outputs are exactly the tasks that
2051    // benefit most from compression accounting, so silent-skipping
2052    // them defeats the purpose of token tracking.
2053    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2054    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2055    match (stdout, stderr) {
2056        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2057            String::from_utf8_lossy(&stdout).as_ref(),
2058            String::from_utf8_lossy(&stderr).as_ref(),
2059        )),
2060        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2061            String::from_utf8_lossy(&stdout).as_ref(),
2062            "",
2063        )),
2064        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2065            "",
2066            String::from_utf8_lossy(&stderr).as_ref(),
2067        )),
2068        (Err(_), Err(_)) => TokenCountInput::Skipped,
2069    }
2070}
2071
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization, where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The read is bounded by `max_bytes` even when `max_bytes == 0` and even if
/// the file grows after its length is sampled. Previously a zero cap skipped
/// the seek and `read_to_end` returned the whole file, and concurrent appends
/// were read without limit.
///
/// # Errors
/// Returns `Err` if the file cannot be opened, stat'ed, seeked, or read
/// (for example, it is genuinely missing or permission is denied).
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let wanted = len.min(cap);
    // Seeking from the start avoids a negative i64 offset; when the whole
    // file fits this is a seek to 0.
    file.seek(SeekFrom::Start(len - wanted))?;
    // `wanted <= max_bytes`, so the cast cannot truncate.
    let mut bytes = Vec::with_capacity(wanted as usize);
    file.take(wanted).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2088
2089impl BgTask {
2090    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2091        let state = self
2092            .state
2093            .lock()
2094            .unwrap_or_else(|poison| poison.into_inner());
2095        self.snapshot_locked(&state, preview_bytes)
2096    }
2097
2098    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2099        let metadata = &state.metadata;
2100        let duration_ms = metadata.duration_ms.or_else(|| {
2101            metadata
2102                .status
2103                .is_terminal()
2104                .then(|| self.started.elapsed().as_millis() as u64)
2105        });
2106        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
2107        BgTaskSnapshot {
2108            info: BgTaskInfo {
2109                task_id: self.task_id.clone(),
2110                status: metadata.status.clone(),
2111                command: metadata.command.clone(),
2112                started_at: metadata.started_at,
2113                duration_ms,
2114            },
2115            exit_code: metadata.exit_code,
2116            child_pid: metadata.child_pid,
2117            workdir: metadata.workdir.display().to_string(),
2118            output_preview,
2119            output_truncated,
2120            output_path: state
2121                .buffer
2122                .output_path()
2123                .map(|path| path.display().to_string()),
2124            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
2125        }
2126    }
2127
2128    pub(crate) fn is_running(&self) -> bool {
2129        self.state
2130            .lock()
2131            .map(|state| state.metadata.status == BgTaskStatus::Running)
2132            .unwrap_or(false)
2133    }
2134
2135    fn mark_terminal_now(&self) {
2136        if let Ok(mut terminal_at) = self.terminal_at.lock() {
2137            if terminal_at.is_none() {
2138                *terminal_at = Some(Instant::now());
2139            }
2140        }
2141    }
2142
2143    fn set_completion_delivered(
2144        &self,
2145        delivered: bool,
2146        registry: &BgTaskRegistry,
2147    ) -> Result<(), String> {
2148        let mut state = self
2149            .state
2150            .lock()
2151            .map_err(|_| "background task lock poisoned".to_string())?;
2152        let updated = registry
2153            .update_task_metadata(&self.paths, |metadata| {
2154                metadata.completion_delivered = delivered;
2155            })
2156            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2157        state.metadata = updated;
2158        Ok(())
2159    }
2160}
2161
2162fn terminal_metadata_from_marker(
2163    mut metadata: PersistedTask,
2164    marker: ExitMarker,
2165    reason: Option<String>,
2166) -> PersistedTask {
2167    match marker {
2168        ExitMarker::Code(code) => {
2169            let status = if code == 0 {
2170                BgTaskStatus::Completed
2171            } else {
2172                BgTaskStatus::Failed
2173            };
2174            metadata.mark_terminal(status, Some(code), reason);
2175        }
2176        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2177    }
2178    metadata
2179}
2180
/// Build the Unix `Command` that runs `command` detached and records its exit
/// status at `exit_path`.
///
/// The resolved POSIX shell runs an inline wrapper with positional arguments
/// `$0` = the shell itself, `$1` = `command`, `$2` = `exit_path`. The wrapper:
/// 1. runs `"$0" -c "$1"`;
/// 2. writes the exit code to `$2.tmp.<wrapper pid>`;
/// 3. `mv -f`s that file onto `exit_path`.
///
/// The temp file is in the same directory as `exit_path`, so readers see
/// either no marker or a complete one.
///
/// Before exec, the child calls `setsid()`, which makes it the leader of a
/// new session and process group with no controlling terminal.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: the `pre_exec` closure runs in the forked child before exec,
    // where only async-signal-safe operations are allowed. It calls only
    // `setsid()` (async-signal-safe per POSIX) and, on failure, builds an
    // `io::Error` from the raw errno. It takes no locks and performs no
    // allocation.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
2200
2201#[cfg(unix)]
2202fn resolve_posix_shell() -> PathBuf {
2203    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2204    POSIX_SHELL
2205        .get_or_init(|| {
2206            std::env::var_os("BASH")
2207                .filter(|value| !value.is_empty())
2208                .map(PathBuf::from)
2209                .filter(|path| path.exists())
2210                .or_else(|| which::which("bash").ok())
2211                .or_else(|| which::which("zsh").ok())
2212                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2213        })
2214        .clone()
2215}
2216
/// Write the shell-specific wrapper script for `command` next to the task's
/// files and build a Windows `Command` that runs it with `shell`.
///
/// The wrapper body comes from `WindowsShell::wrapper_script(command, exit_path)`
/// and is written to `<paths.dir>/<json stem>.<ps1|bat|sh>`. These are among
/// the extensions `task_sibling_paths` lists. NOTE(review): quarantine only
/// picks the wrapper up if `paths.dir` is the JSON file's directory; confirm.
/// `creation_flags` is applied verbatim via `CommandExt::creation_flags`.
///
/// # Errors
/// Returns an error string if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    // Write the wrapper to a file alongside the other task files, then
    // invoke the shell with the file path as a single clean argument. This
    // sidesteps the entire Windows command-line quoting mess (Rust std-lib
    // quoting + cmd /C parser + PowerShell -Command parser all interacting
    // with embedded quotes in the wrapper).
    //
    // Path arguments don't need quoting in the same problematic way
    // because: (1) task IDs are `bash-<hex>` slugs, so the file name adds no
    // characters that need shell escaping; (2) the wrapper body's internal
    // quotes never reach the shell command line — the shell reads them from
    // disk by file syntax rules, not command-line parser rules.
    // NOTE(review): the storage-dir prefix of the path is not constrained by
    // the task ID; confirm paths containing spaces work under `cmd /C`.
    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) receive the wrapper as a script-file
        // argument (see the `Posix` arm below), so the extension is purely
        // cosmetic; `.sh` matches what an operator would expect when
        // grepping the spill directory.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // -File runs the script with no quoting issues. `-NoLogo`,
            // `-NoProfile`, etc. apply to the host before the file runs.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // `cmd /D /C "<bat-file-path>"` — invoking a .bat
            // file via /C is well-defined; the file's contents are
            // read line-by-line by cmd's batch processor, NOT
            // re-interpreted by the /C parser. This avoids the
            // "filename syntax incorrect" errors that came from
            // having complex compound commands on the cmd line.
            cmd.args(["/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // git-bash and other POSIX shells run the wrapper script with
            // `<binary> <wrapper-path>` (the wrapper is just a shell
            // script). No special flags needed — the `trap` and atomic
            // exit-marker rename in `wrapper_script` are POSIX-standard.
            cmd.arg(&wrapper_path);
        }
    }

    // Win32 process creation flags. Callers of this function decide whether
    // to include CREATE_BREAKAWAY_FROM_JOB, as part of their
    // breakaway-fallback strategy.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2299
/// Spawn a detached background bash child process.
///
/// On non-Windows targets this is a single spawn of the command built by
/// `detached_shell_command` (a `/bin/sh` wrapper). On Windows it walks the
/// candidates returned by `shell_candidates()` in order (the inline comment
/// below lists pwsh → powershell → git-bash → cmd). When a candidate fails
/// to spawn with `NotFound`, it moves on to the next one. This is the same
/// runtime safety net the foreground bash path has, so issue #27 callers
/// landing on the cmd.exe fallback can also use background bash.
///
/// The wrapper script is regenerated per attempt because PowerShell
/// wrappers embed the shell binary by name. The stdout/stderr capture
/// handles are also reopened per attempt because `Command::spawn()`
/// consumes them.
///
/// On Windows, each candidate is first spawned with
/// `CREATE_BREAKAWAY_FROM_JOB`. If that attempt fails with Access Denied
/// (raw OS error 5), the same candidate is retried exactly once without the
/// breakaway flag before any other outcome is considered.
///
/// # Errors
///
/// Returns a human-readable `String` in these cases:
/// - a capture file cannot be opened;
/// - the Windows wrapper command cannot be built (`detached_shell_command_for`
///   fails), which aborts the whole candidate walk;
/// - spawning fails.
///
/// Spawn errors other than `NotFound` and the breakaway Access Denied case
/// above (e.g. Access Denied without breakaway, OutOfMemory) return
/// immediately without trying another shell. They indicate a problem with
/// the resolved shell that retrying with a different shell won't fix. If
/// every Windows candidate is exhausted, the error names the last failure
/// and lists the probed candidate binaries.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
        // legacy foreground bash spawn path. v0.20 routes ALL bash through
        // this background spawn helper, including foreground tool calls
        // where the model writes PowerShell-syntax (`$var = ...`,
        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
        // The earlier v0.18-era cmd-first override worked around a
        // PowerShell detached-output bug; that bug is fixed at the
        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
        // see flag block below), so we no longer need to misroute PS
        // commands through cmd.
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
        // first (so the bg child outlives the AFT process when AFT is killed),
        // then fall back without it for environments where the parent is in a
        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
        // runners (GitHub Actions windows-2022) and some MDM-managed corp
        // environments hit this — `CreateProcess` returns Access Denied (5).
        // Without breakaway, the child still runs detached but will be torn
        // down with the parent if the parent process group is signaled.
        //
        // We use CREATE_NO_WINDOW (no visible console window, but the
        // child still has a hidden console) rather than DETACHED_PROCESS
        // (no console at all). PowerShell-based wrappers that perform
        // file I/O via [System.IO.File] need a console handle to flush
        // stdout/stderr correctly even when redirected — under
        // DETACHED_PROCESS, pwsh sometimes silently exits before
        // executing later script statements (the Move-Item that writes
        // the exit marker never runs), leaving the bg task forever
        // marked Failed: process exited without exit marker. cmd.exe
        // wrappers tolerate DETACHED_PROCESS, but switching to
        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent retryable failure, reported only if every candidate
        // is exhausted.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per-shell, try with breakaway first. If the process is in a
            // restrictive job, the breakaway flag triggers Access Denied
            // (os error 5). Retry once without breakaway.
            for &flags in &[with_breakaway, without_breakaway] {
                // Re-open capture handles per attempt; spawn() consumes them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                // A wrapper-construction failure is not shell-specific, so
                // `?` aborts the whole walk instead of trying the next shell.
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                             the cached PATH probe disagreed with runtime spawn — likely PATH \
                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                            shell.binary(),
                            idx);
                        }
                        // Only reachable after the breakaway attempt was
                        // rejected with Access Denied (see arm below).
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
                             Spawned without breakaway; the bg task will be torn down if the \
                             AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                        shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        // Skip the without-breakaway retry for NotFound — the
                        // binary itself is missing, breakaway flag is irrelevant.
                        break;
                    }
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        // Access Denied during breakaway — retry without it.
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                         Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Anything else (including Access Denied WITHOUT the
                    // breakaway flag) is treated as fatal for the whole walk.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
2448
2449fn random_slug() -> String {
2450    let mut bytes = [0u8; 4];
2451    // getrandom is a transitive dependency; use it directly for OS entropy.
2452    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
2453        // Extremely unlikely fallback: time + pid mix.
2454        let t = SystemTime::now()
2455            .duration_since(UNIX_EPOCH)
2456            .map(|d| d.subsec_nanos())
2457            .unwrap_or(0);
2458        let p = std::process::id();
2459        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
2460    });
2461    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
2462    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
2463    format!("bash-{hex}")
2464}
2465
2466#[cfg(test)]
2467mod tests {
2468    use std::collections::HashMap;
2469    #[cfg(windows)]
2470    use std::fs;
2471    use std::sync::{Arc, Mutex};
2472    use std::time::Duration;
2473    #[cfg(windows)]
2474    use std::time::Instant;
2475
2476    use super::*;
2477
2478    #[cfg(unix)]
2479    const QUICK_SUCCESS_COMMAND: &str = "true";
2480    #[cfg(windows)]
2481    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
2482
2483    #[cfg(unix)]
2484    const LONG_RUNNING_COMMAND: &str = "sleep 5";
2485    #[cfg(windows)]
2486    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
2487
2488    /// Spawn a child process that exits immediately and return it after
2489    /// it has terminated. Used by reap_child tests to simulate the
2490    /// "child exists and is dead" state when the watchdog has already
2491    /// nulled out the original child handle.
2492    fn spawn_dead_child() -> std::process::Child {
2493        #[cfg(unix)]
2494        let mut cmd = std::process::Command::new("true");
2495        #[cfg(windows)]
2496        let mut cmd = {
2497            let mut c = std::process::Command::new("cmd");
2498            c.args(["/c", "exit", "0"]);
2499            c
2500        };
2501        cmd.stdin(std::process::Stdio::null());
2502        cmd.stdout(std::process::Stdio::null());
2503        cmd.stderr(std::process::Stdio::null());
2504        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
2505        // Poll try_wait() until the child actually exits, instead of calling
2506        // wait() which closes the OS handle. On Windows, after wait()
2507        // closes the handle, subsequent try_wait() calls (which reap_child
2508        // depends on) return Err — the test was inadvertently giving
2509        // reap_child an unusable child handle. Polling try_wait() keeps the
2510        // handle open and observes natural exit, matching the production
2511        // shape where the watchdog discovers an exited child for the first
2512        // time.
2513        let started = Instant::now();
2514        loop {
2515            match child.try_wait() {
2516                Ok(Some(_)) => break,
2517                Ok(None) => {
2518                    if started.elapsed() > Duration::from_secs(5) {
2519                        panic!("dead-child stand-in did not exit within 5s");
2520                    }
2521                    std::thread::sleep(Duration::from_millis(10));
2522                }
2523                Err(error) => panic!("dead-child try_wait failed: {error}"),
2524            }
2525        }
2526        child
2527    }
2528
2529    #[test]
2530    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
2531        let registry = BgTaskRegistry::default();
2532        let dir = tempfile::tempdir().unwrap();
2533        let task_id = registry
2534            .spawn(
2535                QUICK_SUCCESS_COMMAND,
2536                "session".to_string(),
2537                dir.path().to_path_buf(),
2538                HashMap::new(),
2539                Some(Duration::from_secs(30)),
2540                dir.path().to_path_buf(),
2541                10,
2542                true,
2543                false,
2544                Some(dir.path().to_path_buf()),
2545            )
2546            .unwrap();
2547        registry
2548            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2549            .unwrap();
2550        let completions = registry.drain_completions_for_session(Some("session"));
2551        assert_eq!(completions.len(), 1);
2552        assert_eq!(
2553            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
2554            vec![task_id.clone()]
2555        );
2556
2557        registry.cleanup_finished(Duration::ZERO);
2558
2559        assert!(registry.inner.tasks.lock().unwrap().is_empty());
2560    }
2561
2562    #[test]
2563    fn cleanup_finished_retains_undelivered_terminals() {
2564        let registry = BgTaskRegistry::default();
2565        let dir = tempfile::tempdir().unwrap();
2566        let task_id = registry
2567            .spawn(
2568                QUICK_SUCCESS_COMMAND,
2569                "session".to_string(),
2570                dir.path().to_path_buf(),
2571                HashMap::new(),
2572                Some(Duration::from_secs(30)),
2573                dir.path().to_path_buf(),
2574                10,
2575                true,
2576                false,
2577                Some(dir.path().to_path_buf()),
2578            )
2579            .unwrap();
2580        registry
2581            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2582            .unwrap();
2583
2584        registry.cleanup_finished(Duration::ZERO);
2585
2586        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2587    }
2588
    /// Issue #27 Oracle review P1 + P2 test gap: verify that the live
    /// watchdog path (reap_child) marks a task Failed when the child
    /// has exited but no exit marker was written. Before this fix the
    /// task would remain `Running` until timeout, even though the
    /// process was definitely dead.
    ///
    /// Cross-platform: spawns a quick-exiting command through the normal
    /// `registry.spawn` path and waits for it to exit. It then deletes
    /// whatever exit marker the wrapper wrote, to simulate the wrapper
    /// crashing before it could persist the exit code.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation.
        let started = Instant::now();
        loop {
            // A missing child handle counts as "exited": the watchdog may
            // already have reaped it.
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog ticks (every 500ms) can
        // observe the child exit and reap it before this test's assertion
        // fires, leaving us with state.child = None and an already-terminal
        // status. We specifically want to test reap_child's logic when
        // invoked manually on a Running-but-actually-dead task, so we need
        // exclusive control over the reap path here.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Give the watchdog at most one tick (500ms) to notice shutdown
        // before we touch task state. Without this, an in-flight watchdog
        // iteration could still race with our state setup below.
        std::thread::sleep(Duration::from_millis(550));

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        let _ = std::fs::remove_file(&task.paths.exit);

        // The watchdog may have already reaped the child handle and
        // marked the task terminal before we got here. Reset the task to
        // the "Running task whose child just exited" shape reap_child is
        // designed to handle:
        // - Running metadata with no exit information;
        // - an exited child handle in state.child. This is the original
        //   handle if the watchdog left it in place, otherwise a stand-in
        //   from spawn_dead_child(), so reap_child's try_wait path runs.
        //
        // CRITICAL on Windows: the watchdog ticks fast enough that the
        // JSON on disk may already say `Completed`. `update_task` (called
        // by `reap_child`) reads from disk, applies the closure, but
        // ROLLS BACK if the original on-disk state was already terminal
        // (see persistence.rs::update_task). So we must reset BOTH
        // in-memory metadata AND the JSON on disk to a Running state to
        // give reap_child the fresh shape it expects to operate on.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            // Persist the reset state to disk so update_task's terminal
            // rollback guard sees a non-terminal starting point.
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the watchdog already nulled state.child, we need to
            // simulate "child exists and is dead" so reap_child's
            // try_wait path runs. Spawn a quick-exit child as a stand-in.
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        // Clear the terminal_at marker too so mark_terminal_now() can fire
        // again inside reap_child.
        *task.terminal_at.lock().unwrap() = None;

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // Invoke the watchdog's reap_child directly. The fix should mark
        // the task Failed with the documented reason string, instead of
        // just dropping the child handle and leaving status=Running.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }
2742
2743    /// Companion to the above: when the exit marker DOES exist on disk
2744    /// at reap_child time (race window — wrapper finished writing
2745    /// between try_wait and the marker check), reap_child must NOT mark
2746    /// the task Failed. Instead it leaves status=Running and lets the
2747    /// next poll_task() cycle finalize via the marker.
2748    #[test]
2749    fn reap_child_preserves_running_when_exit_marker_exists() {
2750        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2751        let dir = tempfile::tempdir().unwrap();
2752        let task_id = registry
2753            .spawn(
2754                QUICK_SUCCESS_COMMAND,
2755                "session".to_string(),
2756                dir.path().to_path_buf(),
2757                HashMap::new(),
2758                Some(Duration::from_secs(30)),
2759                dir.path().to_path_buf(),
2760                10,
2761                true,
2762                false,
2763                Some(dir.path().to_path_buf()),
2764            )
2765            .unwrap();
2766
2767        let task = registry.task_for_session(&task_id, "session").unwrap();
2768
2769        // Wait for child to exit AND for the marker to land. Both happen
2770        // shortly after the wrapper finishes — but we want both observed.
2771        let started = Instant::now();
2772        loop {
2773            let exited = {
2774                let mut state = task.state.lock().unwrap();
2775                if let Some(child) = state.child.as_mut() {
2776                    matches!(child.try_wait(), Ok(Some(_)))
2777                } else {
2778                    true
2779                }
2780            };
2781            if exited && task.paths.exit.exists() {
2782                break;
2783            }
2784            assert!(
2785                started.elapsed() < Duration::from_secs(5),
2786                "child should exit and write marker quickly"
2787            );
2788            std::thread::sleep(Duration::from_millis(20));
2789        }
2790
2791        // Stop the watchdog so it doesn't race with our manual reap_child.
2792        // On fast Windows runners the watchdog can call poll_task (which
2793        // finalizes via marker) before this test asserts the
2794        // "marker exists, status still Running" invariant. We want
2795        // exclusive control over the reap path.
2796        registry
2797            .inner
2798            .shutdown
2799            .store(true, std::sync::atomic::Ordering::SeqCst);
2800        std::thread::sleep(Duration::from_millis(550));
2801
2802        // If the watchdog already finalized the task before we stopped it,
2803        // restore the test setup: reset status to Running and ensure the
2804        // marker file is still on disk. We're testing reap_child's
2805        // behavior when called manually with both child-exited AND
2806        // marker-present, regardless of whether the watchdog beat us.
2807        {
2808            let mut state = task.state.lock().unwrap();
2809            state.metadata.status = BgTaskStatus::Running;
2810            state.metadata.status_reason = None;
2811            if state.child.is_none() {
2812                state.child = Some(spawn_dead_child());
2813            }
2814        }
2815        *task.terminal_at.lock().unwrap() = None;
2816        // Make sure the marker is still on disk (poll_task removes it on
2817        // finalization). Recreate it if needed.
2818        if !task.paths.exit.exists() {
2819            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
2820        }
2821
2822        // reap_child sees: child exited, marker exists. It should:
2823        //  - drop state.child / set state.detached = true
2824        //  - NOT change status (poll_task will finalize via marker next tick)
2825        registry.reap_child(&task);
2826
2827        let state = task.state.lock().unwrap();
2828        assert!(
2829            state.child.is_none(),
2830            "child handle still released even when marker exists"
2831        );
2832        assert!(
2833            state.detached,
2834            "task still marked detached even when marker exists"
2835        );
2836        // Status remains Running because reap_child defers to poll_task
2837        // when a marker exists. It would be wrong for reap to record the
2838        // marker outcome (poll_task does that with proper exit-code
2839        // parsing).
2840        assert_eq!(
2841            state.metadata.status,
2842            BgTaskStatus::Running,
2843            "reap_child must defer to poll_task when marker exists"
2844        );
2845    }
2846
2847    #[test]
2848    fn cleanup_finished_keeps_running_tasks() {
2849        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2850        let dir = tempfile::tempdir().unwrap();
2851        let task_id = registry
2852            .spawn(
2853                LONG_RUNNING_COMMAND,
2854                "session".to_string(),
2855                dir.path().to_path_buf(),
2856                HashMap::new(),
2857                Some(Duration::from_secs(30)),
2858                dir.path().to_path_buf(),
2859                10,
2860                true,
2861                false,
2862                Some(dir.path().to_path_buf()),
2863            )
2864            .unwrap();
2865
2866        registry.cleanup_finished(Duration::ZERO);
2867
2868        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2869        let _ = registry.kill(&task_id, "session");
2870    }
2871
2872    #[cfg(windows)]
2873    fn wait_for_file(path: &Path) -> String {
2874        let started = Instant::now();
2875        loop {
2876            if path.exists() {
2877                return fs::read_to_string(path).expect("read file");
2878            }
2879            assert!(
2880                started.elapsed() < Duration::from_secs(30),
2881                "timed out waiting for {}",
2882                path.display()
2883            );
2884            std::thread::sleep(Duration::from_millis(100));
2885        }
2886    }
2887
2888    #[cfg(windows)]
2889    fn spawn_windows_registry_command(
2890        command: &str,
2891    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2892        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2893        let dir = tempfile::tempdir().unwrap();
2894        let task_id = registry
2895            .spawn(
2896                command,
2897                "session".to_string(),
2898                dir.path().to_path_buf(),
2899                HashMap::new(),
2900                Some(Duration::from_secs(30)),
2901                dir.path().to_path_buf(),
2902                10,
2903                false,
2904                false,
2905                Some(dir.path().to_path_buf()),
2906            )
2907            .unwrap();
2908        (registry, dir, task_id)
2909    }
2910
2911    #[cfg(windows)]
2912    #[test]
2913    fn windows_spawn_writes_exit_marker_for_zero_exit() {
2914        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2915        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2916
2917        let content = wait_for_file(&exit_path);
2918
2919        assert_eq!(content.trim(), "0");
2920    }
2921
2922    #[cfg(windows)]
2923    #[test]
2924    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2925        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2926        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2927
2928        let content = wait_for_file(&exit_path);
2929
2930        assert_eq!(content.trim(), "42");
2931    }
2932
2933    #[cfg(windows)]
2934    #[test]
2935    fn windows_spawn_captures_stdout_to_disk() {
2936        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
2937        let task = registry.task_for_session(&task_id, "session").unwrap();
2938        let stdout_path = task.paths.stdout.clone();
2939        let exit_path = task.paths.exit.clone();
2940
2941        let _ = wait_for_file(&exit_path);
2942        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
2943
2944        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
2945    }
2946
2947    #[cfg(windows)]
2948    #[test]
2949    fn windows_spawn_uses_pwsh_when_available() {
2950        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
2951        // (We intentionally pass None for shell_env to keep this test
2952        // independent of the runner's actual env.)
2953        let candidates = crate::windows_shell::shell_candidates_with(
2954            |binary| match binary {
2955                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
2956                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
2957                _ => None,
2958            },
2959            || None,
2960        );
2961        let shell = candidates.first().expect("at least one candidate").clone();
2962        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
2963        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
2964    }
2965
2966    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
2967    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
2968    /// evaluation is the default for batch files; parse-time expansion only
2969    /// applies to compound `&`-chained inline commands). Capturing
2970    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
2971    /// command runs records the real run-time exit code.
2972    #[cfg(windows)]
2973    #[test]
2974    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
2975        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
2976        let script =
2977            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
2978
2979        // Batch wrapper: capture exit code into CODE on the line after the
2980        // user command, then write CODE to a temp marker file before
2981        // atomic-renaming it into place.
2982        assert!(
2983            script.contains("set CODE=%ERRORLEVEL%"),
2984            "wrapper must capture exit code into CODE: {script}"
2985        );
2986        assert!(
2987            script.contains("echo %CODE% >"),
2988            "wrapper must echo CODE to a temp marker file: {script}"
2989        );
2990        assert!(
2991            script.contains("move /Y"),
2992            "wrapper must use atomic move to write the marker: {script}"
2993        );
2994        // move output must be redirected to nul to avoid polluting the
2995        // user's captured stdout with "1 file(s) moved." lines.
2996        assert!(
2997            script.contains("> nul"),
2998            "wrapper must redirect move output to nul: {script}"
2999        );
3000        // exit /B %CODE% propagates the real exit code so wait() sees it.
3001        assert!(
3002            script.contains("exit /B %CODE%"),
3003            "wrapper must propagate the captured exit code: {script}"
3004        );
3005        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3006        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3007    }
3008
3009    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
3010    /// written to a `.bat` file where batch-line evaluation captures
3011    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
3012    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
3013    /// internal `"`-quoting from `cmd_quote`).
3014    #[cfg(windows)]
3015    #[test]
3016    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3017        use crate::windows_shell::WindowsShell;
3018        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3019        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3020        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3021        assert_eq!(
3022            args_strs,
3023            vec!["/D", "/S", "/C", "echo wrapped"],
3024            "Cmd::bg_command must prepend /D /S /C"
3025        );
3026    }
3027
3028    /// PowerShell variants don't need `/V:ON`-style flags; their args
3029    /// are the same for foreground (`command()`) and background
3030    /// (`bg_command()`).
3031    #[cfg(windows)]
3032    #[test]
3033    fn windows_shell_pwsh_bg_command_uses_standard_args() {
3034        use crate::windows_shell::WindowsShell;
3035        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3036        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3037        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3038        assert!(
3039            args_strs.contains(&"-Command"),
3040            "Pwsh::bg_command must use -Command: {args_strs:?}"
3041        );
3042        assert!(
3043            args_strs.contains(&"Get-Date"),
3044            "Pwsh::bg_command must include the user command body"
3045        );
3046    }
3047
3048    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
3049    /// **cmd.exe-specific** wrapper path captures the user command's
3050    /// run-time exit code correctly. The existing
3051    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
3052    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
3053    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
3054    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
3055    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
3056    ///
3057    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
3058    /// force the cmd-wrapper code path, then writes the exit marker and
3059    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
3060    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
3061    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
3062    /// regardless of what the user command returned.
3063    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
3064    /// the inline command-line wrapper helper that production code does
3065    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
3066    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
3067    /// produced silent failures on Windows 11 (move /Y could not parse
3068    /// path arguments through cmd's /C parser). The `bg_command` helper
3069    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
3070    /// the production spawn path goes through `detached_shell_command_for`
3071    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
3072    /// <bat-path>`.
3073    ///
3074    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
3075    /// covered live by the Windows e2e harness scenario 2d
3076    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
3077    /// exercises the real file-based wrapper end-to-end via the protocol.
3078    #[allow(dead_code)]
3079    #[cfg(any())] // disabled on all targets
3080    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3081}