Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use tokio::sync::{oneshot, Mutex};
12use tokio::task::JoinHandle;
13
14use super::stream::BoundedStream;
15use crate::interpreter::ExecResult;
16
/// Opaque handle that names one background job.
///
/// Wraps the raw `u64` counter value; it is cheap to copy and usable as a
/// hash-map key.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct JobId(pub u64);
20
21impl std::fmt::Display for JobId {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        write!(f, "{}", self.0)
24    }
25}
26
/// Lifecycle state of a background job as seen by callers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum JobStatus {
    /// The job has not produced a result yet.
    Running,
    /// The job finished and its result reported success.
    Done,
    /// The job finished and its result reported failure.
    Failed,
}
37
38impl std::fmt::Display for JobStatus {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            JobStatus::Running => write!(f, "Running"),
42            JobStatus::Done => write!(f, "Done"),
43            JobStatus::Failed => write!(f, "Failed"),
44        }
45    }
46}
47
48/// Information about a job for listing.
49#[derive(Debug, Clone)]
50pub struct JobInfo {
51    /// Job ID.
52    pub id: JobId,
53    /// Command description.
54    pub command: String,
55    /// Current status.
56    pub status: JobStatus,
57    /// Path to output file (if available).
58    pub output_file: Option<PathBuf>,
59}
60
61/// A background job.
62pub struct Job {
63    /// Job ID.
64    pub id: JobId,
65    /// Command description.
66    pub command: String,
67    /// Task handle (None if already awaited).
68    handle: Option<JoinHandle<ExecResult>>,
69    /// Channel to receive result (alternative to handle).
70    result_rx: Option<oneshot::Receiver<ExecResult>>,
71    /// Cached result after completion.
72    result: Option<ExecResult>,
73    /// Path to output file (captures stdout/stderr after completion).
74    output_file: Option<PathBuf>,
75    /// Live stdout stream (bounded ring buffer).
76    stdout_stream: Option<Arc<BoundedStream>>,
77    /// Live stderr stream (bounded ring buffer).
78    stderr_stream: Option<Arc<BoundedStream>>,
79}
80
81impl Job {
82    /// Create a new job from a task handle.
83    pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
84        Self {
85            id,
86            command,
87            handle: Some(handle),
88            result_rx: None,
89            result: None,
90            output_file: None,
91            stdout_stream: None,
92            stderr_stream: None,
93        }
94    }
95
96    /// Create a new job from a result channel.
97    pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
98        Self {
99            id,
100            command,
101            handle: None,
102            result_rx: Some(rx),
103            result: None,
104            output_file: None,
105            stdout_stream: None,
106            stderr_stream: None,
107        }
108    }
109
110    /// Create a new job with attached output streams.
111    ///
112    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
113    pub fn with_streams(
114        id: JobId,
115        command: String,
116        rx: oneshot::Receiver<ExecResult>,
117        stdout: Arc<BoundedStream>,
118        stderr: Arc<BoundedStream>,
119    ) -> Self {
120        Self {
121            id,
122            command,
123            handle: None,
124            result_rx: Some(rx),
125            result: None,
126            output_file: None,
127            stdout_stream: Some(stdout),
128            stderr_stream: Some(stderr),
129        }
130    }
131
132    /// Get the output file path (if available).
133    pub fn output_file(&self) -> Option<&PathBuf> {
134        self.output_file.as_ref()
135    }
136
137    /// Check if the job has completed.
138    pub fn is_done(&mut self) -> bool {
139        self.try_poll();
140        self.result.is_some()
141    }
142
143    /// Get the job's status.
144    pub fn status(&mut self) -> JobStatus {
145        self.try_poll();
146        match &self.result {
147            Some(r) if r.ok() => JobStatus::Done,
148            Some(_) => JobStatus::Failed,
149            None => JobStatus::Running,
150        }
151    }
152
153    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
154    ///
155    /// Returns:
156    /// - `"running"` if the job is still running
157    /// - `"done:0"` if the job completed successfully
158    /// - `"failed:{code}"` if the job failed with an exit code
159    pub fn status_string(&mut self) -> String {
160        self.try_poll();
161        match &self.result {
162            Some(r) if r.ok() => "done:0".to_string(),
163            Some(r) => format!("failed:{}", r.code),
164            None => "running".to_string(),
165        }
166    }
167
168    /// Get the stdout stream (if attached).
169    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170        self.stdout_stream.as_ref()
171    }
172
173    /// Get the stderr stream (if attached).
174    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175        self.stderr_stream.as_ref()
176    }
177
178    /// Wait for the job to complete and return its result.
179    ///
180    /// On completion, the job's output is written to a temp file for later retrieval.
181    pub async fn wait(&mut self) -> ExecResult {
182        if let Some(result) = self.result.take() {
183            self.result = Some(result.clone());
184            return result;
185        }
186
187        let result = if let Some(handle) = self.handle.take() {
188            match handle.await {
189                Ok(r) => r,
190                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191            }
192        } else if let Some(rx) = self.result_rx.take() {
193            match rx.await {
194                Ok(r) => r,
195                Err(_) => ExecResult::failure(1, "job channel closed"),
196            }
197        } else {
198            // Already waited
199            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200        };
201
202        // Write output to temp file for later retrieval
203        if self.output_file.is_none()
204            && let Some(path) = self.write_output_file(&result) {
205                self.output_file = Some(path);
206            }
207
208        self.result = Some(result.clone());
209        result
210    }
211
212    /// Write job output to a temp file.
213    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214        // Only write if there's output to capture
215        if result.out.is_empty() && result.err.is_empty() {
216            return None;
217        }
218
219        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
220        if std::fs::create_dir_all(&tmp_dir).is_err() {
221            tracing::warn!("Failed to create job output directory");
222            return None;
223        }
224
225        let filename = format!("job_{}.txt", self.id.0);
226        let path = tmp_dir.join(filename);
227
228        let mut content = String::new();
229        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
230        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
231
232        if !result.out.is_empty() {
233            content.push_str("## STDOUT\n");
234            content.push_str(&result.out);
235            if !result.out.ends_with('\n') {
236                content.push('\n');
237            }
238        }
239
240        if !result.err.is_empty() {
241            content.push_str("\n## STDERR\n");
242            content.push_str(&result.err);
243            if !result.err.ends_with('\n') {
244                content.push('\n');
245            }
246        }
247
248        match std::fs::write(&path, content) {
249            Ok(()) => Some(path),
250            Err(e) => {
251                tracing::warn!("Failed to write job output file: {}", e);
252                None
253            }
254        }
255    }
256
257    /// Remove any temp files associated with this job.
258    pub fn cleanup_files(&mut self) {
259        if let Some(path) = self.output_file.take() {
260            if let Err(e) = std::fs::remove_file(&path) {
261                // Ignore "not found" — file may not have been written
262                if e.kind() != io::ErrorKind::NotFound {
263                    tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
264                }
265            }
266        }
267    }
268
269    /// Get the result if completed, without waiting.
270    pub fn try_result(&self) -> Option<&ExecResult> {
271        self.result.as_ref()
272    }
273
274    /// Try to poll the result channel and update status.
275    ///
276    /// This is a non-blocking check that updates `self.result` if the
277    /// job has completed. Returns true if the job is now done.
278    pub fn try_poll(&mut self) -> bool {
279        if self.result.is_some() {
280            return true;
281        }
282
283        // Try to poll the oneshot channel
284        if let Some(rx) = self.result_rx.as_mut() {
285            match rx.try_recv() {
286                Ok(result) => {
287                    self.result = Some(result);
288                    self.result_rx = None;
289                    return true;
290                }
291                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
292                    // Still running
293                    return false;
294                }
295                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
296                    // Channel closed without result - job failed
297                    self.result = Some(ExecResult::failure(1, "job channel closed"));
298                    self.result_rx = None;
299                    return true;
300                }
301            }
302        }
303
304        // Check if handle is finished
305        if let Some(handle) = self.handle.as_mut()
306            && handle.is_finished() {
307                // Take the handle and wait for it (should be instant)
308                let Some(handle) = self.handle.take() else {
309                    return false;
310                };
311                // We can't await here, so we use now_or_never
312                // Note: this is synchronous since is_finished() was true
313                let result = match tokio::task::block_in_place(|| {
314                    tokio::runtime::Handle::current().block_on(handle)
315                }) {
316                    Ok(r) => r,
317                    Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
318                };
319                self.result = Some(result);
320                return true;
321            }
322
323        false
324    }
325}
326
327/// Manager for background jobs.
328pub struct JobManager {
329    /// Counter for generating unique job IDs.
330    next_id: AtomicU64,
331    /// Map of job ID to job.
332    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
333}
334
335impl JobManager {
336    /// Create a new job manager.
337    pub fn new() -> Self {
338        Self {
339            next_id: AtomicU64::new(1),
340            jobs: Arc::new(Mutex::new(HashMap::new())),
341        }
342    }
343
344    /// Spawn a new background job from a future.
345    pub fn spawn<F>(&self, command: String, future: F) -> JobId
346    where
347        F: std::future::Future<Output = ExecResult> + Send + 'static,
348    {
349        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
350        let handle = tokio::spawn(future);
351        let job = Job::new(id, command, handle);
352
353        // Try synchronous insert first (succeeds if lock is uncontended)
354        let jobs = self.jobs.clone();
355        if let Ok(mut guard) = jobs.try_lock() {
356            guard.insert(id, job);
357        } else {
358            tokio::spawn(async move {
359                let mut jobs = jobs.lock().await;
360                jobs.insert(id, job);
361            });
362        }
363
364        id
365    }
366
367    /// Spawn a job that's already running and communicate via channel.
368    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
369        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
370        let job = Job::from_channel(id, command, rx);
371
372        let mut jobs = self.jobs.lock().await;
373        jobs.insert(id, job);
374
375        id
376    }
377
378    /// Register a job with attached output streams.
379    ///
380    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
381    pub async fn register_with_streams(
382        &self,
383        command: String,
384        rx: oneshot::Receiver<ExecResult>,
385        stdout: Arc<BoundedStream>,
386        stderr: Arc<BoundedStream>,
387    ) -> JobId {
388        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
389        let job = Job::with_streams(id, command, rx, stdout, stderr);
390
391        let mut jobs = self.jobs.lock().await;
392        jobs.insert(id, job);
393
394        id
395    }
396
397    /// Wait for a specific job to complete.
398    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
399        let mut jobs = self.jobs.lock().await;
400        if let Some(job) = jobs.get_mut(&id) {
401            Some(job.wait().await)
402        } else {
403            None
404        }
405    }
406
407    /// Wait for all jobs to complete, returning results in completion order.
408    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
409        let mut results = Vec::new();
410
411        // Get all job IDs
412        let ids: Vec<JobId> = {
413            let jobs = self.jobs.lock().await;
414            jobs.keys().copied().collect()
415        };
416
417        for id in ids {
418            if let Some(result) = self.wait(id).await {
419                results.push((id, result));
420            }
421        }
422
423        results
424    }
425
426    /// List all jobs with their status.
427    pub async fn list(&self) -> Vec<JobInfo> {
428        let mut jobs = self.jobs.lock().await;
429        jobs.values_mut()
430            .map(|job| JobInfo {
431                id: job.id,
432                command: job.command.clone(),
433                status: job.status(),
434                output_file: job.output_file.clone(),
435            })
436            .collect()
437    }
438
439    /// Get the number of running jobs.
440    pub async fn running_count(&self) -> usize {
441        let mut jobs = self.jobs.lock().await;
442        let mut count = 0;
443        for job in jobs.values_mut() {
444            if !job.is_done() {
445                count += 1;
446            }
447        }
448        count
449    }
450
451    /// Remove completed jobs from tracking and clean up their temp files.
452    pub async fn cleanup(&self) {
453        let mut jobs = self.jobs.lock().await;
454        jobs.retain(|_, job| {
455            if job.is_done() {
456                job.cleanup_files();
457                false
458            } else {
459                true
460            }
461        });
462    }
463
464    /// Check if a specific job exists.
465    pub async fn exists(&self, id: JobId) -> bool {
466        let jobs = self.jobs.lock().await;
467        jobs.contains_key(&id)
468    }
469
470    /// Get info for a specific job.
471    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
472        let mut jobs = self.jobs.lock().await;
473        jobs.get_mut(&id).map(|job| JobInfo {
474            id: job.id,
475            command: job.command.clone(),
476            status: job.status(),
477            output_file: job.output_file.clone(),
478        })
479    }
480
481    /// Get the command string for a job.
482    pub async fn get_command(&self, id: JobId) -> Option<String> {
483        let jobs = self.jobs.lock().await;
484        jobs.get(&id).map(|job| job.command.clone())
485    }
486
487    /// Get the status string for a job (for /v/jobs/{id}/status).
488    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
489        let mut jobs = self.jobs.lock().await;
490        jobs.get_mut(&id).map(|job| job.status_string())
491    }
492
493    /// Read stdout stream content for a job.
494    ///
495    /// Returns `None` if the job doesn't exist or has no attached stream.
496    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
497        let jobs = self.jobs.lock().await;
498        if let Some(job) = jobs.get(&id)
499            && let Some(stream) = job.stdout_stream() {
500                return Some(stream.read().await);
501            }
502        None
503    }
504
505    /// Read stderr stream content for a job.
506    ///
507    /// Returns `None` if the job doesn't exist or has no attached stream.
508    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
509        let jobs = self.jobs.lock().await;
510        if let Some(job) = jobs.get(&id)
511            && let Some(stream) = job.stderr_stream() {
512                return Some(stream.read().await);
513            }
514        None
515    }
516
517    /// List all job IDs.
518    pub async fn list_ids(&self) -> Vec<JobId> {
519        let jobs = self.jobs.lock().await;
520        jobs.keys().copied().collect()
521    }
522}
523
524impl Default for JobManager {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533    use std::time::Duration;
534
535    #[tokio::test]
536    async fn test_spawn_and_wait() {
537        let manager = JobManager::new();
538
539        let id = manager.spawn("test".to_string(), async {
540            tokio::time::sleep(Duration::from_millis(10)).await;
541            ExecResult::success("done")
542        });
543
544        // Wait a bit for the job to be registered
545        tokio::time::sleep(Duration::from_millis(5)).await;
546
547        let result = manager.wait(id).await;
548        assert!(result.is_some());
549        let result = result.unwrap();
550        assert!(result.ok());
551        assert_eq!(result.out, "done");
552    }
553
554    #[tokio::test]
555    async fn test_wait_all() {
556        let manager = JobManager::new();
557
558        manager.spawn("job1".to_string(), async {
559            tokio::time::sleep(Duration::from_millis(10)).await;
560            ExecResult::success("one")
561        });
562
563        manager.spawn("job2".to_string(), async {
564            tokio::time::sleep(Duration::from_millis(5)).await;
565            ExecResult::success("two")
566        });
567
568        // Wait for jobs to register
569        tokio::time::sleep(Duration::from_millis(5)).await;
570
571        let results = manager.wait_all().await;
572        assert_eq!(results.len(), 2);
573    }
574
575    #[tokio::test]
576    async fn test_list_jobs() {
577        let manager = JobManager::new();
578
579        manager.spawn("test job".to_string(), async {
580            tokio::time::sleep(Duration::from_millis(50)).await;
581            ExecResult::success("")
582        });
583
584        // Wait for job to register
585        tokio::time::sleep(Duration::from_millis(5)).await;
586
587        let jobs = manager.list().await;
588        assert_eq!(jobs.len(), 1);
589        assert_eq!(jobs[0].command, "test job");
590        assert_eq!(jobs[0].status, JobStatus::Running);
591    }
592
593    #[tokio::test]
594    async fn test_job_status_after_completion() {
595        let manager = JobManager::new();
596
597        let id = manager.spawn("quick".to_string(), async {
598            ExecResult::success("")
599        });
600
601        // Wait for job to complete
602        tokio::time::sleep(Duration::from_millis(10)).await;
603        let _ = manager.wait(id).await;
604
605        let info = manager.get(id).await;
606        assert!(info.is_some());
607        assert_eq!(info.unwrap().status, JobStatus::Done);
608    }
609
610    #[tokio::test]
611    async fn test_cleanup() {
612        let manager = JobManager::new();
613
614        let id = manager.spawn("done".to_string(), async {
615            ExecResult::success("")
616        });
617
618        // Wait for completion
619        tokio::time::sleep(Duration::from_millis(10)).await;
620        let _ = manager.wait(id).await;
621
622        // Should have 1 job
623        assert_eq!(manager.list().await.len(), 1);
624
625        // Cleanup
626        manager.cleanup().await;
627
628        // Should have 0 jobs
629        assert_eq!(manager.list().await.len(), 0);
630    }
631
632    #[tokio::test]
633    async fn test_cleanup_removes_temp_files() {
634        // Bug K: cleanup should remove temp files
635        let manager = JobManager::new();
636
637        let id = manager.spawn("output job".to_string(), async {
638            ExecResult::success("some output that gets written to a temp file")
639        });
640
641        // Wait for completion (triggers output file creation)
642        tokio::time::sleep(Duration::from_millis(10)).await;
643        let result = manager.wait(id).await;
644        assert!(result.is_some());
645
646        // Get the output file path before cleanup
647        let output_file = {
648            let jobs = manager.jobs.lock().await;
649            jobs.get(&id).and_then(|j| j.output_file().cloned())
650        };
651
652        // Cleanup should remove the job and its files
653        manager.cleanup().await;
654
655        // If an output file was created, it should be gone now
656        if let Some(path) = output_file {
657            assert!(
658                !path.exists(),
659                "temp file should be removed after cleanup: {}",
660                path.display()
661            );
662        }
663    }
664
665    #[tokio::test]
666    async fn test_register_with_channel() {
667        let manager = JobManager::new();
668        let (tx, rx) = oneshot::channel();
669
670        let id = manager.register("channel job".to_string(), rx).await;
671
672        // Send result
673        tx.send(ExecResult::success("from channel")).unwrap();
674
675        let result = manager.wait(id).await;
676        assert!(result.is_some());
677        assert_eq!(result.unwrap().out, "from channel");
678    }
679
680    #[tokio::test]
681    async fn test_spawn_immediately_available() {
682        // Bug J: job should be queryable immediately after spawn()
683        let manager = JobManager::new();
684
685        let id = manager.spawn("instant".to_string(), async {
686            tokio::time::sleep(Duration::from_millis(100)).await;
687            ExecResult::success("done")
688        });
689
690        // Should be immediately visible without any sleep
691        let exists = manager.exists(id).await;
692        assert!(exists, "job should be immediately available after spawn()");
693
694        let info = manager.get(id).await;
695        assert!(info.is_some(), "job info should be available immediately");
696    }
697
698    #[tokio::test]
699    async fn test_nonexistent_job() {
700        let manager = JobManager::new();
701        let result = manager.wait(JobId(999)).await;
702        assert!(result.is_none());
703    }
704}