Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16
17use super::buffer::BgBuffer;
18use super::persistence::{
19    create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
20    update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
21};
22#[cfg(unix)]
23use super::process::terminate_pgid;
24use super::{BgTaskInfo, BgTaskStatus};
25
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age threshold (24 hours) used by `running_metadata_is_stale`: a persisted
/// task still recorded as `Running` whose `started_at` is older than this is
/// treated as orphaned by `replay_session` and marked `Killed` instead of
/// being re-attached.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);

/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
/// of typical command output (git status, test results, exit messages) — short
/// enough that round-tripping multiple completions in one reminder stays well
/// under the model's context budget but long enough that most successful runs
/// don't need a follow-up `bash_status` call.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
38
/// Record of a background task that reached a terminal status.
///
/// Instances are built by `enqueue_completion_locked`, queued in
/// `RegistryInner::completions`, and handed out by the `drain_completions*`
/// and `pending_completions_for_session` methods.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the finished task (copied from the task metadata).
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata at enqueue time.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata; `None` when none was recorded.
    pub exit_code: Option<i32>,
    /// The command string the task was started with.
    pub command: String,
    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
63
/// Serde `skip_serializing_if` predicate: true exactly when the flag is unset.
fn is_false(flag: &bool) -> bool {
    let is_set = *flag;
    !is_set
}
67
/// Point-in-time view of a background task, produced by
/// `BgTask::snapshot_locked` and returned from `status`, `list` and `kill`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info (id, status, command, start time, duration); flattened
    /// into the enclosing object when serialized.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code from the task metadata, if one was recorded.
    pub exit_code: Option<i32>,
    /// PID of the spawned child as recorded in the task metadata.
    pub child_pid: Option<u32>,
    /// Working directory of the task, rendered with `Path::display`.
    pub workdir: String,
    /// Tail of the captured output, limited by the caller's `preview_bytes`.
    pub output_preview: String,
    /// Truncation flag returned by the buffer together with the preview.
    pub output_truncated: bool,
    /// Path reported by the buffer's `output_path()`, if any.
    pub output_path: Option<String>,
}
79
/// Cheaply cloneable handle to the shared background-task registry.
///
/// All clones share the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared state behind the handle.
    pub(crate) inner: Arc<RegistryInner>,
}
84
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Tracked tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions queued for delivery, oldest first.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Shared callback slot used to emit `BashCompleted` push frames.
    pub(crate) progress_sender: SharedProgressSender,
    /// Set once by `start_watchdog` so the watchdog is started at most once.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    /// NOTE(review): presumably read by the watchdog to stop its loop — confirm in `watchdog.rs`.
    pub(crate) shutdown: AtomicBool,
}
92
/// One tracked background task: immutable identity plus lock-protected state.
pub(crate) struct BgTask {
    /// Task identifier (also the key in `RegistryInner::tasks`).
    pub(crate) task_id: String,
    /// Session that owns the task; lookups via `task_for_session` filter on it.
    pub(crate) session_id: String,
    /// On-disk locations for this task (metadata JSON, stdout/stderr captures, exit marker).
    pub(crate) paths: TaskPaths,
    /// Instant at which this in-memory entry was created (spawn or rehydration).
    pub(crate) started: Instant,
    /// Instant at which the task was first observed terminal; `None` until then.
    /// `cleanup_finished` compares it against its cutoff.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state (metadata, child handle, output buffer).
    pub(crate) state: Mutex<BgTaskState>,
}
101
/// Mutable portion of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the metadata that is persisted to the task's JSON file.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned child; `None` for rehydrated tasks and after the
    /// handle has been released (reaped, killed, finalized or detached).
    pub(crate) child: Option<Child>,
    /// True once this registry no longer holds a child handle for the task.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
