Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16
17use super::buffer::BgBuffer;
18use super::persistence::{
19    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
20    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
21};
22#[cfg(unix)]
23use super::process::terminate_pgid;
24use super::{BgTaskInfo, BgTaskStatus};
25
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age threshold (24 hours, measured from the persisted `started_at`) beyond
/// which a task still recorded as running is treated as orphaned when a
/// session is replayed.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);

/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
/// of typical command output (git status, test results, exit messages) — short
/// enough that round-tripping multiple completions in one reminder stays well
/// under the model's context budget but long enough that most successful runs
/// don't need a follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
38
/// Record of a background task that reached a terminal status.
///
/// Instances are queued on the registry until drained and may additionally be
/// pushed to the client as a `BashCompletedFrame`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the finished task.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata at enqueue time.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata; `None` when none was recorded
    /// (for example a task killed on request).
    pub exit_code: Option<i32>,
    /// Command string the task was created with.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
63
/// Serde `skip_serializing_if` predicate: `true` exactly when the flag is unset.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
67
/// Point-in-time view of a background task, as returned by `status`, `list`
/// and `kill`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task descriptor; its fields are flattened into the serialized
    /// snapshot object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code from the task metadata, when one has been recorded.
    pub exit_code: Option<i32>,
    /// PID recorded in the task metadata, when one has been recorded.
    pub child_pid: Option<u32>,
    /// Working directory from the task metadata, rendered with `Path::display`.
    pub workdir: String,
    /// Tail of the task output read from its buffer, limited by the
    /// caller-supplied `preview_bytes`.
    pub output_preview: String,
    /// Truncation flag reported by the buffer alongside `output_preview`.
    pub output_truncated: bool,
    /// Output path reported by the buffer, if it reports one.
    pub output_path: Option<String>,
    /// Stderr path reported by the buffer (always populated by `snapshot_locked`).
    pub stderr_path: Option<String>,
}
80
/// Handle to the background-task registry.
///
/// Cloning the handle clones the inner `Arc`, so all clones operate on the
/// same `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
85
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Known tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions queued until a caller drains them.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Shared sender used to emit `BashCompleted` push frames, when one is set.
    pub(crate) progress_sender: SharedProgressSender,
    /// Flipped to `true` on the first `start_watchdog` call so the watchdog is
    /// only started once per registry.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    // NOTE(review): presumably read by the watchdog to stop its loop — confirm.
    pub(crate) shutdown: AtomicBool,
}
93
/// In-memory entry for a single background task.
pub(crate) struct BgTask {
    /// Task identifier; also the key under which the task is registered.
    pub(crate) task_id: String,
    /// Session that owns the task; lookups are filtered on it.
    pub(crate) session_id: String,
    /// On-disk locations (metadata JSON, stdout/stderr captures, exit marker).
    pub(crate) paths: TaskPaths,
    /// Monotonic instant at which this entry was created (spawn or rehydration).
    pub(crate) started: Instant,
    /// Instant at which the task was first recorded as terminal in this
    /// process; `None` until then.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state, guarded by its own lock.
    pub(crate) state: Mutex<BgTaskState>,
}
102
/// Lock-protected, mutable part of a `BgTask`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task record.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned process; `None` for rehydrated tasks and after the
    /// handle has been released (reaped, killed, finalized or detached).
    pub(crate) child: Option<Child>,
    /// `false` for freshly spawned tasks; set to `true` whenever the child
    /// handle is released, and supplied by the caller for rehydrated tasks.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
