Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use tokio::sync::{oneshot, Mutex};
12use tokio::task::JoinHandle;
13
14use super::stream::BoundedStream;
15use crate::interpreter::ExecResult;
16
17/// Unique identifier for a background job.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
19pub struct JobId(pub u64);
20
21impl std::fmt::Display for JobId {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        write!(f, "{}", self.0)
24    }
25}
26
27/// Status of a background job.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum JobStatus {
30    /// Job is currently running.
31    Running,
32    /// Job was stopped by a signal (e.g., Ctrl-Z / SIGTSTP).
33    Stopped,
34    /// Job completed successfully.
35    Done,
36    /// Job failed with an error.
37    Failed,
38}
39
40impl std::fmt::Display for JobStatus {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            JobStatus::Running => write!(f, "Running"),
44            JobStatus::Stopped => write!(f, "Stopped"),
45            JobStatus::Done => write!(f, "Done"),
46            JobStatus::Failed => write!(f, "Failed"),
47        }
48    }
49}
50
51/// Information about a job for listing.
52#[derive(Debug, Clone)]
53pub struct JobInfo {
54    /// Job ID.
55    pub id: JobId,
56    /// Command description.
57    pub command: String,
58    /// Current status.
59    pub status: JobStatus,
60    /// Path to output file (if available).
61    pub output_file: Option<PathBuf>,
62    /// OS process ID (if this is a stopped/foreground process).
63    pub pid: Option<u32>,
64}
65
66/// A background job.
67pub struct Job {
68    /// Job ID.
69    pub id: JobId,
70    /// Command description.
71    pub command: String,
72    /// Task handle (None if already awaited).
73    handle: Option<JoinHandle<ExecResult>>,
74    /// Channel to receive result (alternative to handle).
75    result_rx: Option<oneshot::Receiver<ExecResult>>,
76    /// Cached result after completion.
77    result: Option<ExecResult>,
78    /// Path to output file (captures stdout/stderr after completion).
79    output_file: Option<PathBuf>,
80    /// Live stdout stream (bounded ring buffer).
81    stdout_stream: Option<Arc<BoundedStream>>,
82    /// Live stderr stream (bounded ring buffer).
83    stderr_stream: Option<Arc<BoundedStream>>,
84    /// OS process ID (for stopped jobs).
85    pid: Option<u32>,
86    /// OS process group ID (for stopped jobs).
87    pgid: Option<u32>,
88    /// Whether this job is stopped (SIGTSTP).
89    stopped: bool,
90}
91
92impl Job {
93    /// Create a new job from a task handle.
94    pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
95        Self {
96            id,
97            command,
98            handle: Some(handle),
99            result_rx: None,
100            result: None,
101            output_file: None,
102            stdout_stream: None,
103            stderr_stream: None,
104            pid: None,
105            pgid: None,
106            stopped: false,
107        }
108    }
109
110    /// Create a new job from a result channel.
111    pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
112        Self {
113            id,
114            command,
115            handle: None,
116            result_rx: Some(rx),
117            result: None,
118            output_file: None,
119            stdout_stream: None,
120            stderr_stream: None,
121            pid: None,
122            pgid: None,
123            stopped: false,
124        }
125    }
126
127    /// Create a new job with attached output streams.
128    ///
129    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
130    pub fn with_streams(
131        id: JobId,
132        command: String,
133        rx: oneshot::Receiver<ExecResult>,
134        stdout: Arc<BoundedStream>,
135        stderr: Arc<BoundedStream>,
136    ) -> Self {
137        Self {
138            id,
139            command,
140            handle: None,
141            result_rx: Some(rx),
142            result: None,
143            output_file: None,
144            stdout_stream: Some(stdout),
145            stderr_stream: Some(stderr),
146            pid: None,
147            pgid: None,
148            stopped: false,
149        }
150    }
151
152    /// Create a stopped job (from Ctrl-Z on a foreground process).
153    pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
154        Self {
155            id,
156            command,
157            handle: None,
158            result_rx: None,
159            result: None,
160            output_file: None,
161            stdout_stream: None,
162            stderr_stream: None,
163            pid: Some(pid),
164            pgid: Some(pgid),
165            stopped: true,
166        }
167    }
168
169    /// Get the output file path (if available).
170    pub fn output_file(&self) -> Option<&PathBuf> {
171        self.output_file.as_ref()
172    }
173
174    /// Check if the job has completed.
175    ///
176    /// Stopped jobs are not considered done.
177    pub fn is_done(&mut self) -> bool {
178        if self.stopped {
179            return false;
180        }
181        self.try_poll();
182        self.result.is_some()
183    }
184
185    /// Get the job's status.
186    pub fn status(&mut self) -> JobStatus {
187        if self.stopped {
188            return JobStatus::Stopped;
189        }
190        self.try_poll();
191        match &self.result {
192            Some(r) if r.ok() => JobStatus::Done,
193            Some(_) => JobStatus::Failed,
194            None => JobStatus::Running,
195        }
196    }
197
198    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
199    ///
200    /// Returns:
201    /// - `"running"` if the job is still running
202    /// - `"done:0"` if the job completed successfully
203    /// - `"failed:{code}"` if the job failed with an exit code
204    pub fn status_string(&mut self) -> String {
205        self.try_poll();
206        match &self.result {
207            Some(r) if r.ok() => "done:0".to_string(),
208            Some(r) => format!("failed:{}", r.code),
209            None => "running".to_string(),
210        }
211    }
212
213    /// Get the stdout stream (if attached).
214    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
215        self.stdout_stream.as_ref()
216    }
217
218    /// Get the stderr stream (if attached).
219    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
220        self.stderr_stream.as_ref()
221    }
222
223    /// Wait for the job to complete and return its result.
224    ///
225    /// On completion, the job's output is written to a temp file for later retrieval.
226    pub async fn wait(&mut self) -> ExecResult {
227        if let Some(result) = self.result.take() {
228            self.result = Some(result.clone());
229            return result;
230        }
231
232        let result = if let Some(handle) = self.handle.take() {
233            match handle.await {
234                Ok(r) => r,
235                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
236            }
237        } else if let Some(rx) = self.result_rx.take() {
238            match rx.await {
239                Ok(r) => r,
240                Err(_) => ExecResult::failure(1, "job channel closed"),
241            }
242        } else {
243            // Already waited
244            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
245        };
246
247        // Write output to temp file for later retrieval
248        if self.output_file.is_none()
249            && let Some(path) = self.write_output_file(&result) {
250                self.output_file = Some(path);
251            }
252
253        self.result = Some(result.clone());
254        result
255    }
256
257    /// Write job output to a temp file.
258    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
259        // Only write if there's output to capture
260        if result.out.is_empty() && result.err.is_empty() {
261            return None;
262        }
263
264        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
265        if std::fs::create_dir_all(&tmp_dir).is_err() {
266            tracing::warn!("Failed to create job output directory");
267            return None;
268        }
269
270        let filename = format!("job_{}.txt", self.id.0);
271        let path = tmp_dir.join(filename);
272
273        let mut content = String::new();
274        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
275        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
276
277        if !result.out.is_empty() {
278            content.push_str("## STDOUT\n");
279            content.push_str(&result.out);
280            if !result.out.ends_with('\n') {
281                content.push('\n');
282            }
283        }
284
285        if !result.err.is_empty() {
286            content.push_str("\n## STDERR\n");
287            content.push_str(&result.err);
288            if !result.err.ends_with('\n') {
289                content.push('\n');
290            }
291        }
292
293        match std::fs::write(&path, content) {
294            Ok(()) => Some(path),
295            Err(e) => {
296                tracing::warn!("Failed to write job output file: {}", e);
297                None
298            }
299        }
300    }
301
302    /// Remove any temp files associated with this job.
303    pub fn cleanup_files(&mut self) {
304        if let Some(path) = self.output_file.take() {
305            if let Err(e) = std::fs::remove_file(&path) {
306                // Ignore "not found" — file may not have been written
307                if e.kind() != io::ErrorKind::NotFound {
308                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
309                }
310            }
311        }
312    }
313
314    /// Get the result if completed, without waiting.
315    pub fn try_result(&self) -> Option<&ExecResult> {
316        self.result.as_ref()
317    }
318
319    /// Try to poll the result channel and update status.
320    ///
321    /// This is a non-blocking check that updates `self.result` if the
322    /// job has completed. Returns true if the job is now done.
323    pub fn try_poll(&mut self) -> bool {
324        if self.result.is_some() {
325            return true;
326        }
327
328        // Try to poll the oneshot channel
329        if let Some(rx) = self.result_rx.as_mut() {
330            match rx.try_recv() {
331                Ok(result) => {
332                    self.result = Some(result);
333                    self.result_rx = None;
334                    return true;
335                }
336                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
337                    // Still running
338                    return false;
339                }
340                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
341                    // Channel closed without result - job failed
342                    self.result = Some(ExecResult::failure(1, "job channel closed"));
343                    self.result_rx = None;
344                    return true;
345                }
346            }
347        }
348
349        // Check if handle is finished
350        if let Some(handle) = self.handle.as_mut()
351            && handle.is_finished() {
352                // Take the handle and wait for it (should be instant)
353                let Some(handle) = self.handle.take() else {
354                    return false;
355                };
356                // We can't await here, so we use now_or_never
357                // Note: this is synchronous since is_finished() was true
358                let result = match tokio::task::block_in_place(|| {
359                    tokio::runtime::Handle::current().block_on(handle)
360                }) {
361                    Ok(r) => r,
362                    Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
363                };
364                self.result = Some(result);
365                return true;
366            }
367
368        false
369    }
370}
371
372/// Manager for background jobs.
373pub struct JobManager {
374    /// Counter for generating unique job IDs.
375    next_id: AtomicU64,
376    /// Map of job ID to job.
377    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
378}
379
380impl JobManager {
381    /// Create a new job manager.
382    pub fn new() -> Self {
383        Self {
384            next_id: AtomicU64::new(1),
385            jobs: Arc::new(Mutex::new(HashMap::new())),
386        }
387    }
388
389    /// Spawn a new background job from a future.
390    pub fn spawn<F>(&self, command: String, future: F) -> JobId
391    where
392        F: std::future::Future<Output = ExecResult> + Send + 'static,
393    {
394        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
395        let handle = tokio::spawn(future);
396        let job = Job::new(id, command, handle);
397
398        // Try synchronous insert first (succeeds if lock is uncontended)
399        let jobs = self.jobs.clone();
400        if let Ok(mut guard) = jobs.try_lock() {
401            guard.insert(id, job);
402        } else {
403            tokio::spawn(async move {
404                let mut jobs = jobs.lock().await;
405                jobs.insert(id, job);
406            });
407        }
408
409        id
410    }
411
412    /// Spawn a job that's already running and communicate via channel.
413    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
414        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
415        let job = Job::from_channel(id, command, rx);
416
417        let mut jobs = self.jobs.lock().await;
418        jobs.insert(id, job);
419
420        id
421    }
422
423    /// Register a job with attached output streams.
424    ///
425    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
426    pub async fn register_with_streams(
427        &self,
428        command: String,
429        rx: oneshot::Receiver<ExecResult>,
430        stdout: Arc<BoundedStream>,
431        stderr: Arc<BoundedStream>,
432    ) -> JobId {
433        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
434        let job = Job::with_streams(id, command, rx, stdout, stderr);
435
436        let mut jobs = self.jobs.lock().await;
437        jobs.insert(id, job);
438
439        id
440    }
441
442    /// Wait for a specific job to complete.
443    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
444        let mut jobs = self.jobs.lock().await;
445        if let Some(job) = jobs.get_mut(&id) {
446            Some(job.wait().await)
447        } else {
448            None
449        }
450    }
451
452    /// Wait for all jobs to complete, returning results in completion order.
453    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
454        let mut results = Vec::new();
455
456        // Get all job IDs
457        let ids: Vec<JobId> = {
458            let jobs = self.jobs.lock().await;
459            jobs.keys().copied().collect()
460        };
461
462        for id in ids {
463            if let Some(result) = self.wait(id).await {
464                results.push((id, result));
465            }
466        }
467
468        results
469    }
470
471    /// List all jobs with their status.
472    pub async fn list(&self) -> Vec<JobInfo> {
473        let mut jobs = self.jobs.lock().await;
474        jobs.values_mut()
475            .map(|job| JobInfo {
476                id: job.id,
477                command: job.command.clone(),
478                status: job.status(),
479                output_file: job.output_file.clone(),
480                pid: job.pid,
481            })
482            .collect()
483    }
484
485    /// Get the number of running jobs.
486    pub async fn running_count(&self) -> usize {
487        let mut jobs = self.jobs.lock().await;
488        let mut count = 0;
489        for job in jobs.values_mut() {
490            if !job.is_done() {
491                count += 1;
492            }
493        }
494        count
495    }
496
497    /// Remove completed jobs from tracking and clean up their temp files.
498    pub async fn cleanup(&self) {
499        let mut jobs = self.jobs.lock().await;
500        jobs.retain(|_, job| {
501            if job.is_done() {
502                job.cleanup_files();
503                false
504            } else {
505                true
506            }
507        });
508    }
509
510    /// Check if a specific job exists.
511    pub async fn exists(&self, id: JobId) -> bool {
512        let jobs = self.jobs.lock().await;
513        jobs.contains_key(&id)
514    }
515
516    /// Get info for a specific job.
517    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
518        let mut jobs = self.jobs.lock().await;
519        jobs.get_mut(&id).map(|job| JobInfo {
520            id: job.id,
521            command: job.command.clone(),
522            status: job.status(),
523            output_file: job.output_file.clone(),
524            pid: job.pid,
525        })
526    }
527
528    /// Get the command string for a job.
529    pub async fn get_command(&self, id: JobId) -> Option<String> {
530        let jobs = self.jobs.lock().await;
531        jobs.get(&id).map(|job| job.command.clone())
532    }
533
534    /// Get the status string for a job (for /v/jobs/{id}/status).
535    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
536        let mut jobs = self.jobs.lock().await;
537        jobs.get_mut(&id).map(|job| job.status_string())
538    }
539
540    /// Read stdout stream content for a job.
541    ///
542    /// Returns `None` if the job doesn't exist or has no attached stream.
543    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
544        let jobs = self.jobs.lock().await;
545        if let Some(job) = jobs.get(&id)
546            && let Some(stream) = job.stdout_stream() {
547                return Some(stream.read().await);
548            }
549        None
550    }
551
552    /// Read stderr stream content for a job.
553    ///
554    /// Returns `None` if the job doesn't exist or has no attached stream.
555    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
556        let jobs = self.jobs.lock().await;
557        if let Some(job) = jobs.get(&id)
558            && let Some(stream) = job.stderr_stream() {
559                return Some(stream.read().await);
560            }
561        None
562    }
563
564    /// List all job IDs.
565    pub async fn list_ids(&self) -> Vec<JobId> {
566        let jobs = self.jobs.lock().await;
567        jobs.keys().copied().collect()
568    }
569
570    /// Register a stopped job (from Ctrl-Z on a foreground process).
571    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
572        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
573        let job = Job::stopped(id, command, pid, pgid);
574        let mut jobs = self.jobs.lock().await;
575        jobs.insert(id, job);
576        id
577    }
578
579    /// Mark a job as stopped with its process info.
580    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
581        let mut jobs = self.jobs.lock().await;
582        if let Some(job) = jobs.get_mut(&id) {
583            job.stopped = true;
584            job.pid = Some(pid);
585            job.pgid = Some(pgid);
586        }
587    }
588
589    /// Mark a stopped job as resumed.
590    pub async fn resume_job(&self, id: JobId) {
591        let mut jobs = self.jobs.lock().await;
592        if let Some(job) = jobs.get_mut(&id) {
593            job.stopped = false;
594        }
595    }
596
597    /// Get the most recently stopped job.
598    pub async fn last_stopped(&self) -> Option<JobId> {
599        let mut jobs = self.jobs.lock().await;
600        // Find the highest-numbered stopped job
601        let mut best: Option<JobId> = None;
602        for job in jobs.values_mut() {
603            if job.stopped {
604                match best {
605                    None => best = Some(job.id),
606                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
607                    _ => {}
608                }
609            }
610        }
611        best
612    }
613
614    /// Get process info (pid, pgid) for a job.
615    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
616        let jobs = self.jobs.lock().await;
617        jobs.get(&id).and_then(|job| {
618            match (job.pid, job.pgid) {
619                (Some(pid), Some(pgid)) => Some((pid, pgid)),
620                _ => None,
621            }
622        })
623    }
624
625    /// Remove a job from tracking.
626    pub async fn remove(&self, id: JobId) {
627        let mut jobs = self.jobs.lock().await;
628        if let Some(mut job) = jobs.remove(&id) {
629            job.cleanup_files();
630        }
631    }
632}
633
634impl Default for JobManager {
635    fn default() -> Self {
636        Self::new()
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use std::time::Duration;
644
645    #[tokio::test]
646    async fn test_spawn_and_wait() {
647        let manager = JobManager::new();
648
649        let id = manager.spawn("test".to_string(), async {
650            tokio::time::sleep(Duration::from_millis(10)).await;
651            ExecResult::success("done")
652        });
653
654        // Wait a bit for the job to be registered
655        tokio::time::sleep(Duration::from_millis(5)).await;
656
657        let result = manager.wait(id).await;
658        assert!(result.is_some());
659        let result = result.unwrap();
660        assert!(result.ok());
661        assert_eq!(result.out, "done");
662    }
663
664    #[tokio::test]
665    async fn test_wait_all() {
666        let manager = JobManager::new();
667
668        manager.spawn("job1".to_string(), async {
669            tokio::time::sleep(Duration::from_millis(10)).await;
670            ExecResult::success("one")
671        });
672
673        manager.spawn("job2".to_string(), async {
674            tokio::time::sleep(Duration::from_millis(5)).await;
675            ExecResult::success("two")
676        });
677
678        // Wait for jobs to register
679        tokio::time::sleep(Duration::from_millis(5)).await;
680
681        let results = manager.wait_all().await;
682        assert_eq!(results.len(), 2);
683    }
684
685    #[tokio::test]
686    async fn test_list_jobs() {
687        let manager = JobManager::new();
688
689        manager.spawn("test job".to_string(), async {
690            tokio::time::sleep(Duration::from_millis(50)).await;
691            ExecResult::success("")
692        });
693
694        // Wait for job to register
695        tokio::time::sleep(Duration::from_millis(5)).await;
696
697        let jobs = manager.list().await;
698        assert_eq!(jobs.len(), 1);
699        assert_eq!(jobs[0].command, "test job");
700        assert_eq!(jobs[0].status, JobStatus::Running);
701    }
702
703    #[tokio::test]
704    async fn test_job_status_after_completion() {
705        let manager = JobManager::new();
706
707        let id = manager.spawn("quick".to_string(), async {
708            ExecResult::success("")
709        });
710
711        // Wait for job to complete
712        tokio::time::sleep(Duration::from_millis(10)).await;
713        let _ = manager.wait(id).await;
714
715        let info = manager.get(id).await;
716        assert!(info.is_some());
717        assert_eq!(info.unwrap().status, JobStatus::Done);
718    }
719
720    #[tokio::test]
721    async fn test_cleanup() {
722        let manager = JobManager::new();
723
724        let id = manager.spawn("done".to_string(), async {
725            ExecResult::success("")
726        });
727
728        // Wait for completion
729        tokio::time::sleep(Duration::from_millis(10)).await;
730        let _ = manager.wait(id).await;
731
732        // Should have 1 job
733        assert_eq!(manager.list().await.len(), 1);
734
735        // Cleanup
736        manager.cleanup().await;
737
738        // Should have 0 jobs
739        assert_eq!(manager.list().await.len(), 0);
740    }
741
742    #[tokio::test]
743    async fn test_cleanup_removes_temp_files() {
744        // Bug K: cleanup should remove temp files
745        let manager = JobManager::new();
746
747        let id = manager.spawn("output job".to_string(), async {
748            ExecResult::success("some output that gets written to a temp file")
749        });
750
751        // Wait for completion (triggers output file creation)
752        tokio::time::sleep(Duration::from_millis(10)).await;
753        let result = manager.wait(id).await;
754        assert!(result.is_some());
755
756        // Get the output file path before cleanup
757        let output_file = {
758            let jobs = manager.jobs.lock().await;
759            jobs.get(&id).and_then(|j| j.output_file().cloned())
760        };
761
762        // Cleanup should remove the job and its files
763        manager.cleanup().await;
764
765        // If an output file was created, it should be gone now
766        if let Some(path) = output_file {
767            assert!(
768                !path.exists(),
769                "temp file should be removed after cleanup: {}",
770                path.display()
771            );
772        }
773    }
774
775    #[tokio::test]
776    async fn test_register_with_channel() {
777        let manager = JobManager::new();
778        let (tx, rx) = oneshot::channel();
779
780        let id = manager.register("channel job".to_string(), rx).await;
781
782        // Send result
783        tx.send(ExecResult::success("from channel")).unwrap();
784
785        let result = manager.wait(id).await;
786        assert!(result.is_some());
787        assert_eq!(result.unwrap().out, "from channel");
788    }
789
790    #[tokio::test]
791    async fn test_spawn_immediately_available() {
792        // Bug J: job should be queryable immediately after spawn()
793        let manager = JobManager::new();
794
795        let id = manager.spawn("instant".to_string(), async {
796            tokio::time::sleep(Duration::from_millis(100)).await;
797            ExecResult::success("done")
798        });
799
800        // Should be immediately visible without any sleep
801        let exists = manager.exists(id).await;
802        assert!(exists, "job should be immediately available after spawn()");
803
804        let info = manager.get(id).await;
805        assert!(info.is_some(), "job info should be available immediately");
806    }
807
808    #[tokio::test]
809    async fn test_nonexistent_job() {
810        let manager = JobManager::new();
811        let result = manager.wait(JobId(999)).await;
812        assert!(result.is_none());
813    }
814}