108
109impl BgTaskRegistry {
110    pub fn new(progress_sender: SharedProgressSender) -> Self {
111        Self {
112            inner: Arc::new(RegistryInner {
113                tasks: Mutex::new(HashMap::new()),
114                completions: Mutex::new(VecDeque::new()),
115                progress_sender,
116                watchdog_started: AtomicBool::new(false),
117                shutdown: AtomicBool::new(false),
118            }),
119        }
120    }
121
122    #[cfg(unix)]
123    pub fn spawn(
124        &self,
125        command: &str,
126        session_id: String,
127        workdir: PathBuf,
128        env: HashMap<String, String>,
129        timeout: Option<Duration>,
130        storage_dir: PathBuf,
131        max_running: usize,
132    ) -> Result<String, String> {
133        self.start_watchdog();
134
135        let running = self.running_count();
136        if running >= max_running {
137            return Err(format!(
138                "background bash task limit exceeded: {running} running (max {max_running})"
139            ));
140        }
141
142        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
143        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
144        let task_id = self.generate_unique_task_id()?;
145        let paths = task_paths(&storage_dir, &session_id, &task_id);
146        fs::create_dir_all(&paths.dir)
147            .map_err(|e| format!("failed to create background task dir: {e}"))?;
148
149        let mut metadata = PersistedTask::starting(
150            task_id.clone(),
151            session_id.clone(),
152            command.to_string(),
153            workdir.clone(),
154            timeout_ms,
155        );
156        write_task(&paths.json, &metadata)
157            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
158
159        let stdout = create_capture_file(&paths.stdout)
160            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
161        let stderr = create_capture_file(&paths.stderr)
162            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
163
164        let child = detached_shell_command(command, &paths.exit)
165            .current_dir(&workdir)
166            .envs(&env)
167            .stdin(Stdio::null())
168            .stdout(Stdio::from(stdout))
169            .stderr(Stdio::from(stderr))
170            .spawn()
171            .map_err(|e| format!("failed to spawn background bash command: {e}"))?;
172
173        let child_pid = child.id();
174        metadata.mark_running(child_pid, child_pid as i32);
175        write_task(&paths.json, &metadata)
176            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
177
178        let task = Arc::new(BgTask {
179            task_id: task_id.clone(),
180            session_id,
181            paths: paths.clone(),
182            started: Instant::now(),
183            terminal_at: Mutex::new(None),
184            state: Mutex::new(BgTaskState {
185                metadata,
186                child: Some(child),
187                detached: false,
188                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
189            }),
190        });
191
192        self.inner
193            .tasks
194            .lock()
195            .map_err(|_| "background task registry lock poisoned".to_string())?
196            .insert(task_id.clone(), task);
197
198        Ok(task_id)
199    }
200
201    #[cfg(windows)]
202    pub fn spawn(
203        &self,
204        _command: &str,
205        _session_id: String,
206        _workdir: PathBuf,
207        _env: HashMap<String, String>,
208        _timeout: Option<Duration>,
209        _storage_dir: PathBuf,
210        _max_running: usize,
211    ) -> Result<String, String> {
212        Err("background bash is not yet supported on Windows".to_string())
213    }
214
215    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
216        self.start_watchdog();
217        let dir = session_tasks_dir(storage_dir, session_id);
218        if !dir.exists() {
219            return Ok(());
220        }
221
222        let entries = fs::read_dir(&dir)
223            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
224        for entry in entries.flatten() {
225            let path = entry.path();
226            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
227                continue;
228            }
229            let Ok(mut metadata) = read_task(&path) else {
230                continue;
231            };
232            if metadata.session_id != session_id {
233                continue;
234            }
235
236            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
237            match metadata.status {
238                BgTaskStatus::Starting => {
239                    metadata.mark_terminal(
240                        BgTaskStatus::Failed,
241                        None,
242                        Some("spawn aborted".to_string()),
243                    );
244                    let _ = write_task(&paths.json, &metadata);
245                    self.enqueue_completion_if_needed(&metadata, false);
246                }
247                BgTaskStatus::Running => {
248                    if self.running_metadata_is_stale(&metadata) {
249                        metadata.mark_terminal(
250                            BgTaskStatus::Killed,
251                            None,
252                            Some("orphaned (>24h)".to_string()),
253                        );
254                        if !paths.exit.exists() {
255                            let _ = write_kill_marker_if_absent(&paths.exit);
256                        }
257                        let _ = write_task(&paths.json, &metadata);
258                        self.enqueue_completion_if_needed(&metadata, false);
259                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
260                        metadata = terminal_metadata_from_marker(metadata, marker, None);
261                        let _ = write_task(&paths.json, &metadata);
262                        self.enqueue_completion_if_needed(&metadata, false);
263                    } else {
264                        self.insert_rehydrated_task(metadata, paths, true)?;
265                    }
266                }
267                _ if metadata.status.is_terminal() => {
268                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
269                    self.enqueue_completion_if_needed(&metadata, false);
270                }
271                _ => {}
272            }
273        }
274
275        Ok(())
276    }
277
278    pub fn status(
279        &self,
280        task_id: &str,
281        session_id: &str,
282        preview_bytes: usize,
283    ) -> Option<BgTaskSnapshot> {
284        let task = self.task_for_session(task_id, session_id)?;
285        let _ = self.poll_task(&task);
286        Some(task.snapshot(preview_bytes))
287    }
288
289    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
290        let tasks = self
291            .inner
292            .tasks
293            .lock()
294            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
295            .unwrap_or_default();
296        tasks
297            .into_iter()
298            .map(|task| {
299                let _ = self.poll_task(&task);
300                task.snapshot(preview_bytes)
301            })
302            .collect()
303    }
304
305    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
306        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
307    }
308
309    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
310        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
311            .map(|_| ())
312    }
313
314    pub fn cleanup_finished(&self, older_than: Duration) {
315        let cutoff = Instant::now().checked_sub(older_than);
316        if let Ok(mut tasks) = self.inner.tasks.lock() {
317            tasks.retain(|_, task| {
318                let is_terminal = task
319                    .state
320                    .lock()
321                    .map(|state| state.metadata.status.is_terminal())
322                    .unwrap_or(false);
323                if !is_terminal {
324                    return true;
325                }
326
327                let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
328                match (terminal_at, cutoff) {
329                    (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
330                    (Some(_), None) => false,
331                    (None, _) => true,
332                }
333            });
334        }
335    }
336
337    pub fn drain_completions(&self) -> Vec<BgCompletion> {
338        self.drain_completions_for_session(None)
339    }
340
341    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
342        let mut completions = match self.inner.completions.lock() {
343            Ok(completions) => completions,
344            Err(_) => return Vec::new(),
345        };
346
347        let drained = if let Some(session_id) = session_id {
348            let mut matched = Vec::new();
349            let mut retained = VecDeque::new();
350            while let Some(completion) = completions.pop_front() {
351                if completion.session_id == session_id {
352                    matched.push(completion);
353                } else {
354                    retained.push_back(completion);
355                }
356            }
357            *completions = retained;
358            matched
359        } else {
360            completions.drain(..).collect()
361        };
362        drop(completions);
363
364        for completion in &drained {
365            if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
366                let _ = task.set_completion_delivered(true);
367            }
368        }
369
370        drained
371    }
372
373    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
374        self.inner
375            .completions
376            .lock()
377            .map(|completions| {
378                completions
379                    .iter()
380                    .filter(|completion| completion.session_id == session_id)
381                    .cloned()
382                    .collect()
383            })
384            .unwrap_or_default()
385    }
386
387    pub fn detach(&self) {
388        self.inner.shutdown.store(true, Ordering::SeqCst);
389        if let Ok(mut tasks) = self.inner.tasks.lock() {
390            for task in tasks.values() {
391                if let Ok(mut state) = task.state.lock() {
392                    state.child = None;
393                    state.detached = true;
394                }
395            }
396            tasks.clear();
397        }
398    }
399
400    pub fn shutdown(&self) {
401        let tasks = self
402            .inner
403            .tasks
404            .lock()
405            .map(|tasks| {
406                tasks
407                    .values()
408                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
409                    .collect::<Vec<_>>()
410            })
411            .unwrap_or_default();
412        for (task_id, session_id) in tasks {
413            let _ = self.kill(&task_id, &session_id);
414        }
415    }
416
417    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
418        let marker = match read_exit_marker(&task.paths.exit) {
419            Ok(Some(marker)) => marker,
420            Ok(None) => return Ok(()),
421            Err(error) => return Err(format!("failed to read exit marker: {error}")),
422        };
423        self.finalize_from_marker(task, marker, None)
424    }
425
426    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
427        let Ok(mut state) = task.state.lock() else {
428            return;
429        };
430        if let Some(child) = state.child.as_mut() {
431            if matches!(child.try_wait(), Ok(Some(_))) {
432                state.child = None;
433                state.detached = true;
434            }
435        }
436    }
437
438    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
439        self.inner
440            .tasks
441            .lock()
442            .map(|tasks| {
443                tasks
444                    .values()
445                    .filter(|task| task.is_running())
446                    .cloned()
447                    .collect()
448            })
449            .unwrap_or_default()
450    }
451
452    fn insert_rehydrated_task(
453        &self,
454        metadata: PersistedTask,
455        paths: TaskPaths,
456        detached: bool,
457    ) -> Result<(), String> {
458        let task_id = metadata.task_id.clone();
459        let session_id = metadata.session_id.clone();
460        let task = Arc::new(BgTask {
461            task_id: task_id.clone(),
462            session_id,
463            paths: paths.clone(),
464            started: Instant::now(),
465            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
466            state: Mutex::new(BgTaskState {
467                metadata,
468                child: None,
469                detached,
470                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
471            }),
472        });
473        self.inner
474            .tasks
475            .lock()
476            .map_err(|_| "background task registry lock poisoned".to_string())?
477            .insert(task_id, task);
478        Ok(())
479    }
480
    /// Terminates a task and records `terminal_status` as its final state.
    ///
    /// Looks the task up by id and session, and returns its snapshot unchanged
    /// if it is already terminal. Otherwise, while holding the task's state
    /// lock: persists a `Killing` status, terminates the process group (Unix),
    /// waits for the child, writes a kill marker if no exit marker exists,
    /// persists the terminal metadata, caps the output buffer and queues a
    /// completion (emitting a push frame).
    ///
    /// # Errors
    ///
    /// Fails when the task is not found for this session, the state lock is
    /// poisoned, or persisting metadata / the kill marker fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Killing an already-finished task is a no-op that reports its current state.
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate state before signalling the process.
            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Blocking wait on the child handle, if we still hold one.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            // Only write a kill marker when no exit marker is already on disk.
            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            // Timeouts are reported with exit code 124 (presumably mirroring
            // coreutils `timeout` — confirm); plain kills carry no exit code.
            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        // The state lock is released above; `snapshot` re-acquires it.
        Ok(task.snapshot(5 * 1024))
    }