109
110impl BgTaskRegistry {
111    pub fn new(progress_sender: SharedProgressSender) -> Self {
112        Self {
113            inner: Arc::new(RegistryInner {
114                tasks: Mutex::new(HashMap::new()),
115                completions: Mutex::new(VecDeque::new()),
116                progress_sender,
117                watchdog_started: AtomicBool::new(false),
118                shutdown: AtomicBool::new(false),
119            }),
120        }
121    }
122
123    #[cfg(unix)]
124    pub fn spawn(
125        &self,
126        command: &str,
127        session_id: String,
128        workdir: PathBuf,
129        env: HashMap<String, String>,
130        timeout: Option<Duration>,
131        storage_dir: PathBuf,
132        max_running: usize,
133    ) -> Result<String, String> {
134        self.start_watchdog();
135
136        let running = self.running_count();
137        if running >= max_running {
138            return Err(format!(
139                "background bash task limit exceeded: {running} running (max {max_running})"
140            ));
141        }
142
143        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
144        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
145        let task_id = self.generate_unique_task_id()?;
146        let paths = task_paths(&storage_dir, &session_id, &task_id);
147        fs::create_dir_all(&paths.dir)
148            .map_err(|e| format!("failed to create background task dir: {e}"))?;
149
150        let mut metadata = PersistedTask::starting(
151            task_id.clone(),
152            session_id.clone(),
153            command.to_string(),
154            workdir.clone(),
155            timeout_ms,
156        );
157        write_task(&paths.json, &metadata)
158            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
159
160        let stdout = create_capture_file(&paths.stdout)
161            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
162        let stderr = create_capture_file(&paths.stderr)
163            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
164
165        let child = detached_shell_command(command, &paths.exit)
166            .current_dir(&workdir)
167            .envs(&env)
168            .stdin(Stdio::null())
169            .stdout(Stdio::from(stdout))
170            .stderr(Stdio::from(stderr))
171            .spawn()
172            .map_err(|e| format!("failed to spawn background bash command: {e}"))?;
173
174        let child_pid = child.id();
175        metadata.mark_running(child_pid, child_pid as i32);
176        write_task(&paths.json, &metadata)
177            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
178
179        let task = Arc::new(BgTask {
180            task_id: task_id.clone(),
181            session_id,
182            paths: paths.clone(),
183            started: Instant::now(),
184            terminal_at: Mutex::new(None),
185            state: Mutex::new(BgTaskState {
186                metadata,
187                child: Some(child),
188                detached: false,
189                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
190            }),
191        });
192
193        self.inner
194            .tasks
195            .lock()
196            .map_err(|_| "background task registry lock poisoned".to_string())?
197            .insert(task_id.clone(), task);
198
199        Ok(task_id)
200    }
201
202    #[cfg(windows)]
203    pub fn spawn(
204        &self,
205        _command: &str,
206        _session_id: String,
207        _workdir: PathBuf,
208        _env: HashMap<String, String>,
209        _timeout: Option<Duration>,
210        _storage_dir: PathBuf,
211        _max_running: usize,
212    ) -> Result<String, String> {
213        Err("background bash is not yet supported on Windows".to_string())
214    }
215
    /// Rebuilds registry state for `session_id` from the task records persisted
    /// under `storage_dir`.
    ///
    /// Every `*.json` file in the session's task directory is read; files that
    /// fail to parse or that belong to a different session are skipped. Each
    /// record is then handled according to its persisted status:
    ///
    /// * starting — the spawn never completed; recorded as failed.
    /// * running — recorded as killed when older than `STALE_RUNNING_AFTER`,
    ///   finalized from the exit marker when one exists, otherwise re-inserted
    ///   as a detached running task.
    /// * terminal — re-inserted, and its completion re-queued if it was never
    ///   delivered.
    ///
    /// Completions queued here never emit a push frame.
    ///
    /// # Errors
    ///
    /// Returns a message when the task directory cannot be listed or the
    /// registry lock is poisoned. Failures to rewrite individual records are
    /// ignored.
    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
        self.start_watchdog();
        let dir = session_tasks_dir(storage_dir, session_id);
        if !dir.exists() {
            return Ok(());
        }

        let entries = fs::read_dir(&dir)
            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
        // `flatten` drops directory entries that could not be read.
        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
                continue;
            }
            let Ok(mut metadata) = read_task(&path) else {
                continue;
            };
            if metadata.session_id != session_id {
                continue;
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            // NOTE(review): tasks that become terminal in the `Starting` and
            // `Running` arms are not inserted into the registry, so a later
            // drain cannot find them to persist `completion_delivered` —
            // confirm whether re-queuing on the next replay is intended.
            match metadata.status {
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                BgTaskStatus::Running => {
                    // NOTE(review): staleness is checked before the exit
                    // marker, so a task older than 24h that did record an exit
                    // code is still reported as killed — confirm this ordering.
                    if self.running_metadata_is_stale(&metadata) {
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        metadata = terminal_metadata_from_marker(metadata, marker, None);
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else {
                        // No exit marker yet: track the task again without a
                        // child handle.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                _ if metadata.status.is_terminal() => {
                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                // Remaining non-terminal statuses are left untouched.
                _ => {}
            }
        }

        Ok(())
    }
278
279    pub fn status(
280        &self,
281        task_id: &str,
282        session_id: &str,
283        preview_bytes: usize,
284    ) -> Option<BgTaskSnapshot> {
285        let task = self.task_for_session(task_id, session_id)?;
286        let _ = self.poll_task(&task);
287        Some(task.snapshot(preview_bytes))
288    }
289
290    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
291        let tasks = self
292            .inner
293            .tasks
294            .lock()
295            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
296            .unwrap_or_default();
297        tasks
298            .into_iter()
299            .map(|task| {
300                let _ = self.poll_task(&task);
301                task.snapshot(preview_bytes)
302            })
303            .collect()
304    }
305
306    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
307        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
308    }
309
310    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
311        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
312            .map(|_| ())
313    }
314
315    pub fn cleanup_finished(&self, older_than: Duration) {
316        let cutoff = Instant::now().checked_sub(older_than);
317        if let Ok(mut tasks) = self.inner.tasks.lock() {
318            tasks.retain(|_, task| {
319                let is_terminal = task
320                    .state
321                    .lock()
322                    .map(|state| state.metadata.status.is_terminal())
323                    .unwrap_or(false);
324                if !is_terminal {
325                    return true;
326                }
327
328                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
329                match (terminal_at, cutoff) {
330                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
331                    (Some(_), None) => false,
332                    (None, _) => true,
333                }
334            });
335        }
336    }
337
338    pub fn drain_completions(&self) -> Vec<BgCompletion> {
339        self.drain_completions_for_session(None)
340    }
341
342    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
343        let mut completions = match self.inner.completions.lock() {
344            Ok(completions) => completions,
345            Err(_) => return Vec::new(),
346        };
347
348        let drained = if let Some(session_id) = session_id {
349            let mut matched = Vec::new();
350            let mut retained = VecDeque::new();
351            while let Some(completion) = completions.pop_front() {
352                if completion.session_id == session_id {
353                    matched.push(completion);
354                } else {
355                    retained.push_back(completion);
356                }
357            }
358            *completions = retained;
359            matched
360        } else {
361            completions.drain(..).collect()
362        };
363        drop(completions);
364
365        for completion in &drained {
366            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
367                let _ = task.set_completion_delivered(true);
368            }
369        }
370
371        drained
372    }
373
374    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
375        self.inner
376            .completions
377            .lock()
378            .map(|completions| {
379                completions
380                    .iter()
381                    .filter(|completion| completion.session_id == session_id)
382                    .cloned()
383                    .collect()
384            })
385            .unwrap_or_default()
386    }
387
388    pub fn detach(&self) {
389        self.inner.shutdown.store(true, Ordering::SeqCst);
390        if let Ok(mut tasks) = self.inner.tasks.lock() {
391            for task in tasks.values() {
392                if let Ok(mut state) = task.state.lock() {
393                    state.child = None;
394                    state.detached = true;
395                }
396            }
397            tasks.clear();
398        }
399    }
400
401    pub fn shutdown(&self) {
402        let tasks = self
403            .inner
404            .tasks
405            .lock()
406            .map(|tasks| {
407                tasks
408                    .values()
409                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
410                    .collect::<Vec<_>>()
411            })
412            .unwrap_or_default();
413        for (task_id, session_id) in tasks {
414            let _ = self.kill(&task_id, &session_id);
415        }
416    }
417
418    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
419        let marker = match read_exit_marker(&task.paths.exit) {
420            Ok(Some(marker)) => marker,
421            Ok(None) => return Ok(()),
422            Err(error) => return Err(format!("failed to read exit marker: {error}")),
423        };
424        self.finalize_from_marker(task, marker, None)
425    }
426
427    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
428        let Ok(mut state) = task.state.lock() else {
429            return;
430        };
431        if let Some(child) = state.child.as_mut() {
432            if matches!(child.try_wait(), Ok(Some(_))) {
433                state.child = None;
434                state.detached = true;
435            }
436        }
437    }
438
439    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
440        self.inner
441            .tasks
442            .lock()
443            .map(|tasks| {
444                tasks
445                    .values()
446                    .filter(|task| task.is_running())
447                    .cloned()
448                    .collect()
449            })
450            .unwrap_or_default()
451    }
452
453    fn insert_rehydrated_task(
454        &self,
455        metadata: PersistedTask,
456        paths: TaskPaths,
457        detached: bool,
458    ) -> Result<(), String> {
459        let task_id = metadata.task_id.clone();
460        let session_id = metadata.session_id.clone();
461        let task = Arc::new(BgTask {
462            task_id: task_id.clone(),
463            session_id,
464            paths: paths.clone(),
465            started: Instant::now(),
466            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
467            state: Mutex::new(BgTaskState {
468                metadata,
469                child: None,
470                detached,
471                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
472            }),
473        });
474        self.inner
475            .tasks
476            .lock()
477            .map_err(|_| "background task registry lock poisoned".to_string())?
478            .insert(task_id, task);
479        Ok(())
480    }
481
    /// Terminates a task and records `terminal_status` as its final status.
    ///
    /// A task that is already terminal is left untouched and its current
    /// snapshot is returned. Otherwise the task is persisted as `Killing`, its
    /// process group is signalled (Unix only), the child is reaped, a kill
    /// marker is written unless an exit marker already exists, the terminal
    /// state is persisted, and a completion is queued with a push frame.
    ///
    /// Snapshots returned from here use a 5 KiB output preview.
    ///
    /// # Errors
    ///
    /// Returns a message when the task is unknown for this session, when the
    /// task lock is poisoned, or when any of the persistence steps fail.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // The state lock is held for the whole transition and released before
        // the final snapshot, which locks the state again.
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Record the intent on disk before signalling the process.
            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Blocking wait: reaps the child when this process still owns it.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            // Timeouts report exit code 124; an explicit kill has no exit code.
            // NOTE(review): 124 presumably mirrors `timeout(1)` — confirm.
            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        Ok(task.snapshot(5 * 1024))
    }
