Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21/// A background job.
22pub struct Job {
23    /// Job ID.
24    pub id: JobId,
25    /// Owning manager's session ID — disambiguates output file paths between
26    /// JobManager instances that share the process (and thus the same job ID
27    /// space, since IDs restart at 1 per manager).
28    session_id: u64,
29    /// Command description.
30    pub command: String,
31    /// Task handle (None if already awaited).
32    handle: Option<JoinHandle<ExecResult>>,
33    /// Channel to receive result (alternative to handle).
34    result_rx: Option<oneshot::Receiver<ExecResult>>,
35    /// Cached result after completion.
36    result: Option<ExecResult>,
37    /// Path to output file (captures stdout/stderr after completion).
38    output_file: Option<PathBuf>,
39    /// Whether to persist completed output to a host temp file. Disabled for
40    /// hermetic / read-only kernels (custom backend, NoLocal) whose output must
41    /// never reach the real filesystem outside the VFS — see
42    /// [`JobManager::set_persist_output_files`]. Stamped from the manager when
43    /// the job is registered. Live output is always available in-memory via the
44    /// VFS streams (`/v/jobs/{id}/stdout`), so suppressing the file loses
45    /// nothing for an in-process consumer.
46    persist_output: bool,
47    /// Live stdout stream (bounded ring buffer).
48    stdout_stream: Option<Arc<BoundedStream>>,
49    /// Live stderr stream (bounded ring buffer).
50    stderr_stream: Option<Arc<BoundedStream>>,
51    /// OS process ID (for stopped jobs).
52    pid: Option<u32>,
53    /// OS process group ID (for stopped jobs).
54    pgid: Option<u32>,
55    /// Whether this job is stopped (SIGTSTP).
56    stopped: bool,
57    /// Cancellation token of the background fork running this job. Cancelling
58    /// it stops the job whether it is an in-process builtin future or wraps
59    /// external children (the cancellation cascade SIGTERM→SIGKILLs their
60    /// process groups). This is how `kill %N` reaches a job that has no OS
61    /// process group of its own (e.g. `sleep &`, a kaish builtin).
62    cancel: Option<tokio_util::sync::CancellationToken>,
63    /// Process groups of external children spawned while running this job.
64    /// Lets `kill -<sig> %N` deliver an arbitrary signal (STOP/CONT/USR1/…)
65    /// straight to the real processes via `killpg`, not just terminate. Empty
66    /// for a pure-builtin job (nothing with a PGID ran).
67    pgids: Vec<u32>,
68}
69
70impl Job {
71    /// Create a new job from a task handle.
72    pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73        Self {
74            id,
75            session_id,
76            command,
77            handle: Some(handle),
78            result_rx: None,
79            result: None,
80            output_file: None,
81            persist_output: true,
82            stdout_stream: None,
83            stderr_stream: None,
84            pid: None,
85            pgid: None,
86            stopped: false,
87            cancel: None,
88            pgids: Vec::new(),
89        }
90    }
91
92    /// Create a new job from a result channel.
93    pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94        Self {
95            id,
96            session_id,
97            command,
98            handle: None,
99            result_rx: Some(rx),
100            result: None,
101            output_file: None,
102            persist_output: true,
103            stdout_stream: None,
104            stderr_stream: None,
105            pid: None,
106            pgid: None,
107            stopped: false,
108            cancel: None,
109            pgids: Vec::new(),
110        }
111    }
112
113    /// Create a new job with attached output streams.
114    ///
115    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
116    pub fn with_streams(
117        id: JobId,
118        session_id: u64,
119        command: String,
120        rx: oneshot::Receiver<ExecResult>,
121        stdout: Arc<BoundedStream>,
122        stderr: Arc<BoundedStream>,
123    ) -> Self {
124        Self {
125            id,
126            session_id,
127            command,
128            handle: None,
129            result_rx: Some(rx),
130            result: None,
131            output_file: None,
132            persist_output: true,
133            stdout_stream: Some(stdout),
134            stderr_stream: Some(stderr),
135            pid: None,
136            pgid: None,
137            stopped: false,
138            cancel: None,
139            pgids: Vec::new(),
140        }
141    }
142
143    /// Create a stopped job (from Ctrl-Z on a foreground process).
144    pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145        Self {
146            id,
147            session_id,
148            command,
149            handle: None,
150            result_rx: None,
151            result: None,
152            output_file: None,
153            persist_output: true,
154            stdout_stream: None,
155            stderr_stream: None,
156            pid: Some(pid),
157            pgid: Some(pgid),
158            stopped: true,
159            cancel: None,
160            pgids: Vec::new(),
161        }
162    }
163
164    /// Get the output file path (if available).
165    pub fn output_file(&self) -> Option<&PathBuf> {
166        self.output_file.as_ref()
167    }
168
169    /// Check if the job has completed.
170    ///
171    /// Stopped jobs are not considered done.
172    pub fn is_done(&mut self) -> bool {
173        if self.stopped {
174            return false;
175        }
176        self.try_poll();
177        self.result.is_some()
178    }
179
180    /// Get the job's status.
181    pub fn status(&mut self) -> JobStatus {
182        if self.stopped {
183            return JobStatus::Stopped;
184        }
185        self.try_poll();
186        match &self.result {
187            Some(r) if r.ok() => JobStatus::Done,
188            Some(_) => JobStatus::Failed,
189            None => JobStatus::Running,
190        }
191    }
192
193    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
194    ///
195    /// Returns:
196    /// - `"running"` if the job is still running
197    /// - `"done:0"` if the job completed successfully
198    /// - `"failed:{code}"` if the job failed with an exit code
199    pub fn status_string(&mut self) -> String {
200        self.try_poll();
201        match &self.result {
202            Some(r) if r.ok() => "done:0".to_string(),
203            Some(r) => format!("failed:{}", r.code),
204            None => "running".to_string(),
205        }
206    }
207
208    /// Get the stdout stream (if attached).
209    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210        self.stdout_stream.as_ref()
211    }
212
213    /// Get the stderr stream (if attached).
214    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215        self.stderr_stream.as_ref()
216    }
217
218    /// Wait for the job to complete and return its result.
219    ///
220    /// On completion, the job's output is written to a temp file for later retrieval.
221    pub async fn wait(&mut self) -> ExecResult {
222        if let Some(result) = self.result.take() {
223            self.result = Some(result.clone());
224            return result;
225        }
226
227        let result = if let Some(handle) = self.handle.take() {
228            match handle.await {
229                Ok(r) => r,
230                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231            }
232        } else if let Some(rx) = self.result_rx.take() {
233            match rx.await {
234                Ok(r) => r,
235                Err(_) => ExecResult::failure(1, "job channel closed"),
236            }
237        } else {
238            // Already waited
239            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240        };
241
242        // Write output to a host temp file for later retrieval — but only when
243        // persistence is enabled. A hermetic / read-only kernel disables it so
244        // job output never bypasses the VFS onto the real filesystem.
245        if self.persist_output
246            && self.output_file.is_none()
247            && let Some(path) = self.write_output_file(&result) {
248                self.output_file = Some(path);
249            }
250
251        self.result = Some(result.clone());
252        result
253    }
254
255    /// Write job output to a temp file.
256    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257        // This is a human-readable text log; a binary stdout is noted, not
258        // dumped (lossy-decoding it would corrupt; raw bytes would garble the
259        // log). Only its size is recorded.
260        let is_bytes = result.is_bytes();
261        let text = if is_bytes {
262            std::borrow::Cow::Borrowed("")
263        } else {
264            result.text_out()
265        };
266        if !is_bytes && text.is_empty() && result.err.is_empty() {
267            return None;
268        }
269
270        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
271        if std::fs::create_dir_all(&tmp_dir).is_err() {
272            tracing::warn!("Failed to create job output directory");
273            return None;
274        }
275
276        // Include the OS pid: `session_id` is only unique *within* a process
277        // (it's a process-local atomic that restarts at 0), so two kaish
278        // processes on one host — or two `cargo test` binaries — would
279        // otherwise both write `session_0_job_1.txt` into this shared dir and
280        // clobber each other (a real cross-process collision, and the source
281        // of the `test_cleanup_removes_temp_files` flake). pid + session_id +
282        // job id is unique across processes. Mirrors `output_limit`'s spill
283        // filename convention.
284        let filename = format!(
285            "session_{}_job_{}.{}.txt",
286            self.session_id,
287            self.id.0,
288            std::process::id()
289        );
290        let path = tmp_dir.join(filename);
291
292        let mut content = String::new();
293        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
294        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
295
296        if is_bytes {
297            let n = result.out_bytes().map(|b| b.len()).unwrap_or(0);
298            content.push_str(&format!(
299                "## STDOUT\n[binary output: {n} bytes — omitted from this text log]\n"
300            ));
301        } else if !text.is_empty() {
302            content.push_str("## STDOUT\n");
303            content.push_str(&text);
304            if !text.ends_with('\n') {
305                content.push('\n');
306            }
307        }
308
309        if !result.err.is_empty() {
310            content.push_str("\n## STDERR\n");
311            content.push_str(&result.err);
312            if !result.err.ends_with('\n') {
313                content.push('\n');
314            }
315        }
316
317        match std::fs::write(&path, content) {
318            Ok(()) => Some(path),
319            Err(e) => {
320                tracing::warn!("Failed to write job output file: {}", e);
321                None
322            }
323        }
324    }
325
326    /// Remove any temp files associated with this job.
327    pub fn cleanup_files(&mut self) {
328        if let Some(path) = self.output_file.take() {
329            if let Err(e) = std::fs::remove_file(&path) {
330                // Ignore "not found" — file may not have been written
331                if e.kind() != io::ErrorKind::NotFound {
332                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
333                }
334            }
335        }
336    }
337
338    /// Get the result if completed, without waiting.
339    pub fn try_result(&self) -> Option<&ExecResult> {
340        self.result.as_ref()
341    }
342
343    /// Try to poll the result channel and update status.
344    ///
345    /// This is a non-blocking check that updates `self.result` if the
346    /// job has completed. Returns true if the job is now done.
347    pub fn try_poll(&mut self) -> bool {
348        if self.result.is_some() {
349            return true;
350        }
351
352        // Try to poll the oneshot channel
353        if let Some(rx) = self.result_rx.as_mut() {
354            match rx.try_recv() {
355                Ok(result) => {
356                    self.result = Some(result);
357                    self.result_rx = None;
358                    return true;
359                }
360                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
361                    // Still running
362                    return false;
363                }
364                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
365                    // Channel closed without result - job failed
366                    self.result = Some(ExecResult::failure(1, "job channel closed"));
367                    self.result_rx = None;
368                    return true;
369                }
370            }
371        }
372
373        // Check if handle is finished
374        if let Some(handle) = self.handle.as_mut()
375            && handle.is_finished() {
376                // Take the handle and wait for it (should be instant)
377                let Some(mut handle) = self.handle.take() else {
378                    return false;
379                };
380                // Poll directly with a noop waker — safe because is_finished() was true
381                let waker = std::task::Waker::noop();
382                let mut cx = std::task::Context::from_waker(waker);
383                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
384                    std::task::Poll::Ready(Ok(r)) => r,
385                    std::task::Poll::Ready(Err(e)) => {
386                        ExecResult::failure(1, format!("job panicked: {}", e))
387                    }
388                    std::task::Poll::Pending => return false, // shouldn't happen
389                };
390                self.result = Some(result);
391                return true;
392            }
393
394        false
395    }
396}
397
/// Source of per-`JobManager` session IDs, unique within this process.
///
/// Every manager numbers its jobs from 1, so two managers living in one
/// process (concurrent tests, forks) would otherwise produce identical output
/// file names; the session ID tells them apart. The counter starts over at 0
/// in each process, so output file names also embed the OS pid to remain
/// unique across processes (see `write_output_file`).
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
404
405/// Manager for background jobs.
406pub struct JobManager {
407    /// Process-unique ID for this manager, mixed into job output file paths.
408    session_id: u64,
409    /// Counter for generating unique job IDs.
410    next_id: AtomicU64,
411    /// Map of job ID to job.
412    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
413    /// Whether completed jobs persist their output to a host temp file. On by
414    /// default; a hermetic / read-only kernel disables it so output never
415    /// bypasses the VFS onto the real filesystem (see
416    /// [`set_persist_output_files`](Self::set_persist_output_files)). Stamped
417    /// onto each [`Job`] at registration.
418    persist_output_files: std::sync::atomic::AtomicBool,
419}
420
421impl JobManager {
422    /// Create a new job manager.
423    pub fn new() -> Self {
424        Self {
425            session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
426            next_id: AtomicU64::new(1),
427            jobs: Arc::new(Mutex::new(HashMap::new())),
428            persist_output_files: std::sync::atomic::AtomicBool::new(true),
429        }
430    }
431
432    /// Toggle whether completed jobs persist their output to a host temp file.
433    ///
434    /// Disable this for a hermetic / read-only kernel: the host write in
435    /// [`Job::write_output_file`] uses `std::fs` directly and so bypasses the
436    /// VFS (and any read-only mount). Live output stays available in-memory via
437    /// the VFS streams (`/v/jobs/{id}/stdout`), so nothing is lost in-process.
438    ///
439    /// Must be set before jobs are spawned — the flag is stamped onto each job
440    /// at registration time, not consulted at completion.
441    pub fn set_persist_output_files(&self, on: bool) {
442        self.persist_output_files.store(on, Ordering::Relaxed);
443    }
444
445    /// Whether completed jobs persist their output to a host temp file.
446    pub fn persist_output_files(&self) -> bool {
447        self.persist_output_files.load(Ordering::Relaxed)
448    }
449
450    /// Spawn a new background job from a future.
451    ///
452    /// The job is inserted into the map synchronously before returning,
453    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
454    pub fn spawn<F>(&self, command: String, future: F) -> JobId
455    where
456        F: std::future::Future<Output = ExecResult> + Send + 'static,
457    {
458        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
459        // Propagate the embedder's trace context across the spawn boundary so
460        // background-job spans stay in the same trace (see telemetry module).
461        let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
462        let mut job = Job::new(id, self.session_id, command, handle);
463        job.persist_output = self.persist_output_files();
464
465        // Spin on try_lock to guarantee the job is in the map on return.
466        // The lock is tokio::sync::Mutex which is only held briefly during
467        // HashMap operations, so contention resolves quickly.
468        let mut job_opt = Some(job);
469        loop {
470            match self.jobs.try_lock() {
471                Ok(mut guard) => {
472                    if let Some(j) = job_opt.take() {
473                        guard.insert(id, j);
474                    }
475                    break;
476                }
477                Err(_) => {
478                    std::hint::spin_loop();
479                }
480            }
481        }
482
483        id
484    }
485
486    /// Spawn a job that's already running and communicate via channel.
487    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
488        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
489        let mut job = Job::from_channel(id, self.session_id, command, rx);
490        job.persist_output = self.persist_output_files();
491
492        let mut jobs = self.jobs.lock().await;
493        jobs.insert(id, job);
494
495        id
496    }
497
498    /// Register a job with attached output streams.
499    ///
500    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
501    pub async fn register_with_streams(
502        &self,
503        command: String,
504        rx: oneshot::Receiver<ExecResult>,
505        stdout: Arc<BoundedStream>,
506        stderr: Arc<BoundedStream>,
507    ) -> JobId {
508        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
509        let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
510        job.persist_output = self.persist_output_files();
511
512        let mut jobs = self.jobs.lock().await;
513        jobs.insert(id, job);
514
515        id
516    }
517
518    /// Wait for a specific job to complete.
519    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
520        let mut jobs = self.jobs.lock().await;
521        if let Some(job) = jobs.get_mut(&id) {
522            Some(job.wait().await)
523        } else {
524            None
525        }
526    }
527
528    /// Wait for all jobs to complete, returning results in completion order.
529    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
530        let mut results = Vec::new();
531
532        // Get all job IDs
533        let ids: Vec<JobId> = {
534            let jobs = self.jobs.lock().await;
535            jobs.keys().copied().collect()
536        };
537
538        for id in ids {
539            if let Some(result) = self.wait(id).await {
540                results.push((id, result));
541            }
542        }
543
544        results
545    }
546
547    /// List all jobs with their status.
548    pub async fn list(&self) -> Vec<JobInfo> {
549        let mut jobs = self.jobs.lock().await;
550        jobs.values_mut()
551            .map(|job| JobInfo {
552                id: job.id,
553                command: job.command.clone(),
554                status: job.status(),
555                output_file: job.output_file.clone(),
556                pid: job.pid,
557            })
558            .collect()
559    }
560
561    /// Get the number of running jobs.
562    pub async fn running_count(&self) -> usize {
563        let mut jobs = self.jobs.lock().await;
564        let mut count = 0;
565        for job in jobs.values_mut() {
566            if !job.is_done() {
567                count += 1;
568            }
569        }
570        count
571    }
572
573    /// Remove completed jobs from tracking and clean up their temp files.
574    pub async fn cleanup(&self) {
575        let mut jobs = self.jobs.lock().await;
576        jobs.retain(|_, job| {
577            if job.is_done() {
578                job.cleanup_files();
579                false
580            } else {
581                true
582            }
583        });
584    }
585
586    /// Check if a specific job exists.
587    pub async fn exists(&self, id: JobId) -> bool {
588        let jobs = self.jobs.lock().await;
589        jobs.contains_key(&id)
590    }
591
592    /// Get info for a specific job.
593    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
594        let mut jobs = self.jobs.lock().await;
595        jobs.get_mut(&id).map(|job| JobInfo {
596            id: job.id,
597            command: job.command.clone(),
598            status: job.status(),
599            output_file: job.output_file.clone(),
600            pid: job.pid,
601        })
602    }
603
604    /// Get the command string for a job.
605    pub async fn get_command(&self, id: JobId) -> Option<String> {
606        let jobs = self.jobs.lock().await;
607        jobs.get(&id).map(|job| job.command.clone())
608    }
609
610    /// Get the status string for a job (for /v/jobs/{id}/status).
611    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
612        let mut jobs = self.jobs.lock().await;
613        jobs.get_mut(&id).map(|job| job.status_string())
614    }
615
616    /// Read stdout stream content for a job.
617    ///
618    /// Returns `None` if the job doesn't exist or has no attached stream.
619    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
620        let jobs = self.jobs.lock().await;
621        if let Some(job) = jobs.get(&id)
622            && let Some(stream) = job.stdout_stream() {
623                return Some(stream.read().await);
624            }
625        None
626    }
627
628    /// Read stderr stream content for a job.
629    ///
630    /// Returns `None` if the job doesn't exist or has no attached stream.
631    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
632        let jobs = self.jobs.lock().await;
633        if let Some(job) = jobs.get(&id)
634            && let Some(stream) = job.stderr_stream() {
635                return Some(stream.read().await);
636            }
637        None
638    }
639
640    /// List all job IDs.
641    pub async fn list_ids(&self) -> Vec<JobId> {
642        let jobs = self.jobs.lock().await;
643        jobs.keys().copied().collect()
644    }
645
646    /// Register a stopped job (from Ctrl-Z on a foreground process).
647    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
648        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
649        let job = Job::stopped(id, self.session_id, command, pid, pgid);
650        let mut jobs = self.jobs.lock().await;
651        jobs.insert(id, job);
652        id
653    }
654
655    /// Mark a job as stopped with its process info.
656    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
657        let mut jobs = self.jobs.lock().await;
658        if let Some(job) = jobs.get_mut(&id) {
659            job.stopped = true;
660            job.pid = Some(pid);
661            job.pgid = Some(pgid);
662        }
663    }
664
665    /// Mark a stopped job as resumed.
666    pub async fn resume_job(&self, id: JobId) {
667        let mut jobs = self.jobs.lock().await;
668        if let Some(job) = jobs.get_mut(&id) {
669            job.stopped = false;
670        }
671    }
672
673    /// Get the most recently stopped job.
674    pub async fn last_stopped(&self) -> Option<JobId> {
675        let mut jobs = self.jobs.lock().await;
676        // Find the highest-numbered stopped job
677        let mut best: Option<JobId> = None;
678        for job in jobs.values_mut() {
679            if job.stopped {
680                match best {
681                    None => best = Some(job.id),
682                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
683                    _ => {}
684                }
685            }
686        }
687        best
688    }
689
690    /// Get process info (pid, pgid) for a job.
691    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
692        let jobs = self.jobs.lock().await;
693        jobs.get(&id).and_then(|job| {
694            match (job.pid, job.pgid) {
695                (Some(pid), Some(pgid)) => Some((pid, pgid)),
696                _ => None,
697            }
698        })
699    }
700
701    /// Record the cancellation token of the fork running a background job, so
702    /// `kill %N` can stop the job even when it has no OS process group of its
703    /// own (e.g. a pure builtin like `sleep &`).
704    pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
705        let mut jobs = self.jobs.lock().await;
706        if let Some(job) = jobs.get_mut(&id) {
707            job.cancel = Some(token);
708        }
709    }
710
711    /// Cancel a job by its token. Returns `true` if a token was recorded and
712    /// cancelled. The cancellation cascade stops in-process builtin futures and
713    /// SIGTERM→SIGKILLs any external children's process groups.
714    pub async fn cancel(&self, id: JobId) -> bool {
715        let jobs = self.jobs.lock().await;
716        match jobs.get(&id).and_then(|job| job.cancel.clone()) {
717            Some(token) => {
718                token.cancel();
719                true
720            }
721            None => false,
722        }
723    }
724
725    /// Record a process group spawned while running a background job. Lets
726    /// `kill -<sig> %N` deliver an arbitrary signal directly to the real
727    /// processes. Deduplicated (a job may spawn several externals).
728    pub async fn add_pgid(&self, id: JobId, pgid: u32) {
729        let mut jobs = self.jobs.lock().await;
730        if let Some(job) = jobs.get_mut(&id) {
731            if !job.pgids.contains(&pgid) {
732                job.pgids.push(pgid);
733            }
734        }
735    }
736
737    /// The process groups recorded for a job (empty for a pure-builtin job).
738    /// Includes the legacy single `pgid` recorded for *stopped* jobs (Ctrl-Z),
739    /// so `kill %N` signals a stopped foreground job's group too.
740    pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
741        let jobs = self.jobs.lock().await;
742        jobs.get(&id)
743            .map(|job| {
744                let mut v = job.pgids.clone();
745                if let Some(pg) = job.pgid {
746                    if !v.contains(&pg) {
747                        v.push(pg);
748                    }
749                }
750                v
751            })
752            .unwrap_or_default()
753    }
754
755    /// Remove a job from tracking.
756    pub async fn remove(&self, id: JobId) {
757        let mut jobs = self.jobs.lock().await;
758        if let Some(mut job) = jobs.remove(&id) {
759            job.cleanup_files();
760        }
761    }
762}
763
764impl Default for JobManager {
765    fn default() -> Self {
766        Self::new()
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use std::time::Duration;
774
775    #[tokio::test]
776    async fn test_no_host_output_file_when_persistence_disabled() {
777        // A hermetic / read-only kernel (custom backend, or NoLocal mode)
778        // disables host output-file persistence so a background job's output
779        // never lands on the real filesystem via `std::fs`, bypassing the VFS.
780        let manager = JobManager::new();
781        assert!(manager.persist_output_files(), "default is to persist");
782        manager.set_persist_output_files(false);
783        assert!(!manager.persist_output_files());
784
785        let id = manager.spawn("leaky".to_string(), async {
786            ExecResult::success("output that must not hit host disk")
787        });
788        tokio::time::sleep(Duration::from_millis(10)).await;
789        let result = manager.wait(id).await;
790        assert!(result.is_some());
791
792        // No temp file should have been written to the host filesystem.
793        let output_file = {
794            let jobs = manager.jobs.lock().await;
795            jobs.get(&id).and_then(|j| j.output_file().cloned())
796        };
797        assert!(
798            output_file.is_none(),
799            "no host output file should be written when persistence is disabled, got {output_file:?}"
800        );
801    }
802
803    #[tokio::test]
804    async fn test_spawn_and_wait() {
805        let manager = JobManager::new();
806
807        let id = manager.spawn("test".to_string(), async {
808            tokio::time::sleep(Duration::from_millis(10)).await;
809            ExecResult::success("done")
810        });
811
812        // Wait a bit for the job to be registered
813        tokio::time::sleep(Duration::from_millis(5)).await;
814
815        let result = manager.wait(id).await;
816        assert!(result.is_some());
817        let result = result.unwrap();
818        assert!(result.ok());
819        assert_eq!(&*result.text_out(), "done");
820    }
821
822    #[tokio::test]
823    async fn test_wait_all() {
824        let manager = JobManager::new();
825
826        manager.spawn("job1".to_string(), async {
827            tokio::time::sleep(Duration::from_millis(10)).await;
828            ExecResult::success("one")
829        });
830
831        manager.spawn("job2".to_string(), async {
832            tokio::time::sleep(Duration::from_millis(5)).await;
833            ExecResult::success("two")
834        });
835
836        // Wait for jobs to register
837        tokio::time::sleep(Duration::from_millis(5)).await;
838
839        let results = manager.wait_all().await;
840        assert_eq!(results.len(), 2);
841    }
842
843    #[tokio::test]
844    async fn test_list_jobs() {
845        let manager = JobManager::new();
846
847        manager.spawn("test job".to_string(), async {
848            tokio::time::sleep(Duration::from_millis(50)).await;
849            ExecResult::success("")
850        });
851
852        // Wait for job to register
853        tokio::time::sleep(Duration::from_millis(5)).await;
854
855        let jobs = manager.list().await;
856        assert_eq!(jobs.len(), 1);
857        assert_eq!(jobs[0].command, "test job");
858        assert_eq!(jobs[0].status, JobStatus::Running);
859    }
860
861    #[tokio::test]
862    async fn test_job_status_after_completion() {
863        let manager = JobManager::new();
864
865        let id = manager.spawn("quick".to_string(), async {
866            ExecResult::success("")
867        });
868
869        // Wait for job to complete
870        tokio::time::sleep(Duration::from_millis(10)).await;
871        let _ = manager.wait(id).await;
872
873        let info = manager.get(id).await;
874        assert!(info.is_some());
875        assert_eq!(info.unwrap().status, JobStatus::Done);
876    }
877
878    #[tokio::test]
879    async fn test_cleanup() {
880        let manager = JobManager::new();
881
882        let id = manager.spawn("done".to_string(), async {
883            ExecResult::success("")
884        });
885
886        // Wait for completion
887        tokio::time::sleep(Duration::from_millis(10)).await;
888        let _ = manager.wait(id).await;
889
890        // Should have 1 job
891        assert_eq!(manager.list().await.len(), 1);
892
893        // Cleanup
894        manager.cleanup().await;
895
896        // Should have 0 jobs
897        assert_eq!(manager.list().await.len(), 0);
898    }
899
900    #[tokio::test]
901    async fn test_cleanup_removes_temp_files() {
902        // Bug K: cleanup should remove temp files
903        let manager = JobManager::new();
904
905        let id = manager.spawn("output job".to_string(), async {
906            ExecResult::success("some output that gets written to a temp file")
907        });
908
909        // Wait for completion (triggers output file creation)
910        tokio::time::sleep(Duration::from_millis(10)).await;
911        let result = manager.wait(id).await;
912        assert!(result.is_some());
913
914        // Get the output file path before cleanup. The job produced output, so
915        // a temp file must have been written — otherwise this test would pass
916        // vacuously.
917        let output_file = {
918            let jobs = manager.jobs.lock().await;
919            jobs.get(&id).and_then(|j| j.output_file().cloned())
920        };
921        let path = output_file.expect("job with output should have written a temp file");
922        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());
923
924        // Cleanup should remove the job and its files.
925        manager.cleanup().await;
926
927        assert!(
928            !path.exists(),
929            "temp file should be removed after cleanup: {}",
930            path.display()
931        );
932    }
933
934    #[tokio::test]
935    async fn test_register_with_channel() {
936        let manager = JobManager::new();
937        let (tx, rx) = oneshot::channel();
938
939        let id = manager.register("channel job".to_string(), rx).await;
940
941        // Send result
942        tx.send(ExecResult::success("from channel")).unwrap();
943
944        let result = manager.wait(id).await;
945        assert!(result.is_some());
946        assert_eq!(&*result.unwrap().text_out(), "from channel");
947    }
948
949    #[tokio::test]
950    async fn test_spawn_immediately_available() {
951        // Bug J: job should be queryable immediately after spawn()
952        let manager = JobManager::new();
953
954        let id = manager.spawn("instant".to_string(), async {
955            tokio::time::sleep(Duration::from_millis(100)).await;
956            ExecResult::success("done")
957        });
958
959        // Should be immediately visible without any sleep
960        let exists = manager.exists(id).await;
961        assert!(exists, "job should be immediately available after spawn()");
962
963        let info = manager.get(id).await;
964        assert!(info.is_some(), "job info should be available immediately");
965    }
966
967    #[tokio::test]
968    async fn test_nonexistent_job() {
969        let manager = JobManager::new();
970        let result = manager.wait(JobId(999)).await;
971        assert!(result.is_none());
972    }
973
974    #[tokio::test]
975    async fn test_cancel_token_fires() {
976        // A recorded cancel token can be tripped by id — this is how `kill %N`
977        // stops a pure-builtin job that has no OS process group.
978        let manager = JobManager::new();
979        let token = tokio_util::sync::CancellationToken::new();
980        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
981        manager.set_cancel_token(id, token.clone()).await;
982
983        assert!(!token.is_cancelled());
984        assert!(manager.cancel(id).await, "cancel should report success");
985        assert!(token.is_cancelled(), "the job's token must be tripped");
986    }
987
988    #[tokio::test]
989    async fn test_cancel_without_token_returns_false() {
990        let manager = JobManager::new();
991        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
992        // No token recorded → nothing to cancel.
993        assert!(!manager.cancel(id).await);
994        // Unknown id → also false.
995        assert!(!manager.cancel(JobId(999)).await);
996    }
997
998    #[tokio::test]
999    async fn test_pgids_recorded_and_deduped() {
1000        let manager = JobManager::new();
1001        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
1002        assert!(manager.job_pgids(id).await.is_empty());
1003
1004        manager.add_pgid(id, 4242).await;
1005        manager.add_pgid(id, 4243).await;
1006        manager.add_pgid(id, 4242).await; // duplicate ignored
1007        assert_eq!(manager.job_pgids(id).await, vec![4242, 4243]);
1008
1009        // Unknown id → empty, no panic.
1010        assert!(manager.job_pgids(JobId(999)).await.is_empty());
1011    }
1012}