Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
/// A background job tracked by a [`JobManager`].
///
/// A job observes completion through either a task handle or a oneshot
/// channel (the constructors never set both), caches the result once seen,
/// and may additionally carry live output streams, OS process info (for
/// stopped jobs), a cancellation token, and the process groups it spawned.
pub struct Job {
    /// Job ID.
    pub id: JobId,
    /// Owning manager's session ID — disambiguates output file paths between
    /// JobManager instances that share the process (and thus the same job ID
    /// space, since IDs restart at 1 per manager).
    session_id: u64,
    /// Command description.
    pub command: String,
    /// Task handle. `None` for channel-backed and stopped jobs, and once the
    /// handle has been consumed by `wait` / `try_poll`.
    handle: Option<JoinHandle<ExecResult>>,
    /// Channel to receive result (alternative to handle). Cleared once a
    /// result — or a closed channel — has been observed.
    result_rx: Option<oneshot::Receiver<ExecResult>>,
    /// Cached result after completion.
    result: Option<ExecResult>,
    /// Path to output file (captures stdout/stderr after completion).
    /// Set by `wait` when a file was written; taken by `cleanup_files`.
    output_file: Option<PathBuf>,
    /// Whether to persist completed output to a host temp file. Disabled for
    /// hermetic / read-only kernels (custom backend, NoLocal) whose output must
    /// never reach the real filesystem outside the VFS — see
    /// [`JobManager::set_persist_output_files`]. Stamped from the manager when
    /// the job is registered. Live output is always available in-memory via the
    /// VFS streams (`/v/jobs/{id}/stdout`), so suppressing the file loses
    /// nothing for an in-process consumer.
    persist_output: bool,
    /// Live stdout stream (bounded ring buffer).
    stdout_stream: Option<Arc<BoundedStream>>,
    /// Live stderr stream (bounded ring buffer).
    stderr_stream: Option<Arc<BoundedStream>>,
    /// OS process ID (for stopped jobs).
    pid: Option<u32>,
    /// OS process group ID (for stopped jobs).
    pgid: Option<u32>,
    /// Whether this job is stopped (SIGTSTP). While set, the job reports
    /// `JobStatus::Stopped` and is never considered done.
    stopped: bool,
    /// Cancellation token of the background fork running this job. Cancelling
    /// it stops the job whether it is an in-process builtin future or wraps
    /// external children (the cancellation cascade SIGTERM→SIGKILLs their
    /// process groups). This is how `kill %N` reaches a job that has no OS
    /// process group of its own (e.g. `sleep &`, a kaish builtin).
    cancel: Option<tokio_util::sync::CancellationToken>,
    /// Process groups of external children spawned while running this job.
    /// Lets `kill -<sig> %N` deliver an arbitrary signal (STOP/CONT/USR1/…)
    /// straight to the real processes via `killpg`, not just terminate. Empty
    /// for a pure-builtin job (nothing with a PGID ran).
    pgids: Vec<u32>,
}
69
70impl Job {
71    /// Create a new job from a task handle.
72    pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73        Self {
74            id,
75            session_id,
76            command,
77            handle: Some(handle),
78            result_rx: None,
79            result: None,
80            output_file: None,
81            persist_output: true,
82            stdout_stream: None,
83            stderr_stream: None,
84            pid: None,
85            pgid: None,
86            stopped: false,
87            cancel: None,
88            pgids: Vec::new(),
89        }
90    }
91
92    /// Create a new job from a result channel.
93    pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94        Self {
95            id,
96            session_id,
97            command,
98            handle: None,
99            result_rx: Some(rx),
100            result: None,
101            output_file: None,
102            persist_output: true,
103            stdout_stream: None,
104            stderr_stream: None,
105            pid: None,
106            pgid: None,
107            stopped: false,
108            cancel: None,
109            pgids: Vec::new(),
110        }
111    }
112
113    /// Create a new job with attached output streams.
114    ///
115    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
116    pub fn with_streams(
117        id: JobId,
118        session_id: u64,
119        command: String,
120        rx: oneshot::Receiver<ExecResult>,
121        stdout: Arc<BoundedStream>,
122        stderr: Arc<BoundedStream>,
123    ) -> Self {
124        Self {
125            id,
126            session_id,
127            command,
128            handle: None,
129            result_rx: Some(rx),
130            result: None,
131            output_file: None,
132            persist_output: true,
133            stdout_stream: Some(stdout),
134            stderr_stream: Some(stderr),
135            pid: None,
136            pgid: None,
137            stopped: false,
138            cancel: None,
139            pgids: Vec::new(),
140        }
141    }
142
143    /// Create a stopped job (from Ctrl-Z on a foreground process).
144    pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145        Self {
146            id,
147            session_id,
148            command,
149            handle: None,
150            result_rx: None,
151            result: None,
152            output_file: None,
153            persist_output: true,
154            stdout_stream: None,
155            stderr_stream: None,
156            pid: Some(pid),
157            pgid: Some(pgid),
158            stopped: true,
159            cancel: None,
160            pgids: Vec::new(),
161        }
162    }
163
164    /// Get the output file path (if available).
165    pub fn output_file(&self) -> Option<&PathBuf> {
166        self.output_file.as_ref()
167    }
168
169    /// Check if the job has completed.
170    ///
171    /// Stopped jobs are not considered done.
172    pub fn is_done(&mut self) -> bool {
173        if self.stopped {
174            return false;
175        }
176        self.try_poll();
177        self.result.is_some()
178    }
179
180    /// Get the job's status.
181    pub fn status(&mut self) -> JobStatus {
182        if self.stopped {
183            return JobStatus::Stopped;
184        }
185        self.try_poll();
186        match &self.result {
187            Some(r) if r.ok() => JobStatus::Done,
188            Some(_) => JobStatus::Failed,
189            None => JobStatus::Running,
190        }
191    }
192
193    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
194    ///
195    /// Returns:
196    /// - `"running"` if the job is still running
197    /// - `"done:0"` if the job completed successfully
198    /// - `"failed:{code}"` if the job failed with an exit code
199    pub fn status_string(&mut self) -> String {
200        self.try_poll();
201        match &self.result {
202            Some(r) if r.ok() => "done:0".to_string(),
203            Some(r) => format!("failed:{}", r.code),
204            None => "running".to_string(),
205        }
206    }
207
208    /// Get the stdout stream (if attached).
209    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210        self.stdout_stream.as_ref()
211    }
212
213    /// Get the stderr stream (if attached).
214    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215        self.stderr_stream.as_ref()
216    }
217
218    /// Wait for the job to complete and return its result.
219    ///
220    /// On completion, the job's output is written to a temp file for later retrieval.
221    pub async fn wait(&mut self) -> ExecResult {
222        if let Some(result) = self.result.take() {
223            self.result = Some(result.clone());
224            return result;
225        }
226
227        let result = if let Some(handle) = self.handle.take() {
228            match handle.await {
229                Ok(r) => r,
230                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231            }
232        } else if let Some(rx) = self.result_rx.take() {
233            match rx.await {
234                Ok(r) => r,
235                Err(_) => ExecResult::failure(1, "job channel closed"),
236            }
237        } else {
238            // Already waited
239            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240        };
241
242        // Write output to a host temp file for later retrieval — but only when
243        // persistence is enabled. A hermetic / read-only kernel disables it so
244        // job output never bypasses the VFS onto the real filesystem.
245        if self.persist_output
246            && self.output_file.is_none()
247            && let Some(path) = self.write_output_file(&result) {
248                self.output_file = Some(path);
249            }
250
251        self.result = Some(result.clone());
252        result
253    }
254
255    /// Write job output to a temp file.
256    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257        // This is a human-readable text log; a binary stdout is noted, not
258        // dumped (lossy-decoding it would corrupt; raw bytes would garble the
259        // log). Only its size is recorded.
260        let is_bytes = result.is_bytes();
261        let text = if is_bytes {
262            std::borrow::Cow::Borrowed("")
263        } else {
264            result.text_out()
265        };
266        if !is_bytes && text.is_empty() && result.err.is_empty() {
267            return None;
268        }
269
270        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
271        if std::fs::create_dir_all(&tmp_dir).is_err() {
272            tracing::warn!("Failed to create job output directory");
273            return None;
274        }
275
276        // Include the OS pid: `session_id` is only unique *within* a process
277        // (it's a process-local atomic that restarts at 0), so two kaish
278        // processes on one host — or two `cargo test` binaries — would
279        // otherwise both write `session_0_job_1.txt` into this shared dir and
280        // clobber each other (a real cross-process collision, and the source
281        // of the `test_cleanup_removes_temp_files` flake). pid + session_id +
282        // job id is unique across processes. Mirrors `output_limit`'s spill
283        // filename convention.
284        let filename = format!(
285            "session_{}_job_{}.{}.txt",
286            self.session_id,
287            self.id.0,
288            std::process::id()
289        );
290        let path = tmp_dir.join(filename);
291
292        let mut content = String::new();
293        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
294        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
295
296        if is_bytes {
297            let n = result.out_bytes().map(|b| b.len()).unwrap_or(0);
298            content.push_str(&format!(
299                "## STDOUT\n[binary output: {n} bytes — omitted from this text log]\n"
300            ));
301        } else if !text.is_empty() {
302            content.push_str("## STDOUT\n");
303            content.push_str(&text);
304            if !text.ends_with('\n') {
305                content.push('\n');
306            }
307        }
308
309        if !result.err.is_empty() {
310            content.push_str("\n## STDERR\n");
311            content.push_str(&result.err);
312            if !result.err.ends_with('\n') {
313                content.push('\n');
314            }
315        }
316
317        match std::fs::write(&path, content) {
318            Ok(()) => Some(path),
319            Err(e) => {
320                tracing::warn!("Failed to write job output file: {}", e);
321                None
322            }
323        }
324    }
325
326    /// Remove any temp files associated with this job.
327    pub fn cleanup_files(&mut self) {
328        if let Some(path) = self.output_file.take() {
329            if let Err(e) = std::fs::remove_file(&path) {
330                // Ignore "not found" — file may not have been written
331                if e.kind() != io::ErrorKind::NotFound {
332                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
333                }
334            }
335        }
336    }
337
338    /// Get the result if completed, without waiting.
339    pub fn try_result(&self) -> Option<&ExecResult> {
340        self.result.as_ref()
341    }
342
343    /// Try to poll the result channel and update status.
344    ///
345    /// This is a non-blocking check that updates `self.result` if the
346    /// job has completed. Returns true if the job is now done.
347    pub fn try_poll(&mut self) -> bool {
348        if self.result.is_some() {
349            return true;
350        }
351
352        // Try to poll the oneshot channel
353        if let Some(rx) = self.result_rx.as_mut() {
354            match rx.try_recv() {
355                Ok(result) => {
356                    self.result = Some(result);
357                    self.result_rx = None;
358                    return true;
359                }
360                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
361                    // Still running
362                    return false;
363                }
364                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
365                    // Channel closed without result - job failed
366                    self.result = Some(ExecResult::failure(1, "job channel closed"));
367                    self.result_rx = None;
368                    return true;
369                }
370            }
371        }
372
373        // Check if handle is finished
374        if let Some(handle) = self.handle.as_mut()
375            && handle.is_finished() {
376                // Take the handle and wait for it (should be instant)
377                let Some(mut handle) = self.handle.take() else {
378                    return false;
379                };
380                // Poll directly with a noop waker — safe because is_finished() was true
381                let waker = std::task::Waker::noop();
382                let mut cx = std::task::Context::from_waker(waker);
383                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
384                    std::task::Poll::Ready(Ok(r)) => r,
385                    std::task::Poll::Ready(Err(e)) => {
386                        ExecResult::failure(1, format!("job panicked: {}", e))
387                    }
388                    std::task::Poll::Pending => return false, // shouldn't happen
389                };
390                self.result = Some(result);
391                return true;
392            }
393
394        false
395    }
396}
397
/// Process-wide counter handing each JobManager a distinct session ID. Job IDs
/// restart at 1 per manager, so the session ID is what keeps output file paths
/// from colliding between managers sharing a process (concurrent tests, forks).
/// It is process-LOCAL (starts at 0 in every process), so output filenames also
/// mix in the OS pid to stay unique across processes — see `write_output_file`.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
404
/// Remove job output files in `<temp dir>/kaish/jobs/` that were written by
/// processes which are no longer running. Run once per process (guarded by a
/// `Once` in [`JobManager::new`]).
///
/// Strategy: filenames follow `session_S_job_J.PID.txt`. We parse the PID
/// component and skip files whose PID matches the current process (those
/// belong to live sessions in this very process). For other PIDs we check
/// `/proc/{pid}` on Linux; on non-Linux platforms we skip the prune entirely
/// since there is no cheap cross-platform liveness check.
///
/// The prune is also skipped when `/proc/self` does not exist: without a
/// mounted procfs every `/proc/{pid}` probe would fail, every other process
/// would look dead, and files belonging to live processes would be deleted.
///
/// All errors are intentionally ignored — this is opportunistic cleanup only.
fn prune_orphaned_job_files() {
    // Only prune on Linux where /proc/{pid} can serve as a liveness check.
    #[cfg(target_os = "linux")]
    {
        // No usable procfs means "missing /proc/{pid}" proves nothing.
        if !std::path::Path::new("/proc/self").exists() {
            return;
        }

        let jobs_dir = std::env::temp_dir().join("kaish").join("jobs");
        let Ok(entries) = std::fs::read_dir(&jobs_dir) else {
            return; // directory doesn't exist yet — nothing to prune
        };
        let current_pid = std::process::id();
        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            // Expected format: session_S_job_J.PID.txt
            // The PID is the last dot-separated component before ".txt".
            let file_pid: Option<u32> = name_str
                .strip_suffix(".txt")
                .and_then(|s| s.rsplit_once('.'))
                .and_then(|(_, pid_str)| pid_str.parse().ok());
            let Some(pid) = file_pid else {
                continue; // not a job output file — skip
            };
            if pid == current_pid {
                continue; // belongs to the current process — leave it alone
            }
            // Check if the owning process is still alive via /proc.
            if std::path::Path::new(&format!("/proc/{}", pid)).exists() {
                continue; // process is still running — leave it alone
            }
            // Process is gone: remove the stale file. Error intentionally ignored.
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
449
/// Manager for background jobs.
///
/// Owns the map of tracked jobs behind an async mutex, allocates job IDs
/// (starting at 1 for every manager), and stamps its session ID and
/// output-persistence setting onto the jobs it creates.
pub struct JobManager {
    /// Process-unique ID for this manager, mixed into job output file paths.
    session_id: u64,
    /// Counter for generating unique job IDs.
    next_id: AtomicU64,
    /// Map of job ID to job.
    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
    /// Whether completed jobs persist their output to a host temp file. On by
    /// default; a hermetic / read-only kernel disables it so output never
    /// bypasses the VFS onto the real filesystem (see
    /// [`set_persist_output_files`](Self::set_persist_output_files)). Stamped
    /// onto each [`Job`] at registration.
    persist_output_files: std::sync::atomic::AtomicBool,
}
465
466impl JobManager {
467    /// Create a new job manager.
468    ///
469    /// On construction, best-effort prunes stale job output files left by
470    /// previously crashed kaish processes. All errors are intentionally ignored
471    /// — startup cleanup is opportunistic and must never prevent the manager
472    /// from being created (silent-fallback rule: the only case where silent is
473    /// correct is read-only / cleanup-only paths with no data loss risk).
474    ///
475    /// # Scoping decision
476    /// All sessions share a single `/tmp/kaish/jobs/` directory. Filenames embed
477    /// the OS PID that wrote them (`session_S_job_J.PID.txt`). Files from the
478    /// current process are never touched here — only files whose embedded PID
479    /// refers to a dead process are removed. On Linux we check `/proc/{pid}` for
480    /// existence; on other platforms we skip the prune rather than guess.
481    pub fn new() -> Self {
482        // Orphans from dead sessions only need pruning once per process, not on
483        // every JobManager (kernels + every fork build one). The `Once` keeps
484        // the dir scan / `/proc` checks off the hot path of background jobs,
485        // scatter workers, and pipeline stages.
486        static PRUNE_ONCE: std::sync::Once = std::sync::Once::new();
487        PRUNE_ONCE.call_once(prune_orphaned_job_files);
488        Self {
489            session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
490            next_id: AtomicU64::new(1),
491            jobs: Arc::new(Mutex::new(HashMap::new())),
492            persist_output_files: std::sync::atomic::AtomicBool::new(true),
493        }
494    }
495
496    /// Toggle whether completed jobs persist their output to a host temp file.
497    ///
498    /// Disable this for a hermetic / read-only kernel: the host write in
499    /// [`Job::write_output_file`] uses `std::fs` directly and so bypasses the
500    /// VFS (and any read-only mount). Live output stays available in-memory via
501    /// the VFS streams (`/v/jobs/{id}/stdout`), so nothing is lost in-process.
502    ///
503    /// Must be set before jobs are spawned — the flag is stamped onto each job
504    /// at registration time, not consulted at completion.
505    pub fn set_persist_output_files(&self, on: bool) {
506        self.persist_output_files.store(on, Ordering::Relaxed);
507    }
508
509    /// Whether completed jobs persist their output to a host temp file.
510    pub fn persist_output_files(&self) -> bool {
511        self.persist_output_files.load(Ordering::Relaxed)
512    }
513
514    /// Spawn a new background job from a future.
515    ///
516    /// The job is inserted into the map synchronously before returning,
517    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
518    pub fn spawn<F>(&self, command: String, future: F) -> JobId
519    where
520        F: std::future::Future<Output = ExecResult> + Send + 'static,
521    {
522        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
523        // Propagate the embedder's trace context across the spawn boundary so
524        // background-job spans stay in the same trace (see telemetry module).
525        let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
526        let mut job = Job::new(id, self.session_id, command, handle);
527        job.persist_output = self.persist_output_files();
528
529        // Spin on try_lock to guarantee the job is in the map on return.
530        // The lock is tokio::sync::Mutex which is only held briefly during
531        // HashMap operations, so contention resolves quickly.
532        let mut job_opt = Some(job);
533        loop {
534            match self.jobs.try_lock() {
535                Ok(mut guard) => {
536                    if let Some(j) = job_opt.take() {
537                        guard.insert(id, j);
538                    }
539                    break;
540                }
541                Err(_) => {
542                    std::hint::spin_loop();
543                }
544            }
545        }
546
547        id
548    }
549
550    /// Spawn a job that's already running and communicate via channel.
551    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
552        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
553        let mut job = Job::from_channel(id, self.session_id, command, rx);
554        job.persist_output = self.persist_output_files();
555
556        let mut jobs = self.jobs.lock().await;
557        jobs.insert(id, job);
558
559        id
560    }
561
562    /// Register a job with attached output streams.
563    ///
564    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
565    pub async fn register_with_streams(
566        &self,
567        command: String,
568        rx: oneshot::Receiver<ExecResult>,
569        stdout: Arc<BoundedStream>,
570        stderr: Arc<BoundedStream>,
571    ) -> JobId {
572        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
573        let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
574        job.persist_output = self.persist_output_files();
575
576        let mut jobs = self.jobs.lock().await;
577        jobs.insert(id, job);
578
579        id
580    }
581
582    /// Wait for a specific job to complete.
583    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
584        let mut jobs = self.jobs.lock().await;
585        if let Some(job) = jobs.get_mut(&id) {
586            Some(job.wait().await)
587        } else {
588            None
589        }
590    }
591
592    /// Wait for all jobs to complete, returning results in completion order.
593    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
594        let mut results = Vec::new();
595
596        // Get all job IDs
597        let ids: Vec<JobId> = {
598            let jobs = self.jobs.lock().await;
599            jobs.keys().copied().collect()
600        };
601
602        for id in ids {
603            if let Some(result) = self.wait(id).await {
604                results.push((id, result));
605            }
606        }
607
608        results
609    }
610
611    /// List all jobs with their status.
612    pub async fn list(&self) -> Vec<JobInfo> {
613        let mut jobs = self.jobs.lock().await;
614        jobs.values_mut()
615            .map(|job| JobInfo {
616                id: job.id,
617                command: job.command.clone(),
618                status: job.status(),
619                output_file: job.output_file.clone(),
620                pid: job.pid,
621            })
622            .collect()
623    }
624
625    /// Get the number of running jobs.
626    pub async fn running_count(&self) -> usize {
627        let mut jobs = self.jobs.lock().await;
628        let mut count = 0;
629        for job in jobs.values_mut() {
630            if !job.is_done() {
631                count += 1;
632            }
633        }
634        count
635    }
636
637    /// Remove completed jobs from tracking and clean up their temp files.
638    pub async fn cleanup(&self) {
639        let mut jobs = self.jobs.lock().await;
640        jobs.retain(|_, job| {
641            if job.is_done() {
642                job.cleanup_files();
643                false
644            } else {
645                true
646            }
647        });
648    }
649
650    /// Check if a specific job exists.
651    pub async fn exists(&self, id: JobId) -> bool {
652        let jobs = self.jobs.lock().await;
653        jobs.contains_key(&id)
654    }
655
656    /// Get info for a specific job.
657    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
658        let mut jobs = self.jobs.lock().await;
659        jobs.get_mut(&id).map(|job| JobInfo {
660            id: job.id,
661            command: job.command.clone(),
662            status: job.status(),
663            output_file: job.output_file.clone(),
664            pid: job.pid,
665        })
666    }
667
668    /// Get the command string for a job.
669    pub async fn get_command(&self, id: JobId) -> Option<String> {
670        let jobs = self.jobs.lock().await;
671        jobs.get(&id).map(|job| job.command.clone())
672    }
673
674    /// Get the status string for a job (for /v/jobs/{id}/status).
675    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
676        let mut jobs = self.jobs.lock().await;
677        jobs.get_mut(&id).map(|job| job.status_string())
678    }
679
680    /// Read stdout stream content for a job.
681    ///
682    /// Returns `None` if the job doesn't exist or has no attached stream.
683    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
684        let jobs = self.jobs.lock().await;
685        if let Some(job) = jobs.get(&id)
686            && let Some(stream) = job.stdout_stream() {
687                return Some(stream.read().await);
688            }
689        None
690    }
691
692    /// Read stderr stream content for a job.
693    ///
694    /// Returns `None` if the job doesn't exist or has no attached stream.
695    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
696        let jobs = self.jobs.lock().await;
697        if let Some(job) = jobs.get(&id)
698            && let Some(stream) = job.stderr_stream() {
699                return Some(stream.read().await);
700            }
701        None
702    }
703
704    /// List all job IDs.
705    pub async fn list_ids(&self) -> Vec<JobId> {
706        let jobs = self.jobs.lock().await;
707        jobs.keys().copied().collect()
708    }
709
710    /// Register a stopped job (from Ctrl-Z on a foreground process).
711    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
712        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
713        let job = Job::stopped(id, self.session_id, command, pid, pgid);
714        let mut jobs = self.jobs.lock().await;
715        jobs.insert(id, job);
716        id
717    }
718
719    /// Mark a job as stopped with its process info.
720    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
721        let mut jobs = self.jobs.lock().await;
722        if let Some(job) = jobs.get_mut(&id) {
723            job.stopped = true;
724            job.pid = Some(pid);
725            job.pgid = Some(pgid);
726        }
727    }
728
729    /// Mark a stopped job as resumed.
730    pub async fn resume_job(&self, id: JobId) {
731        let mut jobs = self.jobs.lock().await;
732        if let Some(job) = jobs.get_mut(&id) {
733            job.stopped = false;
734        }
735    }
736
737    /// Get the most recently stopped job.
738    pub async fn last_stopped(&self) -> Option<JobId> {
739        let mut jobs = self.jobs.lock().await;
740        // Find the highest-numbered stopped job
741        let mut best: Option<JobId> = None;
742        for job in jobs.values_mut() {
743            if job.stopped {
744                match best {
745                    None => best = Some(job.id),
746                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
747                    _ => {}
748                }
749            }
750        }
751        best
752    }
753
754    /// Get process info (pid, pgid) for a job.
755    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
756        let jobs = self.jobs.lock().await;
757        jobs.get(&id).and_then(|job| {
758            match (job.pid, job.pgid) {
759                (Some(pid), Some(pgid)) => Some((pid, pgid)),
760                _ => None,
761            }
762        })
763    }
764
765    /// Record the cancellation token of the fork running a background job, so
766    /// `kill %N` can stop the job even when it has no OS process group of its
767    /// own (e.g. a pure builtin like `sleep &`).
768    pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
769        let mut jobs = self.jobs.lock().await;
770        if let Some(job) = jobs.get_mut(&id) {
771            job.cancel = Some(token);
772        }
773    }
774
775    /// Cancel a job by its token. Returns `true` if a token was recorded and
776    /// cancelled. The cancellation cascade stops in-process builtin futures and
777    /// SIGTERM→SIGKILLs any external children's process groups.
778    pub async fn cancel(&self, id: JobId) -> bool {
779        let jobs = self.jobs.lock().await;
780        match jobs.get(&id).and_then(|job| job.cancel.clone()) {
781            Some(token) => {
782                token.cancel();
783                true
784            }
785            None => false,
786        }
787    }
788
789    /// Record a process group spawned while running a background job. Lets
790    /// `kill -<sig> %N` deliver an arbitrary signal directly to the real
791    /// processes. Deduplicated (a job may spawn several externals).
792    pub async fn add_pgid(&self, id: JobId, pgid: u32) {
793        let mut jobs = self.jobs.lock().await;
794        if let Some(job) = jobs.get_mut(&id) {
795            if !job.pgids.contains(&pgid) {
796                job.pgids.push(pgid);
797            }
798        }
799    }
800
801    /// The process groups recorded for a job (empty for a pure-builtin job).
802    /// Includes the legacy single `pgid` recorded for *stopped* jobs (Ctrl-Z),
803    /// so `kill %N` signals a stopped foreground job's group too.
804    pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
805        let jobs = self.jobs.lock().await;
806        jobs.get(&id)
807            .map(|job| {
808                let mut v = job.pgids.clone();
809                if let Some(pg) = job.pgid {
810                    if !v.contains(&pg) {
811                        v.push(pg);
812                    }
813                }
814                v
815            })
816            .unwrap_or_default()
817    }
818
819    /// Remove a job from tracking.
820    pub async fn remove(&self, id: JobId) {
821        let mut jobs = self.jobs.lock().await;
822        if let Some(mut job) = jobs.remove(&id) {
823            job.cleanup_files();
824        }
825    }
826}
827
828impl Default for JobManager {
829    fn default() -> Self {
830        Self::new()
831    }
832}
833
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sleep on the tokio timer for `millis` milliseconds.
    async fn pause(millis: u64) {
        tokio::time::sleep(Duration::from_millis(millis)).await;
    }

    /// Snapshot the host output-file path currently recorded on job `id`
    /// (`None` if the job is unknown or has no output file).
    async fn recorded_output_file(manager: &JobManager, id: JobId) -> Option<PathBuf> {
        let registry = manager.jobs.lock().await;
        registry.get(&id).and_then(|job| job.output_file().cloned())
    }

    #[tokio::test]
    async fn test_no_host_output_file_when_persistence_disabled() {
        // Hermetic / read-only kernels (custom backend, or NoLocal mode) turn
        // off host output-file persistence, so background-job output must
        // never reach the real filesystem through `std::fs` behind the VFS.
        let manager = JobManager::new();
        assert!(manager.persist_output_files(), "default is to persist");
        manager.set_persist_output_files(false);
        assert!(!manager.persist_output_files());

        let job = manager.spawn(String::from("leaky"), async {
            ExecResult::success("output that must not hit host disk")
        });
        pause(10).await;
        let outcome = manager.wait(job).await;
        assert!(outcome.is_some());

        // With persistence off, no temp file may be recorded for the job.
        let output_file = recorded_output_file(&manager, job).await;
        assert!(
            output_file.is_none(),
            "no host output file should be written when persistence is disabled, got {output_file:?}"
        );
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let job = manager.spawn(String::from("test"), async {
            pause(10).await;
            ExecResult::success("done")
        });

        // Give the job a moment to be registered.
        pause(5).await;

        let outcome = manager.wait(job).await;
        assert!(outcome.is_some());
        let outcome = outcome.unwrap();
        assert!(outcome.ok());
        assert_eq!(&*outcome.text_out(), "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        // Two jobs with different run times; both must be reported.
        for (name, delay_ms, text) in [("job1", 10, "one"), ("job2", 5, "two")] {
            manager.spawn(String::from(name), async move {
                pause(delay_ms).await;
                ExecResult::success(text)
            });
        }

        // Give the jobs a moment to register.
        pause(5).await;

        let outcomes = manager.wait_all().await;
        assert_eq!(outcomes.len(), 2);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn(String::from("test job"), async {
            pause(50).await;
            ExecResult::success("")
        });

        // Give the job a moment to register; it is still sleeping afterwards.
        pause(5).await;

        let listed = manager.list().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].command, "test job");
        assert_eq!(listed[0].status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let job = manager.spawn(String::from("quick"), async { ExecResult::success("") });

        // Let the job finish, then collect its result.
        pause(10).await;
        let _ = manager.wait(job).await;

        let info = manager.get(job).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let job = manager.spawn(String::from("done"), async { ExecResult::success("") });

        // Let the job finish, then collect its result.
        pause(10).await;
        let _ = manager.wait(job).await;

        // The finished job is still tracked...
        assert_eq!(manager.list().await.len(), 1);

        // ...until cleanup runs...
        manager.cleanup().await;

        // ...after which nothing is left.
        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        // Bug K: cleanup should remove temp files
        let manager = JobManager::new();

        let job = manager.spawn(String::from("output job"), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        // Waiting for completion is what triggers output-file creation.
        pause(10).await;
        let outcome = manager.wait(job).await;
        assert!(outcome.is_some());

        // Grab the path before cleanup. The job produced output, so a temp
        // file must exist; otherwise the final assertion would pass vacuously.
        let path = recorded_output_file(&manager, job)
            .await
            .expect("job with output should have written a temp file");
        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());

        // Cleanup drops the job and must take its files with it.
        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (sender, receiver) = oneshot::channel();

        let job = manager.register(String::from("channel job"), receiver).await;

        // Deliver the result through the channel.
        sender.send(ExecResult::success("from channel")).unwrap();

        let outcome = manager.wait(job).await;
        assert!(outcome.is_some());
        assert_eq!(&*outcome.unwrap().text_out(), "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        // Bug J: job should be queryable immediately after spawn()
        let manager = JobManager::new();

        let job = manager.spawn(String::from("instant"), async {
            pause(100).await;
            ExecResult::success("done")
        });

        // No sleep here: the job must be visible right away.
        let exists = manager.exists(job).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(job).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        let outcome = manager.wait(JobId(999)).await;
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_cancel_token_fires() {
        // A token recorded on a job can be tripped through the job id. This
        // is how `kill %N` stops a pure-builtin job with no OS process group.
        let manager = JobManager::new();
        let token = tokio_util::sync::CancellationToken::new();
        let job = manager.spawn(String::from("bg"), async { ExecResult::success("") });
        manager.set_cancel_token(job, token.clone()).await;

        assert!(!token.is_cancelled());
        assert!(manager.cancel(job).await, "cancel should report success");
        assert!(token.is_cancelled(), "the job's token must be tripped");
    }

    #[tokio::test]
    async fn test_cancel_without_token_returns_false() {
        let manager = JobManager::new();
        let job = manager.spawn(String::from("bg"), async { ExecResult::success("") });
        // Known job, but no token recorded: nothing to cancel.
        assert!(!manager.cancel(job).await);
        // Unknown id: also false.
        assert!(!manager.cancel(JobId(999)).await);
    }

    #[tokio::test]
    async fn test_pgids_recorded_and_deduped() {
        let manager = JobManager::new();
        let job = manager.spawn(String::from("bg"), async { ExecResult::success("") });
        assert!(manager.job_pgids(job).await.is_empty());

        // The third call repeats the first pgid and must be ignored.
        for pgid in [4242, 4243, 4242] {
            manager.add_pgid(job, pgid).await;
        }
        assert_eq!(manager.job_pgids(job).await, vec![4242, 4243]);

        // Unknown id: empty result, no panic.
        assert!(manager.job_pgids(JobId(999)).await.is_empty());
    }
}