537
    /// Moves a task to its terminal state based on an exit marker.
    ///
    /// Does nothing when the task is already terminal. Otherwise the persisted
    /// record is updated from `marker` (with `reason` attached), the in-memory
    /// metadata is replaced by the updated record, the child handle is
    /// released, and a completion is queued with a push frame.
    ///
    /// # Errors
    ///
    /// Returns a message when the task lock is poisoned or the persisted
    /// record cannot be updated.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Held until return so the transition happens at most once per task.
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        if state.metadata.status.is_terminal() {
            return Ok(());
        }

        // The terminal metadata is derived from the record passed to the
        // closure by `update_task`, not from the in-memory copy.
        let updated = update_task(&task.paths.json, |metadata| {
            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
            *metadata = new_metadata;
        })
        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
        state.metadata = updated;
        task.mark_terminal_now();
        state.child = None;
        state.detached = true;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
565
566    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
567        if metadata.status.is_terminal() && !metadata.completion_delivered {
568            self.enqueue_completion_locked(metadata, None, emit_frame);
569        }
570    }
571
572    fn enqueue_completion_locked(
573        &self,
574        metadata: &PersistedTask,
575        buffer: Option<&BgBuffer>,
576        emit_frame: bool,
577    ) {
578        if !metadata.status.is_terminal() || metadata.completion_delivered {
579            return;
580        }
581        // Read tail once at completion time and cache on the BgCompletion so
582        // both the push-frame consumer (running session) and any later
583        // `bash_drain_completions` poll (different session, restart) see the
584        // same preview without racing against rotation.
585        let (output_preview, output_truncated) = match buffer {
586            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
587            None => (String::new(), false),
588        };
589        let completion = BgCompletion {
590            task_id: metadata.task_id.clone(),
591            session_id: metadata.session_id.clone(),
592            status: metadata.status.clone(),
593            exit_code: metadata.exit_code,
594            command: metadata.command.clone(),
595            output_preview,
596            output_truncated,
597        };
598        if let Ok(mut completions) = self.inner.completions.lock() {
599            if completions
600                .iter()
601                .any(|completion| completion.task_id == metadata.task_id)
602            {
603                return;
604            }
605            completions.push_back(completion.clone());
606        } else {
607            return;
608        }
609
610        if emit_frame {
611            self.emit_bash_completed(completion);
612        }
613    }
614
615    fn emit_bash_completed(&self, completion: BgCompletion) {
616        let Ok(progress_sender) = self
617            .inner
618            .progress_sender
619            .lock()
620            .map(|sender| sender.clone())
621        else {
622            return;
623        };
624        let Some(sender) = progress_sender.as_ref() else {
625            return;
626        };
627        // Clone the callback out of the registry mutex before writing to stdout;
628        // otherwise a blocked push-frame write could pin the mutex and starve
629        // unrelated progress-sender updates.
630        // Bg task transitions are discovered by the watchdog thread, so the
631        // sender is shared behind a Mutex. It still uses the same stdout writer
632        // closure as foreground progress frames, preserving the existing lock/
633        // flush behavior in main.rs.
634        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
635            completion.task_id,
636            completion.session_id,
637            completion.status,
638            completion.exit_code,
639            completion.command,
640            completion.output_preview,
641            completion.output_truncated,
642        )));
643    }
644
645    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
646        self.inner
647            .tasks
648            .lock()
649            .ok()
650            .and_then(|tasks| tasks.get(task_id).cloned())
651    }
652
653    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
654        self.task(task_id)
655            .filter(|task| task.session_id == session_id)
656    }
657
658    fn running_count(&self) -> usize {
659        self.inner
660            .tasks
661            .lock()
662            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
663            .unwrap_or(0)
664    }
665
666    fn start_watchdog(&self) {
667        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
668            super::watchdog::start(self.clone());
669        }
670    }
671
672    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
673        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
674    }
675
676    #[cfg(test)]
677    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
678        self.task_for_session(task_id, session_id)
679            .map(|task| task.paths.json.clone())
680    }
681
682    #[cfg(test)]
683    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
684        self.task_for_session(task_id, session_id)
685            .map(|task| task.paths.exit.clone())
686    }
687
    /// Generate a `bgb-{8hex}` slug that is unique against live tasks and queued completions.
    ///
    /// Up to 32 random candidates are tried.
    ///
    /// # Errors
    ///
    /// Returns a message when either lock is poisoned or when all 32
    /// candidates collide with an existing id.
    fn generate_unique_task_id(&self) -> Result<String, String> {
        for _ in 0..32 {
            let candidate = random_slug();
            // Both guards live until the end of the iteration, so the two
            // checks observe a consistent view of tasks and completions.
            let tasks = self
                .inner
                .tasks
                .lock()
                .map_err(|_| "background task registry lock poisoned".to_string())?;
            if tasks.contains_key(&candidate) {
                continue;
            }
            let completions = self
                .inner
                .completions
                .lock()
                .map_err(|_| "background completions lock poisoned".to_string())?;
            if completions
                .iter()
                .any(|completion| completion.task_id == candidate)
            {
                continue;
            }
            return Ok(candidate);
        }
        Err("failed to allocate unique background task id after 32 attempts".to_string())
    }
