Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21/// A background job.
22pub struct Job {
23    /// Job ID.
24    pub id: JobId,
25    /// Command description.
26    pub command: String,
27    /// Task handle (None if already awaited).
28    handle: Option<JoinHandle<ExecResult>>,
29    /// Channel to receive result (alternative to handle).
30    result_rx: Option<oneshot::Receiver<ExecResult>>,
31    /// Cached result after completion.
32    result: Option<ExecResult>,
33    /// Path to output file (captures stdout/stderr after completion).
34    output_file: Option<PathBuf>,
35    /// Live stdout stream (bounded ring buffer).
36    stdout_stream: Option<Arc<BoundedStream>>,
37    /// Live stderr stream (bounded ring buffer).
38    stderr_stream: Option<Arc<BoundedStream>>,
39    /// OS process ID (for stopped jobs).
40    pid: Option<u32>,
41    /// OS process group ID (for stopped jobs).
42    pgid: Option<u32>,
43    /// Whether this job is stopped (SIGTSTP).
44    stopped: bool,
45}
46
47impl Job {
48    /// Create a new job from a task handle.
49    pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
50        Self {
51            id,
52            command,
53            handle: Some(handle),
54            result_rx: None,
55            result: None,
56            output_file: None,
57            stdout_stream: None,
58            stderr_stream: None,
59            pid: None,
60            pgid: None,
61            stopped: false,
62        }
63    }
64
65    /// Create a new job from a result channel.
66    pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
67        Self {
68            id,
69            command,
70            handle: None,
71            result_rx: Some(rx),
72            result: None,
73            output_file: None,
74            stdout_stream: None,
75            stderr_stream: None,
76            pid: None,
77            pgid: None,
78            stopped: false,
79        }
80    }
81
82    /// Create a new job with attached output streams.
83    ///
84    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
85    pub fn with_streams(
86        id: JobId,
87        command: String,
88        rx: oneshot::Receiver<ExecResult>,
89        stdout: Arc<BoundedStream>,
90        stderr: Arc<BoundedStream>,
91    ) -> Self {
92        Self {
93            id,
94            command,
95            handle: None,
96            result_rx: Some(rx),
97            result: None,
98            output_file: None,
99            stdout_stream: Some(stdout),
100            stderr_stream: Some(stderr),
101            pid: None,
102            pgid: None,
103            stopped: false,
104        }
105    }
106
107    /// Create a stopped job (from Ctrl-Z on a foreground process).
108    pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
109        Self {
110            id,
111            command,
112            handle: None,
113            result_rx: None,
114            result: None,
115            output_file: None,
116            stdout_stream: None,
117            stderr_stream: None,
118            pid: Some(pid),
119            pgid: Some(pgid),
120            stopped: true,
121        }
122    }
123
124    /// Get the output file path (if available).
125    pub fn output_file(&self) -> Option<&PathBuf> {
126        self.output_file.as_ref()
127    }
128
129    /// Check if the job has completed.
130    ///
131    /// Stopped jobs are not considered done.
132    pub fn is_done(&mut self) -> bool {
133        if self.stopped {
134            return false;
135        }
136        self.try_poll();
137        self.result.is_some()
138    }
139
140    /// Get the job's status.
141    pub fn status(&mut self) -> JobStatus {
142        if self.stopped {
143            return JobStatus::Stopped;
144        }
145        self.try_poll();
146        match &self.result {
147            Some(r) if r.ok() => JobStatus::Done,
148            Some(_) => JobStatus::Failed,
149            None => JobStatus::Running,
150        }
151    }
152
153    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
154    ///
155    /// Returns:
156    /// - `"running"` if the job is still running
157    /// - `"done:0"` if the job completed successfully
158    /// - `"failed:{code}"` if the job failed with an exit code
159    pub fn status_string(&mut self) -> String {
160        self.try_poll();
161        match &self.result {
162            Some(r) if r.ok() => "done:0".to_string(),
163            Some(r) => format!("failed:{}", r.code),
164            None => "running".to_string(),
165        }
166    }
167
168    /// Get the stdout stream (if attached).
169    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170        self.stdout_stream.as_ref()
171    }
172
173    /// Get the stderr stream (if attached).
174    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175        self.stderr_stream.as_ref()
176    }
177
178    /// Wait for the job to complete and return its result.
179    ///
180    /// On completion, the job's output is written to a temp file for later retrieval.
181    pub async fn wait(&mut self) -> ExecResult {
182        if let Some(result) = self.result.take() {
183            self.result = Some(result.clone());
184            return result;
185        }
186
187        let result = if let Some(handle) = self.handle.take() {
188            match handle.await {
189                Ok(r) => r,
190                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191            }
192        } else if let Some(rx) = self.result_rx.take() {
193            match rx.await {
194                Ok(r) => r,
195                Err(_) => ExecResult::failure(1, "job channel closed"),
196            }
197        } else {
198            // Already waited
199            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200        };
201
202        // Write output to temp file for later retrieval
203        if self.output_file.is_none()
204            && let Some(path) = self.write_output_file(&result) {
205                self.output_file = Some(path);
206            }
207
208        self.result = Some(result.clone());
209        result
210    }
211
212    /// Write job output to a temp file.
213    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214        // Only write if there's output to capture
215        if result.out.is_empty() && result.err.is_empty() {
216            return None;
217        }
218
219        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
220        if std::fs::create_dir_all(&tmp_dir).is_err() {
221            tracing::warn!("Failed to create job output directory");
222            return None;
223        }
224
225        let filename = format!("job_{}.txt", self.id.0);
226        let path = tmp_dir.join(filename);
227
228        let mut content = String::new();
229        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
230        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
231
232        if !result.out.is_empty() {
233            content.push_str("## STDOUT\n");
234            content.push_str(&result.out);
235            if !result.out.ends_with('\n') {
236                content.push('\n');
237            }
238        }
239
240        if !result.err.is_empty() {
241            content.push_str("\n## STDERR\n");
242            content.push_str(&result.err);
243            if !result.err.ends_with('\n') {
244                content.push('\n');
245            }
246        }
247
248        match std::fs::write(&path, content) {
249            Ok(()) => Some(path),
250            Err(e) => {
251                tracing::warn!("Failed to write job output file: {}", e);
252                None
253            }
254        }
255    }
256
257    /// Remove any temp files associated with this job.
258    pub fn cleanup_files(&mut self) {
259        if let Some(path) = self.output_file.take() {
260            if let Err(e) = std::fs::remove_file(&path) {
261                // Ignore "not found" — file may not have been written
262                if e.kind() != io::ErrorKind::NotFound {
263                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
264                }
265            }
266        }
267    }
268
269    /// Get the result if completed, without waiting.
270    pub fn try_result(&self) -> Option<&ExecResult> {
271        self.result.as_ref()
272    }
273
274    /// Try to poll the result channel and update status.
275    ///
276    /// This is a non-blocking check that updates `self.result` if the
277    /// job has completed. Returns true if the job is now done.
278    pub fn try_poll(&mut self) -> bool {
279        if self.result.is_some() {
280            return true;
281        }
282
283        // Try to poll the oneshot channel
284        if let Some(rx) = self.result_rx.as_mut() {
285            match rx.try_recv() {
286                Ok(result) => {
287                    self.result = Some(result);
288                    self.result_rx = None;
289                    return true;
290                }
291                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
292                    // Still running
293                    return false;
294                }
295                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
296                    // Channel closed without result - job failed
297                    self.result = Some(ExecResult::failure(1, "job channel closed"));
298                    self.result_rx = None;
299                    return true;
300                }
301            }
302        }
303
304        // Check if handle is finished
305        if let Some(handle) = self.handle.as_mut()
306            && handle.is_finished() {
307                // Take the handle and wait for it (should be instant)
308                let Some(mut handle) = self.handle.take() else {
309                    return false;
310                };
311                // Poll directly with a noop waker — safe because is_finished() was true
312                let waker = std::task::Waker::noop();
313                let mut cx = std::task::Context::from_waker(waker);
314                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
315                    std::task::Poll::Ready(Ok(r)) => r,
316                    std::task::Poll::Ready(Err(e)) => {
317                        ExecResult::failure(1, format!("job panicked: {}", e))
318                    }
319                    std::task::Poll::Pending => return false, // shouldn't happen
320                };
321                self.result = Some(result);
322                return true;
323            }
324
325        false
326    }
327}
328
329/// Manager for background jobs.
330pub struct JobManager {
331    /// Counter for generating unique job IDs.
332    next_id: AtomicU64,
333    /// Map of job ID to job.
334    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
335}
336
337impl JobManager {
338    /// Create a new job manager.
339    pub fn new() -> Self {
340        Self {
341            next_id: AtomicU64::new(1),
342            jobs: Arc::new(Mutex::new(HashMap::new())),
343        }
344    }
345
346    /// Spawn a new background job from a future.
347    ///
348    /// The job is inserted into the map synchronously before returning,
349    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
350    pub fn spawn<F>(&self, command: String, future: F) -> JobId
351    where
352        F: std::future::Future<Output = ExecResult> + Send + 'static,
353    {
354        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
355        let handle = tokio::spawn(future);
356        let job = Job::new(id, command, handle);
357
358        // Spin on try_lock to guarantee the job is in the map on return.
359        // The lock is tokio::sync::Mutex which is only held briefly during
360        // HashMap operations, so contention resolves quickly.
361        let mut job_opt = Some(job);
362        loop {
363            match self.jobs.try_lock() {
364                Ok(mut guard) => {
365                    if let Some(j) = job_opt.take() {
366                        guard.insert(id, j);
367                    }
368                    break;
369                }
370                Err(_) => {
371                    std::hint::spin_loop();
372                }
373            }
374        }
375
376        id
377    }
378
379    /// Spawn a job that's already running and communicate via channel.
380    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
381        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
382        let job = Job::from_channel(id, command, rx);
383
384        let mut jobs = self.jobs.lock().await;
385        jobs.insert(id, job);
386
387        id
388    }
389
390    /// Register a job with attached output streams.
391    ///
392    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
393    pub async fn register_with_streams(
394        &self,
395        command: String,
396        rx: oneshot::Receiver<ExecResult>,
397        stdout: Arc<BoundedStream>,
398        stderr: Arc<BoundedStream>,
399    ) -> JobId {
400        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
401        let job = Job::with_streams(id, command, rx, stdout, stderr);
402
403        let mut jobs = self.jobs.lock().await;
404        jobs.insert(id, job);
405
406        id
407    }
408
409    /// Wait for a specific job to complete.
410    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
411        let mut jobs = self.jobs.lock().await;
412        if let Some(job) = jobs.get_mut(&id) {
413            Some(job.wait().await)
414        } else {
415            None
416        }
417    }
418
419    /// Wait for all jobs to complete, returning results in completion order.
420    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
421        let mut results = Vec::new();
422
423        // Get all job IDs
424        let ids: Vec<JobId> = {
425            let jobs = self.jobs.lock().await;
426            jobs.keys().copied().collect()
427        };
428
429        for id in ids {
430            if let Some(result) = self.wait(id).await {
431                results.push((id, result));
432            }
433        }
434
435        results
436    }
437
438    /// List all jobs with their status.
439    pub async fn list(&self) -> Vec<JobInfo> {
440        let mut jobs = self.jobs.lock().await;
441        jobs.values_mut()
442            .map(|job| JobInfo {
443                id: job.id,
444                command: job.command.clone(),
445                status: job.status(),
446                output_file: job.output_file.clone(),
447                pid: job.pid,
448            })
449            .collect()
450    }
451
452    /// Get the number of running jobs.
453    pub async fn running_count(&self) -> usize {
454        let mut jobs = self.jobs.lock().await;
455        let mut count = 0;
456        for job in jobs.values_mut() {
457            if !job.is_done() {
458                count += 1;
459            }
460        }
461        count
462    }
463
464    /// Remove completed jobs from tracking and clean up their temp files.
465    pub async fn cleanup(&self) {
466        let mut jobs = self.jobs.lock().await;
467        jobs.retain(|_, job| {
468            if job.is_done() {
469                job.cleanup_files();
470                false
471            } else {
472                true
473            }
474        });
475    }
476
477    /// Check if a specific job exists.
478    pub async fn exists(&self, id: JobId) -> bool {
479        let jobs = self.jobs.lock().await;
480        jobs.contains_key(&id)
481    }
482
483    /// Get info for a specific job.
484    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
485        let mut jobs = self.jobs.lock().await;
486        jobs.get_mut(&id).map(|job| JobInfo {
487            id: job.id,
488            command: job.command.clone(),
489            status: job.status(),
490            output_file: job.output_file.clone(),
491            pid: job.pid,
492        })
493    }
494
495    /// Get the command string for a job.
496    pub async fn get_command(&self, id: JobId) -> Option<String> {
497        let jobs = self.jobs.lock().await;
498        jobs.get(&id).map(|job| job.command.clone())
499    }
500
501    /// Get the status string for a job (for /v/jobs/{id}/status).
502    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
503        let mut jobs = self.jobs.lock().await;
504        jobs.get_mut(&id).map(|job| job.status_string())
505    }
506
507    /// Read stdout stream content for a job.
508    ///
509    /// Returns `None` if the job doesn't exist or has no attached stream.
510    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
511        let jobs = self.jobs.lock().await;
512        if let Some(job) = jobs.get(&id)
513            && let Some(stream) = job.stdout_stream() {
514                return Some(stream.read().await);
515            }
516        None
517    }
518
519    /// Read stderr stream content for a job.
520    ///
521    /// Returns `None` if the job doesn't exist or has no attached stream.
522    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
523        let jobs = self.jobs.lock().await;
524        if let Some(job) = jobs.get(&id)
525            && let Some(stream) = job.stderr_stream() {
526                return Some(stream.read().await);
527            }
528        None
529    }
530
531    /// List all job IDs.
532    pub async fn list_ids(&self) -> Vec<JobId> {
533        let jobs = self.jobs.lock().await;
534        jobs.keys().copied().collect()
535    }
536
537    /// Register a stopped job (from Ctrl-Z on a foreground process).
538    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
539        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
540        let job = Job::stopped(id, command, pid, pgid);
541        let mut jobs = self.jobs.lock().await;
542        jobs.insert(id, job);
543        id
544    }
545
546    /// Mark a job as stopped with its process info.
547    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
548        let mut jobs = self.jobs.lock().await;
549        if let Some(job) = jobs.get_mut(&id) {
550            job.stopped = true;
551            job.pid = Some(pid);
552            job.pgid = Some(pgid);
553        }
554    }
555
556    /// Mark a stopped job as resumed.
557    pub async fn resume_job(&self, id: JobId) {
558        let mut jobs = self.jobs.lock().await;
559        if let Some(job) = jobs.get_mut(&id) {
560            job.stopped = false;
561        }
562    }
563
564    /// Get the most recently stopped job.
565    pub async fn last_stopped(&self) -> Option<JobId> {
566        let mut jobs = self.jobs.lock().await;
567        // Find the highest-numbered stopped job
568        let mut best: Option<JobId> = None;
569        for job in jobs.values_mut() {
570            if job.stopped {
571                match best {
572                    None => best = Some(job.id),
573                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
574                    _ => {}
575                }
576            }
577        }
578        best
579    }
580
581    /// Get process info (pid, pgid) for a job.
582    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
583        let jobs = self.jobs.lock().await;
584        jobs.get(&id).and_then(|job| {
585            match (job.pid, job.pgid) {
586                (Some(pid), Some(pgid)) => Some((pid, pgid)),
587                _ => None,
588            }
589        })
590    }
591
592    /// Remove a job from tracking.
593    pub async fn remove(&self, id: JobId) {
594        let mut jobs = self.jobs.lock().await;
595        if let Some(mut job) = jobs.remove(&id) {
596            job.cleanup_files();
597        }
598    }
599}
600
601impl Default for JobManager {
602    fn default() -> Self {
603        Self::new()
604    }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sleep for `ms` milliseconds on the tokio timer.
    async fn pause(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            pause(10).await;
            ExecResult::success("done")
        });

        // Give the spawned task a chance to start before waiting on it.
        pause(5).await;

        let outcome = manager.wait(id).await.expect("spawned job should exist");
        assert!(outcome.ok());
        assert_eq!(outcome.out, "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        for (name, delay, text) in [("job1", 10, "one"), ("job2", 5, "two")] {
            manager.spawn(name.to_string(), async move {
                pause(delay).await;
                ExecResult::success(text)
            });
        }

        // Give the spawned tasks a chance to start.
        pause(5).await;

        assert_eq!(manager.wait_all().await.len(), 2);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn("test job".to_string(), async {
            pause(50).await;
            ExecResult::success("")
        });

        // Well short of the job's 50ms runtime, so it is still running.
        pause(5).await;

        let listed = manager.list().await;
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.command, "test job");
        assert_eq!(only.status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async { ExecResult::success("") });

        // Let the job finish, then collect it.
        pause(10).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await.expect("finished job is still tracked");
        assert_eq!(info.status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async { ExecResult::success("") });

        pause(10).await;
        let _ = manager.wait(id).await;

        // The finished job stays tracked until cleanup() runs...
        assert_eq!(manager.list().await.len(), 1);

        manager.cleanup().await;

        // ...after which it is gone.
        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        // Bug K: cleanup should remove temp files
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        // Waiting on a job with output is what writes its output file.
        pause(10).await;
        assert!(manager.wait(id).await.is_some());

        // Record the file's location before cleanup drops the job.
        let written = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|job| job.output_file().cloned())
        };

        manager.cleanup().await;

        // Any file that was written must not survive cleanup.
        if let Some(path) = written {
            assert!(
                !path.exists(),
                "temp file should be removed after cleanup: {}",
                path.display()
            );
        }
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        // Deliver the job's result through the channel.
        tx.send(ExecResult::success("from channel")).unwrap();

        let outcome = manager.wait(id).await.expect("registered job should exist");
        assert_eq!(outcome.out, "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        // Bug J: job should be queryable immediately after spawn()
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            pause(100).await;
            ExecResult::success("done")
        });

        // No sleep here on purpose: spawn() must have inserted the job already.
        assert!(
            manager.exists(id).await,
            "job should be immediately available after spawn()"
        );
        assert!(
            manager.get(id).await.is_some(),
            "job info should be available immediately"
        );
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        assert!(manager.wait(JobId(999)).await.is_none());
    }
}