Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18// Data types re-exported from kaish-types.
19pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21/// A background job.
22pub struct Job {
23    /// Job ID.
24    pub id: JobId,
25    /// Command description.
26    pub command: String,
27    /// Task handle (None if already awaited).
28    handle: Option<JoinHandle<ExecResult>>,
29    /// Channel to receive result (alternative to handle).
30    result_rx: Option<oneshot::Receiver<ExecResult>>,
31    /// Cached result after completion.
32    result: Option<ExecResult>,
33    /// Path to output file (captures stdout/stderr after completion).
34    output_file: Option<PathBuf>,
35    /// Live stdout stream (bounded ring buffer).
36    stdout_stream: Option<Arc<BoundedStream>>,
37    /// Live stderr stream (bounded ring buffer).
38    stderr_stream: Option<Arc<BoundedStream>>,
39    /// OS process ID (for stopped jobs).
40    pid: Option<u32>,
41    /// OS process group ID (for stopped jobs).
42    pgid: Option<u32>,
43    /// Whether this job is stopped (SIGTSTP).
44    stopped: bool,
45}
46
47impl Job {
48    /// Create a new job from a task handle.
49    pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
50        Self {
51            id,
52            command,
53            handle: Some(handle),
54            result_rx: None,
55            result: None,
56            output_file: None,
57            stdout_stream: None,
58            stderr_stream: None,
59            pid: None,
60            pgid: None,
61            stopped: false,
62        }
63    }
64
65    /// Create a new job from a result channel.
66    pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
67        Self {
68            id,
69            command,
70            handle: None,
71            result_rx: Some(rx),
72            result: None,
73            output_file: None,
74            stdout_stream: None,
75            stderr_stream: None,
76            pid: None,
77            pgid: None,
78            stopped: false,
79        }
80    }
81
82    /// Create a new job with attached output streams.
83    ///
84    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
85    pub fn with_streams(
86        id: JobId,
87        command: String,
88        rx: oneshot::Receiver<ExecResult>,
89        stdout: Arc<BoundedStream>,
90        stderr: Arc<BoundedStream>,
91    ) -> Self {
92        Self {
93            id,
94            command,
95            handle: None,
96            result_rx: Some(rx),
97            result: None,
98            output_file: None,
99            stdout_stream: Some(stdout),
100            stderr_stream: Some(stderr),
101            pid: None,
102            pgid: None,
103            stopped: false,
104        }
105    }
106
107    /// Create a stopped job (from Ctrl-Z on a foreground process).
108    pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
109        Self {
110            id,
111            command,
112            handle: None,
113            result_rx: None,
114            result: None,
115            output_file: None,
116            stdout_stream: None,
117            stderr_stream: None,
118            pid: Some(pid),
119            pgid: Some(pgid),
120            stopped: true,
121        }
122    }
123
124    /// Get the output file path (if available).
125    pub fn output_file(&self) -> Option<&PathBuf> {
126        self.output_file.as_ref()
127    }
128
129    /// Check if the job has completed.
130    ///
131    /// Stopped jobs are not considered done.
132    pub fn is_done(&mut self) -> bool {
133        if self.stopped {
134            return false;
135        }
136        self.try_poll();
137        self.result.is_some()
138    }
139
140    /// Get the job's status.
141    pub fn status(&mut self) -> JobStatus {
142        if self.stopped {
143            return JobStatus::Stopped;
144        }
145        self.try_poll();
146        match &self.result {
147            Some(r) if r.ok() => JobStatus::Done,
148            Some(_) => JobStatus::Failed,
149            None => JobStatus::Running,
150        }
151    }
152
153    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
154    ///
155    /// Returns:
156    /// - `"running"` if the job is still running
157    /// - `"done:0"` if the job completed successfully
158    /// - `"failed:{code}"` if the job failed with an exit code
159    pub fn status_string(&mut self) -> String {
160        self.try_poll();
161        match &self.result {
162            Some(r) if r.ok() => "done:0".to_string(),
163            Some(r) => format!("failed:{}", r.code),
164            None => "running".to_string(),
165        }
166    }
167
168    /// Get the stdout stream (if attached).
169    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170        self.stdout_stream.as_ref()
171    }
172
173    /// Get the stderr stream (if attached).
174    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175        self.stderr_stream.as_ref()
176    }
177
178    /// Wait for the job to complete and return its result.
179    ///
180    /// On completion, the job's output is written to a temp file for later retrieval.
181    pub async fn wait(&mut self) -> ExecResult {
182        if let Some(result) = self.result.take() {
183            self.result = Some(result.clone());
184            return result;
185        }
186
187        let result = if let Some(handle) = self.handle.take() {
188            match handle.await {
189                Ok(r) => r,
190                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191            }
192        } else if let Some(rx) = self.result_rx.take() {
193            match rx.await {
194                Ok(r) => r,
195                Err(_) => ExecResult::failure(1, "job channel closed"),
196            }
197        } else {
198            // Already waited
199            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200        };
201
202        // Write output to temp file for later retrieval
203        if self.output_file.is_none()
204            && let Some(path) = self.write_output_file(&result) {
205                self.output_file = Some(path);
206            }
207
208        self.result = Some(result.clone());
209        result
210    }
211
212    /// Write job output to a temp file.
213    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214        // Only write if there's output to capture
215        let text = result.text_out();
216        if text.is_empty() && result.err.is_empty() {
217            return None;
218        }
219
220        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
221        if std::fs::create_dir_all(&tmp_dir).is_err() {
222            tracing::warn!("Failed to create job output directory");
223            return None;
224        }
225
226        let filename = format!("job_{}.txt", self.id.0);
227        let path = tmp_dir.join(filename);
228
229        let mut content = String::new();
230        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
231        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
232
233        if !text.is_empty() {
234            content.push_str("## STDOUT\n");
235            content.push_str(&text);
236            if !text.ends_with('\n') {
237                content.push('\n');
238            }
239        }
240
241        if !result.err.is_empty() {
242            content.push_str("\n## STDERR\n");
243            content.push_str(&result.err);
244            if !result.err.ends_with('\n') {
245                content.push('\n');
246            }
247        }
248
249        match std::fs::write(&path, content) {
250            Ok(()) => Some(path),
251            Err(e) => {
252                tracing::warn!("Failed to write job output file: {}", e);
253                None
254            }
255        }
256    }
257
258    /// Remove any temp files associated with this job.
259    pub fn cleanup_files(&mut self) {
260        if let Some(path) = self.output_file.take() {
261            if let Err(e) = std::fs::remove_file(&path) {
262                // Ignore "not found" — file may not have been written
263                if e.kind() != io::ErrorKind::NotFound {
264                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
265                }
266            }
267        }
268    }
269
270    /// Get the result if completed, without waiting.
271    pub fn try_result(&self) -> Option<&ExecResult> {
272        self.result.as_ref()
273    }
274
275    /// Try to poll the result channel and update status.
276    ///
277    /// This is a non-blocking check that updates `self.result` if the
278    /// job has completed. Returns true if the job is now done.
279    pub fn try_poll(&mut self) -> bool {
280        if self.result.is_some() {
281            return true;
282        }
283
284        // Try to poll the oneshot channel
285        if let Some(rx) = self.result_rx.as_mut() {
286            match rx.try_recv() {
287                Ok(result) => {
288                    self.result = Some(result);
289                    self.result_rx = None;
290                    return true;
291                }
292                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
293                    // Still running
294                    return false;
295                }
296                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
297                    // Channel closed without result - job failed
298                    self.result = Some(ExecResult::failure(1, "job channel closed"));
299                    self.result_rx = None;
300                    return true;
301                }
302            }
303        }
304
305        // Check if handle is finished
306        if let Some(handle) = self.handle.as_mut()
307            && handle.is_finished() {
308                // Take the handle and wait for it (should be instant)
309                let Some(mut handle) = self.handle.take() else {
310                    return false;
311                };
312                // Poll directly with a noop waker — safe because is_finished() was true
313                let waker = std::task::Waker::noop();
314                let mut cx = std::task::Context::from_waker(waker);
315                let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
316                    std::task::Poll::Ready(Ok(r)) => r,
317                    std::task::Poll::Ready(Err(e)) => {
318                        ExecResult::failure(1, format!("job panicked: {}", e))
319                    }
320                    std::task::Poll::Pending => return false, // shouldn't happen
321                };
322                self.result = Some(result);
323                return true;
324            }
325
326        false
327    }
328}
329
330/// Manager for background jobs.
331pub struct JobManager {
332    /// Counter for generating unique job IDs.
333    next_id: AtomicU64,
334    /// Map of job ID to job.
335    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
336}
337
338impl JobManager {
339    /// Create a new job manager.
340    pub fn new() -> Self {
341        Self {
342            next_id: AtomicU64::new(1),
343            jobs: Arc::new(Mutex::new(HashMap::new())),
344        }
345    }
346
347    /// Spawn a new background job from a future.
348    ///
349    /// The job is inserted into the map synchronously before returning,
350    /// guaranteeing it's immediately queryable via `exists()` or `get()`.
351    pub fn spawn<F>(&self, command: String, future: F) -> JobId
352    where
353        F: std::future::Future<Output = ExecResult> + Send + 'static,
354    {
355        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
356        let handle = tokio::spawn(future);
357        let job = Job::new(id, command, handle);
358
359        // Spin on try_lock to guarantee the job is in the map on return.
360        // The lock is tokio::sync::Mutex which is only held briefly during
361        // HashMap operations, so contention resolves quickly.
362        let mut job_opt = Some(job);
363        loop {
364            match self.jobs.try_lock() {
365                Ok(mut guard) => {
366                    if let Some(j) = job_opt.take() {
367                        guard.insert(id, j);
368                    }
369                    break;
370                }
371                Err(_) => {
372                    std::hint::spin_loop();
373                }
374            }
375        }
376
377        id
378    }
379
380    /// Spawn a job that's already running and communicate via channel.
381    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
382        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
383        let job = Job::from_channel(id, command, rx);
384
385        let mut jobs = self.jobs.lock().await;
386        jobs.insert(id, job);
387
388        id
389    }
390
391    /// Register a job with attached output streams.
392    ///
393    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
394    pub async fn register_with_streams(
395        &self,
396        command: String,
397        rx: oneshot::Receiver<ExecResult>,
398        stdout: Arc<BoundedStream>,
399        stderr: Arc<BoundedStream>,
400    ) -> JobId {
401        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
402        let job = Job::with_streams(id, command, rx, stdout, stderr);
403
404        let mut jobs = self.jobs.lock().await;
405        jobs.insert(id, job);
406
407        id
408    }
409
410    /// Wait for a specific job to complete.
411    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
412        let mut jobs = self.jobs.lock().await;
413        if let Some(job) = jobs.get_mut(&id) {
414            Some(job.wait().await)
415        } else {
416            None
417        }
418    }
419
420    /// Wait for all jobs to complete, returning results in completion order.
421    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
422        let mut results = Vec::new();
423
424        // Get all job IDs
425        let ids: Vec<JobId> = {
426            let jobs = self.jobs.lock().await;
427            jobs.keys().copied().collect()
428        };
429
430        for id in ids {
431            if let Some(result) = self.wait(id).await {
432                results.push((id, result));
433            }
434        }
435
436        results
437    }
438
439    /// List all jobs with their status.
440    pub async fn list(&self) -> Vec<JobInfo> {
441        let mut jobs = self.jobs.lock().await;
442        jobs.values_mut()
443            .map(|job| JobInfo {
444                id: job.id,
445                command: job.command.clone(),
446                status: job.status(),
447                output_file: job.output_file.clone(),
448                pid: job.pid,
449            })
450            .collect()
451    }
452
453    /// Get the number of running jobs.
454    pub async fn running_count(&self) -> usize {
455        let mut jobs = self.jobs.lock().await;
456        let mut count = 0;
457        for job in jobs.values_mut() {
458            if !job.is_done() {
459                count += 1;
460            }
461        }
462        count
463    }
464
465    /// Remove completed jobs from tracking and clean up their temp files.
466    pub async fn cleanup(&self) {
467        let mut jobs = self.jobs.lock().await;
468        jobs.retain(|_, job| {
469            if job.is_done() {
470                job.cleanup_files();
471                false
472            } else {
473                true
474            }
475        });
476    }
477
478    /// Check if a specific job exists.
479    pub async fn exists(&self, id: JobId) -> bool {
480        let jobs = self.jobs.lock().await;
481        jobs.contains_key(&id)
482    }
483
484    /// Get info for a specific job.
485    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
486        let mut jobs = self.jobs.lock().await;
487        jobs.get_mut(&id).map(|job| JobInfo {
488            id: job.id,
489            command: job.command.clone(),
490            status: job.status(),
491            output_file: job.output_file.clone(),
492            pid: job.pid,
493        })
494    }
495
496    /// Get the command string for a job.
497    pub async fn get_command(&self, id: JobId) -> Option<String> {
498        let jobs = self.jobs.lock().await;
499        jobs.get(&id).map(|job| job.command.clone())
500    }
501
502    /// Get the status string for a job (for /v/jobs/{id}/status).
503    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
504        let mut jobs = self.jobs.lock().await;
505        jobs.get_mut(&id).map(|job| job.status_string())
506    }
507
508    /// Read stdout stream content for a job.
509    ///
510    /// Returns `None` if the job doesn't exist or has no attached stream.
511    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
512        let jobs = self.jobs.lock().await;
513        if let Some(job) = jobs.get(&id)
514            && let Some(stream) = job.stdout_stream() {
515                return Some(stream.read().await);
516            }
517        None
518    }
519
520    /// Read stderr stream content for a job.
521    ///
522    /// Returns `None` if the job doesn't exist or has no attached stream.
523    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
524        let jobs = self.jobs.lock().await;
525        if let Some(job) = jobs.get(&id)
526            && let Some(stream) = job.stderr_stream() {
527                return Some(stream.read().await);
528            }
529        None
530    }
531
532    /// List all job IDs.
533    pub async fn list_ids(&self) -> Vec<JobId> {
534        let jobs = self.jobs.lock().await;
535        jobs.keys().copied().collect()
536    }
537
538    /// Register a stopped job (from Ctrl-Z on a foreground process).
539    pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
540        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
541        let job = Job::stopped(id, command, pid, pgid);
542        let mut jobs = self.jobs.lock().await;
543        jobs.insert(id, job);
544        id
545    }
546
547    /// Mark a job as stopped with its process info.
548    pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
549        let mut jobs = self.jobs.lock().await;
550        if let Some(job) = jobs.get_mut(&id) {
551            job.stopped = true;
552            job.pid = Some(pid);
553            job.pgid = Some(pgid);
554        }
555    }
556
557    /// Mark a stopped job as resumed.
558    pub async fn resume_job(&self, id: JobId) {
559        let mut jobs = self.jobs.lock().await;
560        if let Some(job) = jobs.get_mut(&id) {
561            job.stopped = false;
562        }
563    }
564
565    /// Get the most recently stopped job.
566    pub async fn last_stopped(&self) -> Option<JobId> {
567        let mut jobs = self.jobs.lock().await;
568        // Find the highest-numbered stopped job
569        let mut best: Option<JobId> = None;
570        for job in jobs.values_mut() {
571            if job.stopped {
572                match best {
573                    None => best = Some(job.id),
574                    Some(b) if job.id.0 > b.0 => best = Some(job.id),
575                    _ => {}
576                }
577            }
578        }
579        best
580    }
581
582    /// Get process info (pid, pgid) for a job.
583    pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
584        let jobs = self.jobs.lock().await;
585        jobs.get(&id).and_then(|job| {
586            match (job.pid, job.pgid) {
587                (Some(pid), Some(pgid)) => Some((pid, pgid)),
588                _ => None,
589            }
590        })
591    }
592
593    /// Remove a job from tracking.
594    pub async fn remove(&self, id: JobId) {
595        let mut jobs = self.jobs.lock().await;
596        if let Some(mut job) = jobs.remove(&id) {
597            job.cleanup_files();
598        }
599    }
600}
601
602impl Default for JobManager {
603    fn default() -> Self {
604        Self::new()
605    }
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // `spawn` registers the job synchronously, so it can be waited on at once.
        let result = manager.wait(id).await.expect("spawned job should be tracked");
        assert!(result.ok());
        assert_eq!(result.out, "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|(_, result)| result.ok()));
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async {
            ExecResult::success("")
        });

        let _ = manager.wait(id).await;

        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async {
            ExecResult::success("")
        });

        let _ = manager.wait(id).await;

        // Should have 1 job
        assert_eq!(manager.list().await.len(), 1);

        // Cleanup
        manager.cleanup().await;

        // Should have 0 jobs
        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        // Bug K: cleanup should remove temp files
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        // Waiting triggers output file creation
        let result = manager.wait(id).await;
        assert!(result.is_some());

        // Get the output file path before cleanup
        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };

        // Cleanup should remove the job and its files
        manager.cleanup().await;

        // If an output file was created, it should be gone now
        if let Some(path) = output_file {
            assert!(
                !path.exists(),
                "temp file should be removed after cleanup: {}",
                path.display()
            );
        }
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        // Send result
        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().out, "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        // Bug J: job should be queryable immediately after spawn()
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        // Should be immediately visible without any sleep
        let exists = manager.exists(id).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(id).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        let result = manager.wait(JobId(999)).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_stopped_job_lifecycle() {
        let manager = JobManager::new();
        let id = manager.register_stopped("vim".to_string(), 4242, 4243).await;

        let info = manager.get(id).await.expect("stopped job should be tracked");
        assert_eq!(info.status, JobStatus::Stopped);
        assert_eq!(info.pid, Some(4242));
        assert_eq!(manager.get_process_info(id).await, Some((4242, 4243)));
        assert!(manager.last_stopped().await == Some(id));

        manager.resume_job(id).await;
        assert!(manager.last_stopped().await.is_none());
    }

    #[tokio::test]
    async fn test_read_streams_without_attached_streams() {
        let manager = JobManager::new();
        let id = manager.spawn("no streams".to_string(), async {
            ExecResult::success("")
        });

        assert!(manager.read_stdout(id).await.is_none());
        assert!(manager.read_stderr(id).await.is_none());
        assert!(manager.read_stdout(JobId(999)).await.is_none());
    }
}