715}
716
717impl Default for BgTaskRegistry {
718    fn default() -> Self {
719        Self::new(Arc::new(Mutex::new(None)))
720    }
721}
722
723impl BgTask {
724    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
725        let state = self
726            .state
727            .lock()
728            .unwrap_or_else(|poison| poison.into_inner());
729        self.snapshot_locked(&state, preview_bytes)
730    }
731
732    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
733        let metadata = &state.metadata;
734        let duration_ms = metadata.duration_ms.or_else(|| {
735            metadata
736                .status
737                .is_terminal()
738                .then(|| self.started.elapsed().as_millis() as u64)
739        });
740        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
741        BgTaskSnapshot {
742            info: BgTaskInfo {
743                task_id: self.task_id.clone(),
744                status: metadata.status.clone(),
745                command: metadata.command.clone(),
746                started_at: metadata.started_at,
747                duration_ms,
748            },
749            exit_code: metadata.exit_code,
750            child_pid: metadata.child_pid,
751            workdir: metadata.workdir.display().to_string(),
752            output_preview,
753            output_truncated,
754            output_path: state
755                .buffer
756                .output_path()
757                .map(|path| path.display().to_string()),
758            stderr_path: Some(state.buffer.stderr_path().display().to_string()),
759        }
760    }
761
762    pub(crate) fn is_running(&self) -> bool {
763        self.state
764            .lock()
765            .map(|state| state.metadata.status == BgTaskStatus::Running)
766            .unwrap_or(false)
767    }
768
769    fn mark_terminal_now(&self) {
770        if let Ok(mut terminal_at) = self.terminal_at.lock() {
771            if terminal_at.is_none() {
772                *terminal_at = Some(Instant::now());
773            }
774        }
775    }
776
777    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
778        let mut state = self
779            .state
780            .lock()
781            .map_err(|_| "background task lock poisoned".to_string())?;
782        let updated = update_task(&self.paths.json, |metadata| {
783            metadata.completion_delivered = delivered;
784        })
785        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
786        state.metadata = updated;
787        Ok(())
788    }
789}
790
791fn terminal_metadata_from_marker(
792    mut metadata: PersistedTask,
793    marker: ExitMarker,
794    reason: Option<String>,
795) -> PersistedTask {
796    match marker {
797        ExitMarker::Code(code) => {
798            let status = if code == 0 {
799                BgTaskStatus::Completed
800            } else {
801                BgTaskStatus::Failed
802            };
803            metadata.mark_terminal(status, Some(code), reason);
804        }
805        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
806    }
807    metadata
808}
809
/// Builds the `/bin/sh` wrapper command that runs `command` in its own session
/// and records its exit code at `exit_path`.
///
/// The returned `Command` is not spawned here; the caller configures the
/// working directory, environment and stdio.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let mut cmd = Command::new("/bin/sh");
    // Wrapper script arguments: `$0` = "/bin/sh", `$1` = the user command,
    // `$2` = the exit marker path. The command runs in a nested shell; its exit
    // code is written to a temp file suffixed with the wrapper's PID (`$$`) and
    // then moved onto the final path, so the marker only appears once the code
    // has been fully written.
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg("/bin/sh")
        .arg(command)
        .arg(exit_path);
    // SAFETY: the hook runs in the forked child before exec and only calls
    // `setsid` and, on failure, reads the last OS error.
    // NOTE(review): assumes both are async-signal-safe and do not allocate —
    // confirm against the `pre_exec` safety contract.
    unsafe {
        cmd.pre_exec(|| {
            // Start a new session so the task is its own process-group leader.
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
828
829fn random_slug() -> String {
830    let mut bytes = [0u8; 4];
831    // getrandom is a transitive dependency; use it directly for OS entropy.
832    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
833        // Extremely unlikely fallback: time + pid mix.
834        let t = SystemTime::now()
835            .duration_since(UNIX_EPOCH)
836            .map(|d| d.subsec_nanos())
837            .unwrap_or(0);
838        let p = std::process::id();
839        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
840    });
841    // `bgb-` + 8 lowercase hex chars — compact, OS-entropy backed.
842    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
843    format!("bgb-{hex}")
844}
845
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use super::*;

    /// Session id shared by every test in this module.
    const SESSION: &str = "session";

    /// Spawns `command` with `dir` as both working and storage directory, a
    /// 30 second timeout and a limit of 10 running tasks.
    fn spawn_in(registry: &BgTaskRegistry, dir: &std::path::Path, command: &str) -> String {
        registry
            .spawn(
                command,
                SESSION.to_string(),
                dir.to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.to_path_buf(),
                10,
            )
            .unwrap()
    }

    /// A killed task is evicted when the age threshold is zero.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_in(&registry, dir.path(), "true");
        registry
            .kill_with_status(&task_id, SESSION, BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        let remaining = registry.inner.tasks.lock().unwrap();
        assert!(remaining.is_empty());
    }

    /// A task that is still running survives cleanup regardless of threshold.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_in(&registry, dir.path(), "sleep 5");

        registry.cleanup_finished(Duration::ZERO);

        let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
        assert!(still_tracked);
        // Stop the sleeper so the test does not leave a process behind.
        let _ = registry.kill(&task_id, SESSION);
    }
}