Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21/// A background job.
22pub struct Job {
23    /// Job ID.
24    pub id: JobId,
25    /// Owning manager's session ID — disambiguates output file paths between
26    /// JobManager instances that share the process (and thus the same job ID
27    /// space, since IDs restart at 1 per manager).
28    session_id: u64,
29    /// Command description.
30    pub command: String,
31    /// Task handle (None if already awaited).
32    handle: Option<JoinHandle<ExecResult>>,
33    /// Channel to receive result (alternative to handle).
34    result_rx: Option<oneshot::Receiver<ExecResult>>,
35    /// Cached result after completion.
36    result: Option<ExecResult>,
37    /// Path to output file (captures stdout/stderr after completion).
38    output_file: Option<PathBuf>,
39    /// Whether to persist completed output to a host temp file. Disabled for
40    /// hermetic / read-only kernels (custom backend, NoLocal) whose output must
41    /// never reach the real filesystem outside the VFS — see
42    /// [`JobManager::set_persist_output_files`]. Stamped from the manager when
43    /// the job is registered. Live output is always available in-memory via the
44    /// VFS streams (`/v/jobs/{id}/stdout`), so suppressing the file loses
45    /// nothing for an in-process consumer.
46    persist_output: bool,
47    /// Live stdout stream (bounded ring buffer).
48    stdout_stream: Option<Arc<BoundedStream>>,
49    /// Live stderr stream (bounded ring buffer).
50    stderr_stream: Option<Arc<BoundedStream>>,
51    /// OS process ID (for stopped jobs).
52    pid: Option<u32>,
53    /// OS process group ID (for stopped jobs).
54    pgid: Option<u32>,
55    /// Whether this job is stopped (SIGTSTP).
56    stopped: bool,
57    /// Cancellation token of the background fork running this job. Cancelling
58    /// it stops the job whether it is an in-process builtin future or wraps
59    /// external children (the cancellation cascade SIGTERM→SIGKILLs their
60    /// process groups). This is how `kill %N` reaches a job that has no OS
61    /// process group of its own (e.g. `sleep &`, a kaish builtin).
62    cancel: Option<tokio_util::sync::CancellationToken>,
63    /// Process groups of external children spawned while running this job.
64    /// Lets `kill -<sig> %N` deliver an arbitrary signal (STOP/CONT/USR1/…)
65    /// straight to the real processes via `killpg`, not just terminate. Empty
66    /// for a pure-builtin job (nothing with a PGID ran).
67    pgids: Vec<u32>,
68}
69
70impl Job {
71    /// Create a new job from a task handle.
72    pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73        Self {
74            id,
75            session_id,
76            command,
77            handle: Some(handle),
78            result_rx: None,
79            result: None,
80            output_file: None,
81            persist_output: true,
82            stdout_stream: None,
83            stderr_stream: None,
84            pid: None,
85            pgid: None,
86            stopped: false,
87            cancel: None,
88            pgids: Vec::new(),
89        }
90    }
91
92    /// Create a new job from a result channel.
93    pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94        Self {
95            id,
96            session_id,
97            command,
98            handle: None,
99            result_rx: Some(rx),
100            result: None,
101            output_file: None,
102            persist_output: true,
103            stdout_stream: None,
104            stderr_stream: None,
105            pid: None,
106            pgid: None,
107            stopped: false,
108            cancel: None,
109            pgids: Vec::new(),
110        }
111    }
112
113    /// Create a new job with attached output streams.
114    ///
115    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
116    pub fn with_streams(
117        id: JobId,
118        session_id: u64,
119        command: String,
120        rx: oneshot::Receiver<ExecResult>,
121        stdout: Arc<BoundedStream>,
122        stderr: Arc<BoundedStream>,
123    ) -> Self {
124        Self {
125            id,
126            session_id,
127            command,
128            handle: None,
129            result_rx: Some(rx),
130            result: None,
131            output_file: None,
132            persist_output: true,
133            stdout_stream: Some(stdout),
134            stderr_stream: Some(stderr),
135            pid: None,
136            pgid: None,
137            stopped: false,
138            cancel: None,
139            pgids: Vec::new(),
140        }
141    }
142
143    /// Create a stopped job (from Ctrl-Z on a foreground process).
144    pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145        Self {
146            id,
147            session_id,
148            command,
149            handle: None,
150            result_rx: None,
151            result: None,
152            output_file: None,
153            persist_output: true,
154            stdout_stream: None,
155            stderr_stream: None,
156            pid: Some(pid),
157            pgid: Some(pgid),
158            stopped: true,
159            cancel: None,
160            pgids: Vec::new(),
161        }
162    }
163
164    /// Get the output file path (if available).
165    pub fn output_file(&self) -> Option<&PathBuf> {
166        self.output_file.as_ref()
167    }
168
169    /// Check if the job has completed.
170    ///
171    /// Stopped jobs are not considered done.
172    pub fn is_done(&mut self) -> bool {
173        if self.stopped {
174            return false;
175        }
176        self.try_poll();
177        self.result.is_some()
178    }
179
180    /// Get the job's status.
181    pub fn status(&mut self) -> JobStatus {
182        if self.stopped {
183            return JobStatus::Stopped;
184        }
185        self.try_poll();
186        match &self.result {
187            Some(r) if r.ok() => JobStatus::Done,
188            Some(_) => JobStatus::Failed,
189            None => JobStatus::Running,
190        }
191    }
192
193    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
194    ///
195    /// Returns:
196    /// - `"running"` if the job is still running
197    /// - `"done:0"` if the job completed successfully
198    /// - `"failed:{code}"` if the job failed with an exit code
199    pub fn status_string(&mut self) -> String {
200        self.try_poll();
201        match &self.result {
202            Some(r) if r.ok() => "done:0".to_string(),
203            Some(r) => format!("failed:{}", r.code),
204            None => "running".to_string(),
205        }
206    }
207
208    /// Get the stdout stream (if attached).
209    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210        self.stdout_stream.as_ref()
211    }
212
213    /// Get the stderr stream (if attached).
214    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215        self.stderr_stream.as_ref()
216    }
217
218    /// Wait for the job to complete and return its result.
219    ///
220    /// On completion, the job's output is written to a temp file for later retrieval.
221    pub async fn wait(&mut self) -> ExecResult {
222        if let Some(result) = self.result.take() {
223            self.result = Some(result.clone());
224            return result;
225        }
226
227        let result = if let Some(handle) = self.handle.take() {
228            match handle.await {
229                Ok(r) => r,
230                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231            }
232        } else if let Some(rx) = self.result_rx.take() {
233            match rx.await {
234                Ok(r) => r,
235                Err(_) => ExecResult::failure(1, "job channel closed"),
236            }
237        } else {
238            // Already waited
239            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240        };
241
242        // Write output to a host temp file for later retrieval — but only when
243        // persistence is enabled. A hermetic / read-only kernel disables it so
244        // job output never bypasses the VFS onto the real filesystem.
245        if self.persist_output
246            && self.output_file.is_none()
247            && let Some(path) = self.write_output_file(&result) {
248                self.output_file = Some(path);
249            }
250
251        self.result = Some(result.clone());
252        result
253    }
254
255    /// Write job output to a temp file.
256    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257        // Only write if there's output to capture
258        let text = result.text_out();
259        if text.is_empty() && result.err.is_empty() {
260            return None;
261        }
262
263        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
264        if std::fs::create_dir_all(&tmp_dir).is_err() {
265            tracing::warn!("Failed to create job output directory");
266            return None;
267        }
268
269        // Include the OS pid: `session_id` is only unique *within* a process
270        // (it's a process-local atomic that restarts at 0), so two kaish
271        // processes on one host — or two `cargo test` binaries — would
272        // otherwise both write `session_0_job_1.txt` into this shared dir and
273        // clobber each other (a real cross-process collision, and the source
274        // of the `test_cleanup_removes_temp_files` flake). pid + session_id +
275        // job id is unique across processes. Mirrors `output_limit`'s spill
276        // filename convention.
277        let filename = format!(
278            "session_{}_job_{}.{}.txt",
279            self.session_id,
280            self.id.0,
281            std::process::id()
282        );
283        let path = tmp_dir.join(filename);
284
285        let mut content = String::new();
286        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
287        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
288
289        if !text.is_empty() {
290            content.push_str("## STDOUT\n");
291            content.push_str(&text);
292            if !text.ends_with('\n') {
293                content.push('\n');
294            }
295        }
296
297        if !result.err.is_empty() {
298            content.push_str("\n## STDERR\n");
299            content.push_str(&result.err);
300            if !result.err.ends_with('\n') {
301                content.push('\n');
302            }
303        }
304
305        match std::fs::write(&path, content) {
306            Ok(()) => Some(path),
307            Err(e) => {
308                tracing::warn!("Failed to write job output file: {}", e);
309                None
310            }
311        }
312    }
313
314    /// Remove any temp files associated with this job.
315    pub fn cleanup_files(&mut self) {
316        if let Some(path) = self.output_file.take() {
317            if let Err(e) = std::fs::remove_file(&path) {
318                // Ignore "not found" — file may not have been written
319                if e.kind() != io::ErrorKind::NotFound {
320                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
321                }
322            }
323        }
324    }
325
326    /// Get the result if completed, without waiting.
327    pub fn try_result(&self) -> Option<&ExecResult> {
328        self.result.as_ref()
329    }
330
331    /// Try to poll the result channel and update status.
332    ///
333    /// This is a non-blocking check that updates `self.result` if the
334    /// job has completed. Returns true if the job is now done.
335    pub fn try_poll(&mut self) -> bool {
336        if self.result.is_some() {
337            return true;
338        }
339
340        // Try to poll the oneshot channel
341        if let Some(rx) = self.result_rx.as_mut() {
342            match rx.try_recv() {
343                Ok(result) => {
344                    self.result = Some(result);
345                    self.result_rx = None;
346                    return true;
347                }
348                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
349                    // Still running
350                    return false;
351                }
352                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
353                    // Channel closed without result - job failed
354                    self.result = Some(ExecResult::failure(1, "job channel closed"));
355                    self.result_rx = None;
356                    return true;
357                }
358            }
359        }
360
361        // Check if handle is finished
362        if let Some(handle) = self.handle.as_mut()
363            && handle.is_finished() {
364                // Take the handle and wait for it (should be instant)
365                let Some(mut handle) = self.handle.take() else {
366                    return false;
367                };
368                // Poll directly with a noop waker — safe because is_finished() was true
369                let waker = std::task::Waker::noop();
370                let mut cx = std::task::Context::from_waker(waker);
371                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
372                    std::task::Poll::Ready(Ok(r)) => r,
373                    std::task::Poll::Ready(Err(e)) => {
374                        ExecResult::failure(1, format!("job panicked: {}", e))
375                    }
376                    std::task::Poll::Pending => return false, // shouldn't happen
377                };
378                self.result = Some(result);
379                return true;
380            }
381
382        false
383    }
384}
385
/// Source of per-manager session IDs: each `JobManager::new` takes the next
/// value, starting from 0.
///
/// Because job IDs start over at 1 in every manager, the session ID is the
/// part of an output-file name that separates managers living in the same
/// process (parallel tests, forks). The counter itself is process-local and
/// starts at 0 in every process, which is why `write_output_file` also
/// includes the OS pid in the name to stay unique across processes.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
392
393/// Manager for background jobs.
394pub struct JobManager {
395    /// Process-unique ID for this manager, mixed into job output file paths.
396    session_id: u64,
397    /// Counter for generating unique job IDs.
398    next_id: AtomicU64,
399    /// Map of job ID to job.
400    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
401    /// Whether completed jobs persist their output to a host temp file. On by
402    /// default; a hermetic / read-only kernel disables it so output never
403    /// bypasses the VFS onto the real filesystem (see
404    /// [`set_persist_output_files`](Self::set_persist_output_files)). Stamped
405    /// onto each [`Job`] at registration.
406    persist_output_files: std::sync::atomic::AtomicBool,
407}
408
409impl JobManager {
410    /// Create a new job manager.
411    pub fn new() -> Self {
412        Self {
413            session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
414            next_id: AtomicU64::new(1),
415            jobs: Arc::new(Mutex::new(HashMap::new())),
416            persist_output_files: std::sync::atomic::AtomicBool::new(true),
417        }
418    }
419
420    /// Toggle whether completed jobs persist their output to a host temp file.
421    ///
422    /// Disable this for a hermetic / read-only kernel: the host write in
423    /// [`Job::write_output_file`] uses `std::fs` directly and so bypasses the
424    /// VFS (and any read-only mount). Live output stays available in-memory via
425    /// the VFS streams (`/v/jobs/{id}/stdout`), so nothing is lost in-process.
426    ///
427    /// Must be set before jobs are spawned — the flag is stamped onto each job
428    /// at registration time, not consulted at completion.
429    pub fn set_persist_output_files(&self, on: bool) {
430        self.persist_output_files.store(on, Ordering::Relaxed);
431    }
432
433    /// Whether completed jobs persist their output to a host temp file.
434    pub fn persist_output_files(&self) -> bool {
435        self.persist_output_files.load(Ordering::Relaxed)
436    }
437
438    /// Spawn a new background job from a future.
439    ///
440    /// The job is inserted into the map synchronously before returning,
441    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
442    pub fn spawn<F>(&self, command: String, future: F) -> JobId
443    where
444        F: std::future::Future<Output = ExecResult> + Send + 'static,
445    {
446        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
447        // Propagate the embedder's trace context across the spawn boundary so
448        // background-job spans stay in the same trace (see telemetry module).
449        let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
450        let mut job = Job::new(id, self.session_id, command, handle);
451        job.persist_output = self.persist_output_files();
452
453        // Spin on try_lock to guarantee the job is in the map on return.
454        // The lock is tokio::sync::Mutex which is only held briefly during
455        // HashMap operations, so contention resolves quickly.
456        let mut job_opt = Some(job);
457        loop {
458            match self.jobs.try_lock() {
459                Ok(mut guard) => {
460                    if let Some(j) = job_opt.take() {
461                        guard.insert(id, j);
462                    }
463                    break;
464                }
465                Err(_) => {
466                    std::hint::spin_loop();
467                }
468            }
469        }
470
471        id
472    }
473
474    /// Spawn a job that's already running and communicate via channel.
475    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
476        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
477        let mut job = Job::from_channel(id, self.session_id, command, rx);
478        job.persist_output = self.persist_output_files();
479
480        let mut jobs = self.jobs.lock().await;
481        jobs.insert(id, job);
482
483        id
484    }
485
486    /// Register a job with attached output streams.
487    ///
488    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
489    pub async fn register_with_streams(
490        &self,
491        command: String,
492        rx: oneshot::Receiver<ExecResult>,
493        stdout: Arc<BoundedStream>,
494        stderr: Arc<BoundedStream>,
495    ) -> JobId {
496        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
497        let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
498        job.persist_output = self.persist_output_files();
499
500        let mut jobs = self.jobs.lock().await;
501        jobs.insert(id, job);
502
503        id
504    }
505
506    /// Wait for a specific job to complete.
507    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
508        let mut jobs = self.jobs.lock().await;
509        if let Some(job) = jobs.get_mut(&id) {
510            Some(job.wait().await)
511        } else {
512            None
513        }
514    }
515
516    /// Wait for all jobs to complete, returning results in completion order.
517    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
518        let mut results = Vec::new();
519
520        // Get all job IDs
521        let ids: Vec<JobId> = {
522            let jobs = self.jobs.lock().await;
523            jobs.keys().copied().collect()
524        };
525
526        for id in ids {
527            if let Some(result) = self.wait(id).await {
528                results.push((id, result));
529            }
530        }
531
532        results
533    }
534
535    /// List all jobs with their status.
536    pub async fn list(&self) -> Vec<JobInfo> {
537        let mut jobs = self.jobs.lock().await;
538        jobs.values_mut()
539            .map(|job| JobInfo {
540                id: job.id,
541                command: job.command.clone(),
542                status: job.status(),
543                output_file: job.output_file.clone(),
544                pid: job.pid,
545            })
546            .collect()
547    }
548
549    /// Get the number of running jobs.
550    pub async fn running_count(&self) -> usize {
551        let mut jobs = self.jobs.lock().await;
552        let mut count = 0;
553        for job in jobs.values_mut() {
554            if !job.is_done() {
555                count += 1;
556            }
557        }
558        count
559    }
560
561    /// Remove completed jobs from tracking and clean up their temp files.
562    pub async fn cleanup(&self) {
563        let mut jobs = self.jobs.lock().await;
564        jobs.retain(|_, job| {
565            if job.is_done() {
566                job.cleanup_files();
567                false
568            } else {
569                true
570            }
571        });
572    }
573
574    /// Check if a specific job exists.
575    pub async fn exists(&self, id: JobId) -> bool {
576        let jobs = self.jobs.lock().await;
577        jobs.contains_key(&id)
578    }
579
580    /// Get info for a specific job.
581    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
582        let mut jobs = self.jobs.lock().await;
583        jobs.get_mut(&id).map(|job| JobInfo {
584            id: job.id,
585            command: job.command.clone(),
586            status: job.status(),
587            output_file: job.output_file.clone(),
588            pid: job.pid,
589        })
590    }
591
592    /// Get the command string for a job.
593    pub async fn get_command(&self, id: JobId) -> Option<String> {
594        let jobs = self.jobs.lock().await;
595        jobs.get(&id).map(|job| job.command.clone())
596    }
597
598    /// Get the status string for a job (for /v/jobs/{id}/status).
599    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
600        let mut jobs = self.jobs.lock().await;
601        jobs.get_mut(&id).map(|job| job.status_string())
602    }
603
604    /// Read stdout stream content for a job.
605    ///
606    /// Returns `None` if the job doesn't exist or has no attached stream.
607    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
608        let jobs = self.jobs.lock().await;
609        if let Some(job) = jobs.get(&id)
610            && let Some(stream) = job.stdout_stream() {
611                return Some(stream.read().await);
612            }
613        None
614    }
615
616    /// Read stderr stream content for a job.
617    ///
618    /// Returns `None` if the job doesn't exist or has no attached stream.
619    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
620        let jobs = self.jobs.lock().await;
621        if let Some(job) = jobs.get(&id)
622            && let Some(stream) = job.stderr_stream() {
623                return Some(stream.read().await);
624            }
625        None
626    }
627
628    /// List all job IDs.
629    pub async fn list_ids(&self) -> Vec<JobId> {
630        let jobs = self.jobs.lock().await;
631        jobs.keys().copied().collect()
632    }
633
634    /// Register a stopped job (from Ctrl-Z on a foreground process).
635    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
636        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
637        let job = Job::stopped(id, self.session_id, command, pid, pgid);
638        let mut jobs = self.jobs.lock().await;
639        jobs.insert(id, job);
640        id
641    }
642
643    /// Mark a job as stopped with its process info.
644    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
645        let mut jobs = self.jobs.lock().await;
646        if let Some(job) = jobs.get_mut(&id) {
647            job.stopped = true;
648            job.pid = Some(pid);
649            job.pgid = Some(pgid);
650        }
651    }
652
653    /// Mark a stopped job as resumed.
654    pub async fn resume_job(&self, id: JobId) {
655        let mut jobs = self.jobs.lock().await;
656        if let Some(job) = jobs.get_mut(&id) {
657            job.stopped = false;
658        }
659    }
660
661    /// Get the most recently stopped job.
662    pub async fn last_stopped(&self) -> Option<JobId> {
663        let mut jobs = self.jobs.lock().await;
664        // Find the highest-numbered stopped job
665        let mut best: Option<JobId> = None;
666        for job in jobs.values_mut() {
667            if job.stopped {
668                match best {
669                    None => best = Some(job.id),
670                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
671                    _ => {}
672                }
673            }
674        }
675        best
676    }
677
678    /// Get process info (pid, pgid) for a job.
679    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
680        let jobs = self.jobs.lock().await;
681        jobs.get(&id).and_then(|job| {
682            match (job.pid, job.pgid) {
683                (Some(pid), Some(pgid)) => Some((pid, pgid)),
684                _ => None,
685            }
686        })
687    }
688
689    /// Record the cancellation token of the fork running a background job, so
690    /// `kill %N` can stop the job even when it has no OS process group of its
691    /// own (e.g. a pure builtin like `sleep &`).
692    pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
693        let mut jobs = self.jobs.lock().await;
694        if let Some(job) = jobs.get_mut(&id) {
695            job.cancel = Some(token);
696        }
697    }
698
699    /// Cancel a job by its token. Returns `true` if a token was recorded and
700    /// cancelled. The cancellation cascade stops in-process builtin futures and
701    /// SIGTERM→SIGKILLs any external children's process groups.
702    pub async fn cancel(&self, id: JobId) -> bool {
703        let jobs = self.jobs.lock().await;
704        match jobs.get(&id).and_then(|job| job.cancel.clone()) {
705            Some(token) => {
706                token.cancel();
707                true
708            }
709            None => false,
710        }
711    }
712
713    /// Record a process group spawned while running a background job. Lets
714    /// `kill -<sig> %N` deliver an arbitrary signal directly to the real
715    /// processes. Deduplicated (a job may spawn several externals).
716    pub async fn add_pgid(&self, id: JobId, pgid: u32) {
717        let mut jobs = self.jobs.lock().await;
718        if let Some(job) = jobs.get_mut(&id) {
719            if !job.pgids.contains(&pgid) {
720                job.pgids.push(pgid);
721            }
722        }
723    }
724
725    /// The process groups recorded for a job (empty for a pure-builtin job).
726    /// Includes the legacy single `pgid` recorded for *stopped* jobs (Ctrl-Z),
727    /// so `kill %N` signals a stopped foreground job's group too.
728    pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
729        let jobs = self.jobs.lock().await;
730        jobs.get(&id)
731            .map(|job| {
732                let mut v = job.pgids.clone();
733                if let Some(pg) = job.pgid {
734                    if !v.contains(&pg) {
735                        v.push(pg);
736                    }
737                }
738                v
739            })
740            .unwrap_or_default()
741    }
742
743    /// Remove a job from tracking.
744    pub async fn remove(&self, id: JobId) {
745        let mut jobs = self.jobs.lock().await;
746        if let Some(mut job) = jobs.remove(&id) {
747            job.cleanup_files();
748        }
749    }
750}
751
752impl Default for JobManager {
753    fn default() -> Self {
754        Self::new()
755    }
756}
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761    use std::time::Duration;
762
763    #[tokio::test]
764    async fn test_no_host_output_file_when_persistence_disabled() {
765        // A hermetic / read-only kernel (custom backend, or NoLocal mode)
766        // disables host output-file persistence so a background job's output
767        // never lands on the real filesystem via `std::fs`, bypassing the VFS.
768        let manager = JobManager::new();
769        assert!(manager.persist_output_files(), "default is to persist");
770        manager.set_persist_output_files(false);
771        assert!(!manager.persist_output_files());
772
773        let id = manager.spawn("leaky".to_string(), async {
774            ExecResult::success("output that must not hit host disk")
775        });
776        tokio::time::sleep(Duration::from_millis(10)).await;
777        let result = manager.wait(id).await;
778        assert!(result.is_some());
779
780        // No temp file should have been written to the host filesystem.
781        let output_file = {
782            let jobs = manager.jobs.lock().await;
783            jobs.get(&id).and_then(|j| j.output_file().cloned())
784        };
785        assert!(
786            output_file.is_none(),
787            "no host output file should be written when persistence is disabled, got {output_file:?}"
788        );
789    }
790
791    #[tokio::test]
792    async fn test_spawn_and_wait() {
793        let manager = JobManager::new();
794
795        let id = manager.spawn("test".to_string(), async {
796            tokio::time::sleep(Duration::from_millis(10)).await;
797            ExecResult::success("done")
798        });
799
800        // Wait a bit for the job to be registered
801        tokio::time::sleep(Duration::from_millis(5)).await;
802
803        let result = manager.wait(id).await;
804        assert!(result.is_some());
805        let result = result.unwrap();
806        assert!(result.ok());
807        assert_eq!(&*result.text_out(), "done");
808    }
809
810    #[tokio::test]
811    async fn test_wait_all() {
812        let manager = JobManager::new();
813
814        manager.spawn("job1".to_string(), async {
815            tokio::time::sleep(Duration::from_millis(10)).await;
816            ExecResult::success("one")
817        });
818
819        manager.spawn("job2".to_string(), async {
820            tokio::time::sleep(Duration::from_millis(5)).await;
821            ExecResult::success("two")
822        });
823
824        // Wait for jobs to register
825        tokio::time::sleep(Duration::from_millis(5)).await;
826
827        let results = manager.wait_all().await;
828        assert_eq!(results.len(), 2);
829    }
830
831    #[tokio::test]
832    async fn test_list_jobs() {
833        let manager = JobManager::new();
834
835        manager.spawn("test job".to_string(), async {
836            tokio::time::sleep(Duration::from_millis(50)).await;
837            ExecResult::success("")
838        });
839
840        // Wait for job to register
841        tokio::time::sleep(Duration::from_millis(5)).await;
842
843        let jobs = manager.list().await;
844        assert_eq!(jobs.len(), 1);
845        assert_eq!(jobs[0].command, "test job");
846        assert_eq!(jobs[0].status, JobStatus::Running);
847    }
848
849    #[tokio::test]
850    async fn test_job_status_after_completion() {
851        let manager = JobManager::new();
852
853        let id = manager.spawn("quick".to_string(), async {
854            ExecResult::success("")
855        });
856
857        // Wait for job to complete
858        tokio::time::sleep(Duration::from_millis(10)).await;
859        let _ = manager.wait(id).await;
860
861        let info = manager.get(id).await;
862        assert!(info.is_some());
863        assert_eq!(info.unwrap().status, JobStatus::Done);
864    }
865
866    #[tokio::test]
867    async fn test_cleanup() {
868        let manager = JobManager::new();
869
870        let id = manager.spawn("done".to_string(), async {
871            ExecResult::success("")
872        });
873
874        // Wait for completion
875        tokio::time::sleep(Duration::from_millis(10)).await;
876        let _ = manager.wait(id).await;
877
878        // Should have 1 job
879        assert_eq!(manager.list().await.len(), 1);
880
881        // Cleanup
882        manager.cleanup().await;
883
884        // Should have 0 jobs
885        assert_eq!(manager.list().await.len(), 0);
886    }
887
888    #[tokio::test]
889    async fn test_cleanup_removes_temp_files() {
890        // Bug K: cleanup should remove temp files
891        let manager = JobManager::new();
892
893        let id = manager.spawn("output job".to_string(), async {
894            ExecResult::success("some output that gets written to a temp file")
895        });
896
897        // Wait for completion (triggers output file creation)
898        tokio::time::sleep(Duration::from_millis(10)).await;
899        let result = manager.wait(id).await;
900        assert!(result.is_some());
901
902        // Get the output file path before cleanup. The job produced output, so
903        // a temp file must have been written — otherwise this test would pass
904        // vacuously.
905        let output_file = {
906            let jobs = manager.jobs.lock().await;
907            jobs.get(&id).and_then(|j| j.output_file().cloned())
908        };
909        let path = output_file.expect("job with output should have written a temp file");
910        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());
911
912        // Cleanup should remove the job and its files.
913        manager.cleanup().await;
914
915        assert!(
916            !path.exists(),
917            "temp file should be removed after cleanup: {}",
918            path.display()
919        );
920    }
921
922    #[tokio::test]
923    async fn test_register_with_channel() {
924        let manager = JobManager::new();
925        let (tx, rx) = oneshot::channel();
926
927        let id = manager.register("channel job".to_string(), rx).await;
928
929        // Send result
930        tx.send(ExecResult::success("from channel")).unwrap();
931
932        let result = manager.wait(id).await;
933        assert!(result.is_some());
934        assert_eq!(&*result.unwrap().text_out(), "from channel");
935    }
936
937    #[tokio::test]
938    async fn test_spawn_immediately_available() {
939        // Bug J: job should be queryable immediately after spawn()
940        let manager = JobManager::new();
941
942        let id = manager.spawn("instant".to_string(), async {
943            tokio::time::sleep(Duration::from_millis(100)).await;
944            ExecResult::success("done")
945        });
946
947        // Should be immediately visible without any sleep
948        let exists = manager.exists(id).await;
949        assert!(exists, "job should be immediately available after spawn()");
950
951        let info = manager.get(id).await;
952        assert!(info.is_some(), "job info should be available immediately");
953    }
954
955    #[tokio::test]
956    async fn test_nonexistent_job() {
957        let manager = JobManager::new();
958        let result = manager.wait(JobId(999)).await;
959        assert!(result.is_none());
960    }
961
962    #[tokio::test]
963    async fn test_cancel_token_fires() {
964        // A recorded cancel token can be tripped by id — this is how `kill %N`
965        // stops a pure-builtin job that has no OS process group.
966        let manager = JobManager::new();
967        let token = tokio_util::sync::CancellationToken::new();
968        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
969        manager.set_cancel_token(id, token.clone()).await;
970
971        assert!(!token.is_cancelled());
972        assert!(manager.cancel(id).await, "cancel should report success");
973        assert!(token.is_cancelled(), "the job's token must be tripped");
974    }
975
976    #[tokio::test]
977    async fn test_cancel_without_token_returns_false() {
978        let manager = JobManager::new();
979        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
980        // No token recorded → nothing to cancel.
981        assert!(!manager.cancel(id).await);
982        // Unknown id → also false.
983        assert!(!manager.cancel(JobId(999)).await);
984    }
985
986    #[tokio::test]
987    async fn test_pgids_recorded_and_deduped() {
988        let manager = JobManager::new();
989        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
990        assert!(manager.job_pgids(id).await.is_empty());
991
992        manager.add_pgid(id, 4242).await;
993        manager.add_pgid(id, 4243).await;
994        manager.add_pgid(id, 4242).await; // duplicate ignored
995        assert_eq!(manager.job_pgids(id).await, vec![4242, 4243]);
996
997        // Unknown id → empty, no panic.
998        assert!(manager.job_pgids(JobId(999)).await.is_empty());
999    }
1000}