536
    /// Moves a task to the terminal state described by `marker`.
    ///
    /// No-op when the in-memory metadata is already terminal. Otherwise the
    /// persisted metadata is rewritten through `update_task`, the in-memory
    /// copy is replaced with the result, the child handle is dropped, the
    /// output buffer is capped and a completion is queued (emitting a push
    /// frame). `reason` is forwarded to `terminal_metadata_from_marker`.
    ///
    /// # Errors
    ///
    /// Fails when the state lock is poisoned or the metadata update fails.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        // Another caller (poll or kill) already finalized this task.
        if state.metadata.status.is_terminal() {
            return Ok(());
        }

        // The terminal fields are derived from the metadata handed to the
        // closure by `update_task`, not from the in-memory copy.
        let updated = update_task(&task.paths.json, |metadata| {
            let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
            *metadata = new_metadata;
        })
        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
        state.metadata = updated;
        task.mark_terminal_now();
        state.child = None;
        state.detached = true;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
564
565    fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
566        if metadata.status.is_terminal() && !metadata.completion_delivered {
567            self.enqueue_completion_locked(metadata, None, emit_frame);
568        }
569    }
570
571    fn enqueue_completion_locked(
572        &self,
573        metadata: &PersistedTask,
574        buffer: Option<&BgBuffer>,
575        emit_frame: bool,
576    ) {
577        if !metadata.status.is_terminal() || metadata.completion_delivered {
578            return;
579        }
580        // Read tail once at completion time and cache on the BgCompletion so
581        // both the push-frame consumer (running session) and any later
582        // `bash_drain_completions` poll (different session, restart) see the
583        // same preview without racing against rotation.
584        let (output_preview, output_truncated) = match buffer {
585            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
586            None => (String::new(), false),
587        };
588        let completion = BgCompletion {
589            task_id: metadata.task_id.clone(),
590            session_id: metadata.session_id.clone(),
591            status: metadata.status.clone(),
592            exit_code: metadata.exit_code,
593            command: metadata.command.clone(),
594            output_preview,
595            output_truncated,
596        };
597        if let Ok(mut completions) = self.inner.completions.lock() {
598            if completions
599                .iter()
600                .any(|completion| completion.task_id == metadata.task_id)
601            {
602                return;
603            }
604            completions.push_back(completion.clone());
605        } else {
606            return;
607        }
608
609        if emit_frame {
610            self.emit_bash_completed(completion);
611        }
612    }
613
614    fn emit_bash_completed(&self, completion: BgCompletion) {
615        let Ok(progress_sender) = self
616            .inner
617            .progress_sender
618            .lock()
619            .map(|sender| sender.clone())
620        else {
621            return;
622        };
623        let Some(sender) = progress_sender.as_ref() else {
624            return;
625        };
626        // Clone the callback out of the registry mutex before writing to stdout;
627        // otherwise a blocked push-frame write could pin the mutex and starve
628        // unrelated progress-sender updates.
629        // Bg task transitions are discovered by the watchdog thread, so the
630        // sender is shared behind a Mutex. It still uses the same stdout writer
631        // closure as foreground progress frames, preserving the existing lock/
632        // flush behavior in main.rs.
633        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
634            completion.task_id,
635            completion.session_id,
636            completion.status,
637            completion.exit_code,
638            completion.command,
639            completion.output_preview,
640            completion.output_truncated,
641        )));
642    }
643
644    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
645        self.inner
646            .tasks
647            .lock()
648            .ok()
649            .and_then(|tasks| tasks.get(task_id).cloned())
650    }
651
652    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
653        self.task(task_id)
654            .filter(|task| task.session_id == session_id)
655    }
656
657    fn running_count(&self) -> usize {
658        self.inner
659            .tasks
660            .lock()
661            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
662            .unwrap_or(0)
663    }
664
665    fn start_watchdog(&self) {
666        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
667            super::watchdog::start(self.clone());
668        }
669    }
670
671    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
672        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
673    }
674
675    #[cfg(test)]
676    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
677        self.task_for_session(task_id, session_id)
678            .map(|task| task.paths.json.clone())
679    }
680
681    #[cfg(test)]
682    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
683        self.task_for_session(task_id, session_id)
684            .map(|task| task.paths.exit.clone())
685    }
686
687    /// Generate a `bgb-{16hex}` slug that is unique against live tasks and queued completions.
688    fn generate_unique_task_id(&self) -> Result<String, String> {
689        for _ in 0..32 {
690            let candidate = random_slug();
691            let tasks = self
692                .inner
693                .tasks
694                .lock()
695                .map_err(|_| "background task registry lock poisoned".to_string())?;
696            if tasks.contains_key(&candidate) {
697                continue;
698            }
699            let completions = self
700                .inner
701                .completions
702                .lock()
703                .map_err(|_| "background completions lock poisoned".to_string())?;
704            if completions
705                .iter()
706                .any(|completion| completion.task_id == candidate)
707            {
708                continue;
709            }
710            return Ok(candidate);
711        }
712        Err("failed to allocate unique background task id after 32 attempts".to_string())
713    }
714}
715
716impl Default for BgTaskRegistry {
717    fn default() -> Self {
718        Self::new(Arc::new(Mutex::new(None)))
719    }
720}
721
722impl BgTask {
723    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
724        let state = self
725            .state
726            .lock()
727            .unwrap_or_else(|poison| poison.into_inner());
728        self.snapshot_locked(&state, preview_bytes)
729    }
730
731    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
732        let metadata = &state.metadata;
733        let duration_ms = metadata.duration_ms.or_else(|| {
734            metadata
735                .status
736                .is_terminal()
737                .then(|| self.started.elapsed().as_millis() as u64)
738        });
739        let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
740        BgTaskSnapshot {
741            info: BgTaskInfo {
742                task_id: self.task_id.clone(),
743                status: metadata.status.clone(),
744                command: metadata.command.clone(),
745                started_at: metadata.started_at,
746                duration_ms,
747            },
748            exit_code: metadata.exit_code,
749            child_pid: metadata.child_pid,
750            workdir: metadata.workdir.display().to_string(),
751            output_preview,
752            output_truncated,
753            output_path: state
754                .buffer
755                .output_path()
756                .map(|path| path.display().to_string()),
757        }
758    }
759
760    pub(crate) fn is_running(&self) -> bool {
761        self.state
762            .lock()
763            .map(|state| state.metadata.status == BgTaskStatus::Running)
764            .unwrap_or(false)
765    }
766
767    fn mark_terminal_now(&self) {
768        if let Ok(mut terminal_at) = self.terminal_at.lock() {
769            if terminal_at.is_none() {
770                *terminal_at = Some(Instant::now());
771            }
772        }
773    }
774
775    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
776        let mut state = self
777            .state
778            .lock()
779            .map_err(|_| "background task lock poisoned".to_string())?;
780        let updated = update_task(&self.paths.json, |metadata| {
781            metadata.completion_delivered = delivered;
782        })
783        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
784        state.metadata = updated;
785        Ok(())
786    }
787}
788
789fn terminal_metadata_from_marker(
790    mut metadata: PersistedTask,
791    marker: ExitMarker,
792    reason: Option<String>,
793) -> PersistedTask {
794    match marker {
795        ExitMarker::Code(code) => {
796            let status = if code == 0 {
797                BgTaskStatus::Completed
798            } else {
799                BgTaskStatus::Failed
800            };
801            metadata.mark_terminal(status, Some(code), reason);
802        }
803        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
804    }
805    metadata
806}
807
808#[cfg(unix)]
809fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
810    let mut cmd = Command::new("/bin/sh");
811    cmd.arg("-c")
812        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
813        .arg("/bin/sh")
814        .arg(command)
815        .arg(exit_path);
816    unsafe {
817        cmd.pre_exec(|| {
818            if libc::setsid() == -1 {
819                return Err(std::io::Error::last_os_error());
820            }
821            Ok(())
822        });
823    }
824    cmd
825}
826
827fn random_slug() -> String {
828    static COUNTER: AtomicU64 = AtomicU64::new(0);
829    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
830    let mixed = unix_millis_nanos()
831        ^ (std::process::id() as u128).wrapping_mul(0x9E3779B97F4A7C15)
832        ^ (counter as u128).wrapping_mul(0xBF58476D1CE4E5B9);
833    // Use 64 bits (`bgb-` + 16 lowercase hex chars) to keep IDs compact while
834    // avoiding the birthday-collision risk of the old 32-bit suffix.
835    format!("bgb-{:016x}", mixed as u64)
836}
837
/// Nanoseconds since the Unix epoch according to the system clock, or 0 when
/// the clock reads earlier than the epoch. (Despite the name, the unit is
/// nanoseconds.)
fn unix_millis_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}
844
// These tests spawn real shell commands. `BgTaskRegistry::spawn` is only
// implemented on Unix (the Windows variant always returns `Err`), so the
// module is compiled for Unix test builds only instead of panicking on
// `unwrap()` elsewhere.
#[cfg(all(test, unix))]
mod tests {
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use super::*;

    /// A killed task is terminal with a timestamp, so a zero-age cleanup must
    /// remove it from the registry.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                "true",
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// Cleanup must never evict a task that is still running.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                "sleep 5",
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
            )
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        // Do not leave the sleeping child behind after the test.
        let _ = registry.kill(&task_id, "session");
    }
}