Skip to main content

kaish_kernel/scheduler/
job.rs

1//! Background job management for kaish.
2//!
3//! Provides the `JobManager` for tracking background jobs started with `&`.
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use tokio::sync::{oneshot, Mutex};
11use tokio::task::JoinHandle;
12
13use super::stream::BoundedStream;
14use crate::interpreter::ExecResult;
15
/// Numeric handle that names a single background job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId(pub u64);

impl std::fmt::Display for JobId {
    /// Renders the id as its bare decimal number.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let JobId(raw) = self;
        out.write_fmt(format_args!("{}", raw))
    }
}
25
/// Lifecycle state reported for a background job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// No result has been observed for the job yet.
    Running,
    /// The job finished and its result was successful.
    Done,
    /// The job finished with an unsuccessful result.
    Failed,
}

impl std::fmt::Display for JobStatus {
    /// Writes the variant name (`Running`, `Done` or `Failed`).
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            JobStatus::Running => "Running",
            JobStatus::Done => "Done",
            JobStatus::Failed => "Failed",
        };
        out.write_str(label)
    }
}
46
/// Snapshot of a job's identity and state, as returned by `JobManager::list`
/// and `JobManager::get`.
#[derive(Debug, Clone)]
pub struct JobInfo {
    /// Identifier of the job this snapshot describes.
    pub id: JobId,
    /// Command description supplied when the job was created.
    pub command: String,
    /// Status of the job at the moment the snapshot was taken.
    pub status: JobStatus,
    /// Path to the captured-output file, if one has been written.
    pub output_file: Option<PathBuf>,
}
59
/// A background job.
///
/// A job learns about completion either through a task `JoinHandle`
/// (see `Job::new`) or through a oneshot channel (see `Job::from_channel` and
/// `Job::with_streams`). Once observed, the result is cached in `result`.
pub struct Job {
    /// Job ID.
    pub id: JobId,
    /// Command description.
    pub command: String,
    /// Task handle; `None` for channel-backed jobs or once the handle has been consumed.
    handle: Option<JoinHandle<ExecResult>>,
    /// Channel to receive the result (alternative to `handle`); cleared once it has
    /// been consumed or found closed.
    result_rx: Option<oneshot::Receiver<ExecResult>>,
    /// Cached result after completion.
    result: Option<ExecResult>,
    /// Path to the output file holding captured stdout/stderr; set by `wait`
    /// when there was output to write.
    output_file: Option<PathBuf>,
    /// Live stdout stream (bounded ring buffer), if one was attached.
    stdout_stream: Option<Arc<BoundedStream>>,
    /// Live stderr stream (bounded ring buffer), if one was attached.
    stderr_stream: Option<Arc<BoundedStream>>,
}
79
80impl Job {
81    /// Create a new job from a task handle.
82    pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
83        Self {
84            id,
85            command,
86            handle: Some(handle),
87            result_rx: None,
88            result: None,
89            output_file: None,
90            stdout_stream: None,
91            stderr_stream: None,
92        }
93    }
94
95    /// Create a new job from a result channel.
96    pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
97        Self {
98            id,
99            command,
100            handle: None,
101            result_rx: Some(rx),
102            result: None,
103            output_file: None,
104            stdout_stream: None,
105            stderr_stream: None,
106        }
107    }
108
109    /// Create a new job with attached output streams.
110    ///
111    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
112    pub fn with_streams(
113        id: JobId,
114        command: String,
115        rx: oneshot::Receiver<ExecResult>,
116        stdout: Arc<BoundedStream>,
117        stderr: Arc<BoundedStream>,
118    ) -> Self {
119        Self {
120            id,
121            command,
122            handle: None,
123            result_rx: Some(rx),
124            result: None,
125            output_file: None,
126            stdout_stream: Some(stdout),
127            stderr_stream: Some(stderr),
128        }
129    }
130
131    /// Get the output file path (if available).
132    pub fn output_file(&self) -> Option<&PathBuf> {
133        self.output_file.as_ref()
134    }
135
136    /// Check if the job has completed.
137    pub fn is_done(&mut self) -> bool {
138        self.try_poll();
139        self.result.is_some()
140    }
141
142    /// Get the job's status.
143    pub fn status(&mut self) -> JobStatus {
144        self.try_poll();
145        match &self.result {
146            Some(r) if r.ok() => JobStatus::Done,
147            Some(_) => JobStatus::Failed,
148            None => JobStatus::Running,
149        }
150    }
151
152    /// Get the job's status as a string suitable for /v/jobs/{id}/status.
153    ///
154    /// Returns:
155    /// - `"running"` if the job is still running
156    /// - `"done:0"` if the job completed successfully
157    /// - `"failed:{code}"` if the job failed with an exit code
158    pub fn status_string(&mut self) -> String {
159        self.try_poll();
160        match &self.result {
161            Some(r) if r.ok() => "done:0".to_string(),
162            Some(r) => format!("failed:{}", r.code),
163            None => "running".to_string(),
164        }
165    }
166
167    /// Get the stdout stream (if attached).
168    pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
169        self.stdout_stream.as_ref()
170    }
171
172    /// Get the stderr stream (if attached).
173    pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
174        self.stderr_stream.as_ref()
175    }
176
177    /// Wait for the job to complete and return its result.
178    ///
179    /// On completion, the job's output is written to a temp file for later retrieval.
180    pub async fn wait(&mut self) -> ExecResult {
181        if let Some(result) = self.result.take() {
182            self.result = Some(result.clone());
183            return result;
184        }
185
186        let result = if let Some(handle) = self.handle.take() {
187            match handle.await {
188                Ok(r) => r,
189                Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
190            }
191        } else if let Some(rx) = self.result_rx.take() {
192            match rx.await {
193                Ok(r) => r,
194                Err(_) => ExecResult::failure(1, "job channel closed"),
195            }
196        } else {
197            // Already waited
198            self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
199        };
200
201        // Write output to temp file for later retrieval
202        if self.output_file.is_none()
203            && let Some(path) = self.write_output_file(&result) {
204                self.output_file = Some(path);
205            }
206
207        self.result = Some(result.clone());
208        result
209    }
210
211    /// Write job output to a temp file.
212    fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
213        // Only write if there's output to capture
214        if result.out.is_empty() && result.err.is_empty() {
215            return None;
216        }
217
218        let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
219        if std::fs::create_dir_all(&tmp_dir).is_err() {
220            tracing::warn!("Failed to create job output directory");
221            return None;
222        }
223
224        let filename = format!("job_{}.txt", self.id.0);
225        let path = tmp_dir.join(filename);
226
227        let mut content = String::new();
228        content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
229        content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
230
231        if !result.out.is_empty() {
232            content.push_str("## STDOUT\n");
233            content.push_str(&result.out);
234            if !result.out.ends_with('\n') {
235                content.push('\n');
236            }
237        }
238
239        if !result.err.is_empty() {
240            content.push_str("\n## STDERR\n");
241            content.push_str(&result.err);
242            if !result.err.ends_with('\n') {
243                content.push('\n');
244            }
245        }
246
247        match std::fs::write(&path, content) {
248            Ok(()) => Some(path),
249            Err(e) => {
250                tracing::warn!("Failed to write job output file: {}", e);
251                None
252            }
253        }
254    }
255
256    /// Get the result if completed, without waiting.
257    pub fn try_result(&self) -> Option<&ExecResult> {
258        self.result.as_ref()
259    }
260
261    /// Try to poll the result channel and update status.
262    ///
263    /// This is a non-blocking check that updates `self.result` if the
264    /// job has completed. Returns true if the job is now done.
265    pub fn try_poll(&mut self) -> bool {
266        if self.result.is_some() {
267            return true;
268        }
269
270        // Try to poll the oneshot channel
271        if let Some(rx) = self.result_rx.as_mut() {
272            match rx.try_recv() {
273                Ok(result) => {
274                    self.result = Some(result);
275                    self.result_rx = None;
276                    return true;
277                }
278                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
279                    // Still running
280                    return false;
281                }
282                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
283                    // Channel closed without result - job failed
284                    self.result = Some(ExecResult::failure(1, "job channel closed"));
285                    self.result_rx = None;
286                    return true;
287                }
288            }
289        }
290
291        // Check if handle is finished
292        if let Some(handle) = self.handle.as_mut()
293            && handle.is_finished() {
294                // Take the handle and wait for it (should be instant)
295                let Some(handle) = self.handle.take() else {
296                    return false;
297                };
298                // We can't await here, so we use now_or_never
299                // Note: this is synchronous since is_finished() was true
300                let result = match tokio::task::block_in_place(|| {
301                    tokio::runtime::Handle::current().block_on(handle)
302                }) {
303                    Ok(r) => r,
304                    Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
305                };
306                self.result = Some(result);
307                return true;
308            }
309
310        false
311    }
312}
313
/// Manager for background jobs.
///
/// Hands out job IDs from an atomic counter and keeps every registered job in
/// a map behind an async mutex.
pub struct JobManager {
    /// Counter for generating unique job IDs (the first ID handed out is 1).
    next_id: AtomicU64,
    /// Map of job ID to job, shared so that spawned tasks can insert into it.
    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
}
321
322impl JobManager {
323    /// Create a new job manager.
324    pub fn new() -> Self {
325        Self {
326            next_id: AtomicU64::new(1),
327            jobs: Arc::new(Mutex::new(HashMap::new())),
328        }
329    }
330
331    /// Spawn a new background job from a future.
332    pub fn spawn<F>(&self, command: String, future: F) -> JobId
333    where
334        F: std::future::Future<Output = ExecResult> + Send + 'static,
335    {
336        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
337        let handle = tokio::spawn(future);
338        let job = Job::new(id, command, handle);
339
340        // Can't await here, so use try_lock or spawn a task to insert
341        let jobs = self.jobs.clone();
342        tokio::spawn(async move {
343            let mut jobs = jobs.lock().await;
344            jobs.insert(id, job);
345        });
346
347        id
348    }
349
350    /// Spawn a job that's already running and communicate via channel.
351    pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
352        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
353        let job = Job::from_channel(id, command, rx);
354
355        let mut jobs = self.jobs.lock().await;
356        jobs.insert(id, job);
357
358        id
359    }
360
361    /// Register a job with attached output streams.
362    ///
363    /// The streams provide live access to job output via `/v/jobs/{id}/stdout` and `/stderr`.
364    pub async fn register_with_streams(
365        &self,
366        command: String,
367        rx: oneshot::Receiver<ExecResult>,
368        stdout: Arc<BoundedStream>,
369        stderr: Arc<BoundedStream>,
370    ) -> JobId {
371        let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
372        let job = Job::with_streams(id, command, rx, stdout, stderr);
373
374        let mut jobs = self.jobs.lock().await;
375        jobs.insert(id, job);
376
377        id
378    }
379
380    /// Wait for a specific job to complete.
381    pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
382        let mut jobs = self.jobs.lock().await;
383        if let Some(job) = jobs.get_mut(&id) {
384            Some(job.wait().await)
385        } else {
386            None
387        }
388    }
389
390    /// Wait for all jobs to complete, returning results in completion order.
391    pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
392        let mut results = Vec::new();
393
394        // Get all job IDs
395        let ids: Vec<JobId> = {
396            let jobs = self.jobs.lock().await;
397            jobs.keys().copied().collect()
398        };
399
400        for id in ids {
401            if let Some(result) = self.wait(id).await {
402                results.push((id, result));
403            }
404        }
405
406        results
407    }
408
409    /// List all jobs with their status.
410    pub async fn list(&self) -> Vec<JobInfo> {
411        let mut jobs = self.jobs.lock().await;
412        jobs.values_mut()
413            .map(|job| JobInfo {
414                id: job.id,
415                command: job.command.clone(),
416                status: job.status(),
417                output_file: job.output_file.clone(),
418            })
419            .collect()
420    }
421
422    /// Get the number of running jobs.
423    pub async fn running_count(&self) -> usize {
424        let mut jobs = self.jobs.lock().await;
425        let mut count = 0;
426        for job in jobs.values_mut() {
427            if !job.is_done() {
428                count += 1;
429            }
430        }
431        count
432    }
433
434    /// Remove completed jobs from tracking.
435    pub async fn cleanup(&self) {
436        let mut jobs = self.jobs.lock().await;
437        jobs.retain(|_, job| !job.is_done());
438    }
439
440    /// Check if a specific job exists.
441    pub async fn exists(&self, id: JobId) -> bool {
442        let jobs = self.jobs.lock().await;
443        jobs.contains_key(&id)
444    }
445
446    /// Get info for a specific job.
447    pub async fn get(&self, id: JobId) -> Option<JobInfo> {
448        let mut jobs = self.jobs.lock().await;
449        jobs.get_mut(&id).map(|job| JobInfo {
450            id: job.id,
451            command: job.command.clone(),
452            status: job.status(),
453            output_file: job.output_file.clone(),
454        })
455    }
456
457    /// Get the command string for a job.
458    pub async fn get_command(&self, id: JobId) -> Option<String> {
459        let jobs = self.jobs.lock().await;
460        jobs.get(&id).map(|job| job.command.clone())
461    }
462
463    /// Get the status string for a job (for /v/jobs/{id}/status).
464    pub async fn get_status_string(&self, id: JobId) -> Option<String> {
465        let mut jobs = self.jobs.lock().await;
466        jobs.get_mut(&id).map(|job| job.status_string())
467    }
468
469    /// Read stdout stream content for a job.
470    ///
471    /// Returns `None` if the job doesn't exist or has no attached stream.
472    pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
473        let jobs = self.jobs.lock().await;
474        if let Some(job) = jobs.get(&id)
475            && let Some(stream) = job.stdout_stream() {
476                return Some(stream.read().await);
477            }
478        None
479    }
480
481    /// Read stderr stream content for a job.
482    ///
483    /// Returns `None` if the job doesn't exist or has no attached stream.
484    pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
485        let jobs = self.jobs.lock().await;
486        if let Some(job) = jobs.get(&id)
487            && let Some(stream) = job.stderr_stream() {
488                return Some(stream.read().await);
489            }
490        None
491    }
492
493    /// List all job IDs.
494    pub async fn list_ids(&self) -> Vec<JobId> {
495        let jobs = self.jobs.lock().await;
496        jobs.keys().copied().collect()
497    }
498}
499
500impl Default for JobManager {
501    fn default() -> Self {
502        Self::new()
503    }
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A spawned job can be awaited by ID and yields the future's result.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let jobs = JobManager::new();

        let job_id = jobs.spawn(String::from("test"), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // Give the manager a moment to register the job.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let outcome = jobs.wait(job_id).await;
        assert!(outcome.is_some());
        let outcome = outcome.unwrap();
        assert!(outcome.ok());
        assert_eq!(outcome.out, "done");
    }

    /// `wait_all` returns one entry per spawned job.
    #[tokio::test]
    async fn test_wait_all() {
        let jobs = JobManager::new();

        jobs.spawn(String::from("job1"), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        jobs.spawn(String::from("job2"), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        // Give the manager a moment to register both jobs.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let finished = jobs.wait_all().await;
        assert_eq!(finished.len(), 2);
    }

    /// A job that is still sleeping is listed as `Running`.
    #[tokio::test]
    async fn test_list_jobs() {
        let jobs = JobManager::new();

        jobs.spawn(String::from("test job"), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        // Give the manager a moment to register the job.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let listing = jobs.list().await;
        assert_eq!(listing.len(), 1);
        let first = &listing[0];
        assert_eq!(first.command, "test job");
        assert_eq!(first.status, JobStatus::Running);
    }

    /// After a successful job has been awaited, its status is `Done`.
    #[tokio::test]
    async fn test_job_status_after_completion() {
        let jobs = JobManager::new();

        let job_id = jobs.spawn(String::from("quick"), async { ExecResult::success("") });

        // Let the job finish, then collect its result.
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = jobs.wait(job_id).await;

        let snapshot = jobs.get(job_id).await;
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().status, JobStatus::Done);
    }

    /// `cleanup` removes jobs that have completed.
    #[tokio::test]
    async fn test_cleanup() {
        let jobs = JobManager::new();

        let job_id = jobs.spawn(String::from("done"), async { ExecResult::success("") });

        // Let the job finish, then collect its result.
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = jobs.wait(job_id).await;

        // The finished job is still tracked...
        assert_eq!(jobs.list().await.len(), 1);

        // ...until cleanup runs.
        jobs.cleanup().await;
        assert_eq!(jobs.list().await.len(), 0);
    }

    /// A channel-registered job resolves with the value sent on the channel.
    #[tokio::test]
    async fn test_register_with_channel() {
        let jobs = JobManager::new();
        let (sender, receiver) = oneshot::channel();

        let job_id = jobs.register(String::from("channel job"), receiver).await;

        // Deliver the result before waiting.
        sender.send(ExecResult::success("from channel")).unwrap();

        let outcome = jobs.wait(job_id).await;
        assert!(outcome.is_some());
        assert_eq!(outcome.unwrap().out, "from channel");
    }

    /// Waiting on an unknown ID yields `None`.
    #[tokio::test]
    async fn test_nonexistent_job() {
        let jobs = JobManager::new();
        let outcome = jobs.wait(JobId(999)).await;
        assert!(outcome.is_none());
    }
}