Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
/// A background job tracked by a [`JobManager`].
///
/// Completion is observed through at most one source — a tokio task handle
/// ([`Job::new`]) or a oneshot result channel ([`Job::from_channel`],
/// [`Job::with_streams`]) — and the outcome is cached in `result` once seen.
/// Jobs built with [`Job::stopped`] carry neither source, only process info.
pub struct Job {
    /// Job ID.
    pub id: JobId,
    /// Owning manager's session ID — disambiguates output file paths between
    /// JobManager instances that share the process (and thus the same job ID
    /// space, since IDs restart at 1 per manager).
    session_id: u64,
    /// Command description.
    pub command: String,
    /// Task handle (None if already awaited).
    handle: Option<JoinHandle<ExecResult>>,
    /// Channel to receive result (alternative to handle).
    result_rx: Option<oneshot::Receiver<ExecResult>>,
    /// Cached result after completion.
    result: Option<ExecResult>,
    /// Path to output file (captures stdout/stderr after completion).
    output_file: Option<PathBuf>,
    /// Whether to persist completed output to a host temp file. Disabled for
    /// hermetic / read-only kernels (custom backend, NoLocal) whose output must
    /// never reach the real filesystem outside the VFS — see
    /// [`JobManager::set_persist_output_files`]. Stamped from the manager when
    /// the job is registered. Live output is always available in-memory via the
    /// VFS streams (`/v/jobs/{id}/stdout`), so suppressing the file loses
    /// nothing for an in-process consumer.
    persist_output: bool,
    /// Live stdout stream (bounded ring buffer).
    stdout_stream: Option<Arc<BoundedStream>>,
    /// Live stderr stream (bounded ring buffer).
    stderr_stream: Option<Arc<BoundedStream>>,
    /// OS process ID (for stopped jobs).
    pid: Option<u32>,
    /// OS process group ID (for stopped jobs).
    pgid: Option<u32>,
    /// Whether this job is stopped (SIGTSTP). While set, `is_done()` reports
    /// `false` and `status()` reports `JobStatus::Stopped` without polling.
    stopped: bool,
}
58
59impl Job {
60    /// Create a new job from a task handle.
61    pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
62        Self {
63            id,
64            session_id,
65            command,
66            handle: Some(handle),
67            result_rx: None,
68            result: None,
69            output_file: None,
70            persist_output: true,
71            stdout_stream: None,
72            stderr_stream: None,
73            pid: None,
74            pgid: None,
75            stopped: false,
76        }
77    }
78
79    /// Create a new job from a result channel.
80    pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
81        Self {
82            id,
83            session_id,
84            command,
85            handle: None,
86            result_rx: Some(rx),
87            result: None,
88            output_file: None,
89            persist_output: true,
90            stdout_stream: None,
91            stderr_stream: None,
92            pid: None,
93            pgid: None,
94            stopped: false,
95        }
96    }
97
98    /// Create a new job with attached output streams.
99    ///
100    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
101    pub fn with_streams(
102        id: JobId,
103        session_id: u64,
104        command: String,
105        rx: oneshot::Receiver<ExecResult>,
106        stdout: Arc<BoundedStream>,
107        stderr: Arc<BoundedStream>,
108    ) -> Self {
109        Self {
110            id,
111            session_id,
112            command,
113            handle: None,
114            result_rx: Some(rx),
115            result: None,
116            output_file: None,
117            persist_output: true,
118            stdout_stream: Some(stdout),
119            stderr_stream: Some(stderr),
120            pid: None,
121            pgid: None,
122            stopped: false,
123        }
124    }
125
126    /// Create a stopped job (from Ctrl-Z on a foreground process).
127    pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
128        Self {
129            id,
130            session_id,
131            command,
132            handle: None,
133            result_rx: None,
134            result: None,
135            output_file: None,
136            persist_output: true,
137            stdout_stream: None,
138            stderr_stream: None,
139            pid: Some(pid),
140            pgid: Some(pgid),
141            stopped: true,
142        }
143    }
144
145    /// Get the output file path (if available).
146    pub fn output_file(&self) -> Option<&PathBuf> {
147        self.output_file.as_ref()
148    }
149
150    /// Check if the job has completed.
151    ///
152    /// Stopped jobs are not considered done.
153    pub fn is_done(&mut self) -> bool {
154        if self.stopped {
155            return false;
156        }
157        self.try_poll();
158        self.result.is_some()
159    }
160
161    /// Get the job's status.
162    pub fn status(&mut self) -> JobStatus {
163        if self.stopped {
164            return JobStatus::Stopped;
165        }
166        self.try_poll();
167        match &self.result {
168            Some(r) if r.ok() => JobStatus::Done,
169            Some(_) => JobStatus::Failed,
170            None => JobStatus::Running,
171        }
172    }
173
174    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
175    ///
176    /// Returns:
177    /// - `"running"` if the job is still running
178    /// - `"done:0"` if the job completed successfully
179    /// - `"failed:{code}"` if the job failed with an exit code
180    pub fn status_string(&mut self) -> String {
181        self.try_poll();
182        match &self.result {
183            Some(r) if r.ok() => "done:0".to_string(),
184            Some(r) => format!("failed:{}", r.code),
185            None => "running".to_string(),
186        }
187    }
188
189    /// Get the stdout stream (if attached).
190    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
191        self.stdout_stream.as_ref()
192    }
193
194    /// Get the stderr stream (if attached).
195    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
196        self.stderr_stream.as_ref()
197    }
198
199    /// Wait for the job to complete and return its result.
200    ///
201    /// On completion, the job's output is written to a temp file for later retrieval.
202    pub async fn wait(&mut self) -> ExecResult {
203        if let Some(result) = self.result.take() {
204            self.result = Some(result.clone());
205            return result;
206        }
207
208        let result = if let Some(handle) = self.handle.take() {
209            match handle.await {
210                Ok(r) => r,
211                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
212            }
213        } else if let Some(rx) = self.result_rx.take() {
214            match rx.await {
215                Ok(r) => r,
216                Err(_) => ExecResult::failure(1, "job channel closed"),
217            }
218        } else {
219            // Already waited
220            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
221        };
222
223        // Write output to a host temp file for later retrieval — but only when
224        // persistence is enabled. A hermetic / read-only kernel disables it so
225        // job output never bypasses the VFS onto the real filesystem.
226        if self.persist_output
227            && self.output_file.is_none()
228            && let Some(path) = self.write_output_file(&result) {
229                self.output_file = Some(path);
230            }
231
232        self.result = Some(result.clone());
233        result
234    }
235
236    /// Write job output to a temp file.
237    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
238        // Only write if there's output to capture
239        let text = result.text_out();
240        if text.is_empty() && result.err.is_empty() {
241            return None;
242        }
243
244        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
245        if std::fs::create_dir_all(&tmp_dir).is_err() {
246            tracing::warn!("Failed to create job output directory");
247            return None;
248        }
249
250        // Include the OS pid: `session_id` is only unique *within* a process
251        // (it's a process-local atomic that restarts at 0), so two kaish
252        // processes on one host — or two `cargo test` binaries — would
253        // otherwise both write `session_0_job_1.txt` into this shared dir and
254        // clobber each other (a real cross-process collision, and the source
255        // of the `test_cleanup_removes_temp_files` flake). pid + session_id +
256        // job id is unique across processes. Mirrors `output_limit`'s spill
257        // filename convention.
258        let filename = format!(
259            "session_{}_job_{}.{}.txt",
260            self.session_id,
261            self.id.0,
262            std::process::id()
263        );
264        let path = tmp_dir.join(filename);
265
266        let mut content = String::new();
267        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
268        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
269
270        if !text.is_empty() {
271            content.push_str("## STDOUT\n");
272            content.push_str(&text);
273            if !text.ends_with('\n') {
274                content.push('\n');
275            }
276        }
277
278        if !result.err.is_empty() {
279            content.push_str("\n## STDERR\n");
280            content.push_str(&result.err);
281            if !result.err.ends_with('\n') {
282                content.push('\n');
283            }
284        }
285
286        match std::fs::write(&path, content) {
287            Ok(()) => Some(path),
288            Err(e) => {
289                tracing::warn!("Failed to write job output file: {}", e);
290                None
291            }
292        }
293    }
294
295    /// Remove any temp files associated with this job.
296    pub fn cleanup_files(&mut self) {
297        if let Some(path) = self.output_file.take() {
298            if let Err(e) = std::fs::remove_file(&path) {
299                // Ignore "not found" — file may not have been written
300                if e.kind() != io::ErrorKind::NotFound {
301                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
302                }
303            }
304        }
305    }
306
307    /// Get the result if completed, without waiting.
308    pub fn try_result(&self) -> Option<&ExecResult> {
309        self.result.as_ref()
310    }
311
312    /// Try to poll the result channel and update status.
313    ///
314    /// This is a non-blocking check that updates `self.result` if the
315    /// job has completed. Returns true if the job is now done.
316    pub fn try_poll(&mut self) -> bool {
317        if self.result.is_some() {
318            return true;
319        }
320
321        // Try to poll the oneshot channel
322        if let Some(rx) = self.result_rx.as_mut() {
323            match rx.try_recv() {
324                Ok(result) => {
325                    self.result = Some(result);
326                    self.result_rx = None;
327                    return true;
328                }
329                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
330                    // Still running
331                    return false;
332                }
333                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
334                    // Channel closed without result - job failed
335                    self.result = Some(ExecResult::failure(1, "job channel closed"));
336                    self.result_rx = None;
337                    return true;
338                }
339            }
340        }
341
342        // Check if handle is finished
343        if let Some(handle) = self.handle.as_mut()
344            && handle.is_finished() {
345                // Take the handle and wait for it (should be instant)
346                let Some(mut handle) = self.handle.take() else {
347                    return false;
348                };
349                // Poll directly with a noop waker — safe because is_finished() was true
350                let waker = std::task::Waker::noop();
351                let mut cx = std::task::Context::from_waker(waker);
352                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
353                    std::task::Poll::Ready(Ok(r)) => r,
354                    std::task::Poll::Ready(Err(e)) => {
355                        ExecResult::failure(1, format!("job panicked: {}", e))
356                    }
357                    std::task::Poll::Pending => return false, // shouldn't happen
358                };
359                self.result = Some(result);
360                return true;
361            }
362
363        false
364    }
365}
366
/// Process-wide counter handing each JobManager a distinct session ID
/// (`JobManager::new` takes the next value). Job IDs restart at 1 per manager,
/// so the session ID is what keeps output file paths from colliding between
/// managers sharing a process (concurrent tests, forks).
/// It is process-LOCAL (starts at 0 in every process), so output filenames also
/// mix in the OS pid to stay unique across processes — see `write_output_file`.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
373
/// Manager for background jobs.
///
/// Owns the job table (behind an async mutex), hands out job IDs, and stamps
/// its session ID and output-persistence setting onto the jobs it creates.
pub struct JobManager {
    /// Process-unique ID for this manager, mixed into job output file paths.
    session_id: u64,
    /// Counter for generating unique job IDs (starts at 1 for each manager).
    next_id: AtomicU64,
    /// Map of job ID to job.
    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
    /// Whether completed jobs persist their output to a host temp file. On by
    /// default; a hermetic / read-only kernel disables it so output never
    /// bypasses the VFS onto the real filesystem (see
    /// [`set_persist_output_files`](Self::set_persist_output_files)). Stamped
    /// onto each [`Job`] at registration.
    persist_output_files: std::sync::atomic::AtomicBool,
}
389
390impl JobManager {
391    /// Create a new job manager.
392    pub fn new() -> Self {
393        Self {
394            session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
395            next_id: AtomicU64::new(1),
396            jobs: Arc::new(Mutex::new(HashMap::new())),
397            persist_output_files: std::sync::atomic::AtomicBool::new(true),
398        }
399    }
400
401    /// Toggle whether completed jobs persist their output to a host temp file.
402    ///
403    /// Disable this for a hermetic / read-only kernel: the host write in
404    /// [`Job::write_output_file`] uses `std::fs` directly and so bypasses the
405    /// VFS (and any read-only mount). Live output stays available in-memory via
406    /// the VFS streams (`/v/jobs/{id}/stdout`), so nothing is lost in-process.
407    ///
408    /// Must be set before jobs are spawned — the flag is stamped onto each job
409    /// at registration time, not consulted at completion.
410    pub fn set_persist_output_files(&self, on: bool) {
411        self.persist_output_files.store(on, Ordering::Relaxed);
412    }
413
414    /// Whether completed jobs persist their output to a host temp file.
415    pub fn persist_output_files(&self) -> bool {
416        self.persist_output_files.load(Ordering::Relaxed)
417    }
418
419    /// Spawn a new background job from a future.
420    ///
421    /// The job is inserted into the map synchronously before returning,
422    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
423    pub fn spawn<F>(&self, command: String, future: F) -> JobId
424    where
425        F: std::future::Future<Output = ExecResult> + Send + 'static,
426    {
427        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
428        // Propagate the embedder's trace context across the spawn boundary so
429        // background-job spans stay in the same trace (see telemetry module).
430        let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
431        let mut job = Job::new(id, self.session_id, command, handle);
432        job.persist_output = self.persist_output_files();
433
434        // Spin on try_lock to guarantee the job is in the map on return.
435        // The lock is tokio::sync::Mutex which is only held briefly during
436        // HashMap operations, so contention resolves quickly.
437        let mut job_opt = Some(job);
438        loop {
439            match self.jobs.try_lock() {
440                Ok(mut guard) => {
441                    if let Some(j) = job_opt.take() {
442                        guard.insert(id, j);
443                    }
444                    break;
445                }
446                Err(_) => {
447                    std::hint::spin_loop();
448                }
449            }
450        }
451
452        id
453    }
454
455    /// Spawn a job that's already running and communicate via channel.
456    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
457        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
458        let mut job = Job::from_channel(id, self.session_id, command, rx);
459        job.persist_output = self.persist_output_files();
460
461        let mut jobs = self.jobs.lock().await;
462        jobs.insert(id, job);
463
464        id
465    }
466
467    /// Register a job with attached output streams.
468    ///
469    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
470    pub async fn register_with_streams(
471        &self,
472        command: String,
473        rx: oneshot::Receiver<ExecResult>,
474        stdout: Arc<BoundedStream>,
475        stderr: Arc<BoundedStream>,
476    ) -> JobId {
477        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
478        let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
479        job.persist_output = self.persist_output_files();
480
481        let mut jobs = self.jobs.lock().await;
482        jobs.insert(id, job);
483
484        id
485    }
486
487    /// Wait for a specific job to complete.
488    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
489        let mut jobs = self.jobs.lock().await;
490        if let Some(job) = jobs.get_mut(&id) {
491            Some(job.wait().await)
492        } else {
493            None
494        }
495    }
496
497    /// Wait for all jobs to complete, returning results in completion order.
498    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
499        let mut results = Vec::new();
500
501        // Get all job IDs
502        let ids: Vec<JobId> = {
503            let jobs = self.jobs.lock().await;
504            jobs.keys().copied().collect()
505        };
506
507        for id in ids {
508            if let Some(result) = self.wait(id).await {
509                results.push((id, result));
510            }
511        }
512
513        results
514    }
515
516    /// List all jobs with their status.
517    pub async fn list(&self) -> Vec<JobInfo> {
518        let mut jobs = self.jobs.lock().await;
519        jobs.values_mut()
520            .map(|job| JobInfo {
521                id: job.id,
522                command: job.command.clone(),
523                status: job.status(),
524                output_file: job.output_file.clone(),
525                pid: job.pid,
526            })
527            .collect()
528    }
529
530    /// Get the number of running jobs.
531    pub async fn running_count(&self) -> usize {
532        let mut jobs = self.jobs.lock().await;
533        let mut count = 0;
534        for job in jobs.values_mut() {
535            if !job.is_done() {
536                count += 1;
537            }
538        }
539        count
540    }
541
542    /// Remove completed jobs from tracking and clean up their temp files.
543    pub async fn cleanup(&self) {
544        let mut jobs = self.jobs.lock().await;
545        jobs.retain(|_, job| {
546            if job.is_done() {
547                job.cleanup_files();
548                false
549            } else {
550                true
551            }
552        });
553    }
554
555    /// Check if a specific job exists.
556    pub async fn exists(&self, id: JobId) -> bool {
557        let jobs = self.jobs.lock().await;
558        jobs.contains_key(&id)
559    }
560
561    /// Get info for a specific job.
562    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
563        let mut jobs = self.jobs.lock().await;
564        jobs.get_mut(&id).map(|job| JobInfo {
565            id: job.id,
566            command: job.command.clone(),
567            status: job.status(),
568            output_file: job.output_file.clone(),
569            pid: job.pid,
570        })
571    }
572
573    /// Get the command string for a job.
574    pub async fn get_command(&self, id: JobId) -> Option<String> {
575        let jobs = self.jobs.lock().await;
576        jobs.get(&id).map(|job| job.command.clone())
577    }
578
579    /// Get the status string for a job (for /v/jobs/{id}/status).
580    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
581        let mut jobs = self.jobs.lock().await;
582        jobs.get_mut(&id).map(|job| job.status_string())
583    }
584
585    /// Read stdout stream content for a job.
586    ///
587    /// Returns `None` if the job doesn't exist or has no attached stream.
588    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
589        let jobs = self.jobs.lock().await;
590        if let Some(job) = jobs.get(&id)
591            && let Some(stream) = job.stdout_stream() {
592                return Some(stream.read().await);
593            }
594        None
595    }
596
597    /// Read stderr stream content for a job.
598    ///
599    /// Returns `None` if the job doesn't exist or has no attached stream.
600    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
601        let jobs = self.jobs.lock().await;
602        if let Some(job) = jobs.get(&id)
603            && let Some(stream) = job.stderr_stream() {
604                return Some(stream.read().await);
605            }
606        None
607    }
608
609    /// List all job IDs.
610    pub async fn list_ids(&self) -> Vec<JobId> {
611        let jobs = self.jobs.lock().await;
612        jobs.keys().copied().collect()
613    }
614
615    /// Register a stopped job (from Ctrl-Z on a foreground process).
616    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
617        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
618        let job = Job::stopped(id, self.session_id, command, pid, pgid);
619        let mut jobs = self.jobs.lock().await;
620        jobs.insert(id, job);
621        id
622    }
623
624    /// Mark a job as stopped with its process info.
625    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
626        let mut jobs = self.jobs.lock().await;
627        if let Some(job) = jobs.get_mut(&id) {
628            job.stopped = true;
629            job.pid = Some(pid);
630            job.pgid = Some(pgid);
631        }
632    }
633
634    /// Mark a stopped job as resumed.
635    pub async fn resume_job(&self, id: JobId) {
636        let mut jobs = self.jobs.lock().await;
637        if let Some(job) = jobs.get_mut(&id) {
638            job.stopped = false;
639        }
640    }
641
642    /// Get the most recently stopped job.
643    pub async fn last_stopped(&self) -> Option<JobId> {
644        let mut jobs = self.jobs.lock().await;
645        // Find the highest-numbered stopped job
646        let mut best: Option<JobId> = None;
647        for job in jobs.values_mut() {
648            if job.stopped {
649                match best {
650                    None => best = Some(job.id),
651                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
652                    _ => {}
653                }
654            }
655        }
656        best
657    }
658
659    /// Get process info (pid, pgid) for a job.
660    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
661        let jobs = self.jobs.lock().await;
662        jobs.get(&id).and_then(|job| {
663            match (job.pid, job.pgid) {
664                (Some(pid), Some(pgid)) => Some((pid, pgid)),
665                _ => None,
666            }
667        })
668    }
669
670    /// Remove a job from tracking.
671    pub async fn remove(&self, id: JobId) {
672        let mut jobs = self.jobs.lock().await;
673        if let Some(mut job) = jobs.remove(&id) {
674            job.cleanup_files();
675        }
676    }
677}
678
679impl Default for JobManager {
680    fn default() -> Self {
681        Self::new()
682    }
683}
684
#[cfg(test)]
mod tests {
    //! Unit tests for `JobManager`: spawning, waiting, listing, cleanup and
    //! the output-persistence switch.

    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_no_host_output_file_when_persistence_disabled() {
        // A hermetic / read-only kernel (custom backend, or NoLocal mode)
        // disables host output-file persistence so a background job's output
        // never lands on the real filesystem via `std::fs`, bypassing the VFS.
        let manager = JobManager::new();
        assert!(manager.persist_output_files(), "default is to persist");
        manager.set_persist_output_files(false);
        assert!(!manager.persist_output_files());

        // The flag is stamped at spawn time, so it must be off before this.
        let id = manager.spawn("leaky".to_string(), async {
            ExecResult::success("output that must not hit host disk")
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        // No temp file should have been written to the host filesystem.
        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        assert!(
            output_file.is_none(),
            "no host output file should be written when persistence is disabled, got {output_file:?}"
        );
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // `spawn()` has already inserted the job; this short pause only lets
        // the task get underway before we wait on it.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let result = manager.wait(id).await;
        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.ok());
        assert_eq!(&*result.text_out(), "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        // Both jobs are already in the map; the pause just lets them start.
        tokio::time::sleep(Duration::from_millis(5)).await;

        // Only the number of results is checked, not their order.
        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        // Long enough (50ms) that the job is still running when listed below.
        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        // The job is already in the map; the pause just lets it start.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async {
            ExecResult::success("")
        });

        // Give the job time to finish, then collect its result.
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        // A waited-on job stays in the map and reports Done.
        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async {
            ExecResult::success("")
        });

        // Wait for completion
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        // Should have 1 job
        assert_eq!(manager.list().await.len(), 1);

        // Cleanup
        manager.cleanup().await;

        // Should have 0 jobs
        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        // Bug K: cleanup should remove temp files
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        // Wait for completion (triggers output file creation)
        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        // Get the output file path before cleanup. The job produced output, so
        // a temp file must have been written — otherwise this test would pass
        // vacuously.
        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        let path = output_file.expect("job with output should have written a temp file");
        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());

        // Cleanup should remove the job and its files.
        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        // Send the result before waiting, so `wait` resolves immediately.
        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await;
        assert!(result.is_some());
        assert_eq!(&*result.unwrap().text_out(), "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        // Bug J: job should be queryable immediately after spawn()
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        // Should be immediately visible without any sleep
        let exists = manager.exists(id).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(id).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        // Waiting on an ID that was never registered yields `None`.
        let manager = JobManager::new();
        let result = manager.wait(JobId(999)).await;
        assert!(result.is_none());
    }
}