1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21pub struct Job {
23 pub id: JobId,
25 session_id: u64,
29 pub command: String,
31 handle: Option<JoinHandle<ExecResult>>,
33 result_rx: Option<oneshot::Receiver<ExecResult>>,
35 result: Option<ExecResult>,
37 output_file: Option<PathBuf>,
39 persist_output: bool,
47 stdout_stream: Option<Arc<BoundedStream>>,
49 stderr_stream: Option<Arc<BoundedStream>>,
51 pid: Option<u32>,
53 pgid: Option<u32>,
55 stopped: bool,
57}
58
59impl Job {
60 pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
62 Self {
63 id,
64 session_id,
65 command,
66 handle: Some(handle),
67 result_rx: None,
68 result: None,
69 output_file: None,
70 persist_output: true,
71 stdout_stream: None,
72 stderr_stream: None,
73 pid: None,
74 pgid: None,
75 stopped: false,
76 }
77 }
78
79 pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
81 Self {
82 id,
83 session_id,
84 command,
85 handle: None,
86 result_rx: Some(rx),
87 result: None,
88 output_file: None,
89 persist_output: true,
90 stdout_stream: None,
91 stderr_stream: None,
92 pid: None,
93 pgid: None,
94 stopped: false,
95 }
96 }
97
98 pub fn with_streams(
102 id: JobId,
103 session_id: u64,
104 command: String,
105 rx: oneshot::Receiver<ExecResult>,
106 stdout: Arc<BoundedStream>,
107 stderr: Arc<BoundedStream>,
108 ) -> Self {
109 Self {
110 id,
111 session_id,
112 command,
113 handle: None,
114 result_rx: Some(rx),
115 result: None,
116 output_file: None,
117 persist_output: true,
118 stdout_stream: Some(stdout),
119 stderr_stream: Some(stderr),
120 pid: None,
121 pgid: None,
122 stopped: false,
123 }
124 }
125
126 pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
128 Self {
129 id,
130 session_id,
131 command,
132 handle: None,
133 result_rx: None,
134 result: None,
135 output_file: None,
136 persist_output: true,
137 stdout_stream: None,
138 stderr_stream: None,
139 pid: Some(pid),
140 pgid: Some(pgid),
141 stopped: true,
142 }
143 }
144
145 pub fn output_file(&self) -> Option<&PathBuf> {
147 self.output_file.as_ref()
148 }
149
150 pub fn is_done(&mut self) -> bool {
154 if self.stopped {
155 return false;
156 }
157 self.try_poll();
158 self.result.is_some()
159 }
160
161 pub fn status(&mut self) -> JobStatus {
163 if self.stopped {
164 return JobStatus::Stopped;
165 }
166 self.try_poll();
167 match &self.result {
168 Some(r) if r.ok() => JobStatus::Done,
169 Some(_) => JobStatus::Failed,
170 None => JobStatus::Running,
171 }
172 }
173
174 pub fn status_string(&mut self) -> String {
181 self.try_poll();
182 match &self.result {
183 Some(r) if r.ok() => "done:0".to_string(),
184 Some(r) => format!("failed:{}", r.code),
185 None => "running".to_string(),
186 }
187 }
188
189 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
191 self.stdout_stream.as_ref()
192 }
193
194 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
196 self.stderr_stream.as_ref()
197 }
198
199 pub async fn wait(&mut self) -> ExecResult {
203 if let Some(result) = self.result.take() {
204 self.result = Some(result.clone());
205 return result;
206 }
207
208 let result = if let Some(handle) = self.handle.take() {
209 match handle.await {
210 Ok(r) => r,
211 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
212 }
213 } else if let Some(rx) = self.result_rx.take() {
214 match rx.await {
215 Ok(r) => r,
216 Err(_) => ExecResult::failure(1, "job channel closed"),
217 }
218 } else {
219 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
221 };
222
223 if self.persist_output
227 && self.output_file.is_none()
228 && let Some(path) = self.write_output_file(&result) {
229 self.output_file = Some(path);
230 }
231
232 self.result = Some(result.clone());
233 result
234 }
235
236 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
238 let text = result.text_out();
240 if text.is_empty() && result.err.is_empty() {
241 return None;
242 }
243
244 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
245 if std::fs::create_dir_all(&tmp_dir).is_err() {
246 tracing::warn!("Failed to create job output directory");
247 return None;
248 }
249
250 let filename = format!(
259 "session_{}_job_{}.{}.txt",
260 self.session_id,
261 self.id.0,
262 std::process::id()
263 );
264 let path = tmp_dir.join(filename);
265
266 let mut content = String::new();
267 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
268 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
269
270 if !text.is_empty() {
271 content.push_str("## STDOUT\n");
272 content.push_str(&text);
273 if !text.ends_with('\n') {
274 content.push('\n');
275 }
276 }
277
278 if !result.err.is_empty() {
279 content.push_str("\n## STDERR\n");
280 content.push_str(&result.err);
281 if !result.err.ends_with('\n') {
282 content.push('\n');
283 }
284 }
285
286 match std::fs::write(&path, content) {
287 Ok(()) => Some(path),
288 Err(e) => {
289 tracing::warn!("Failed to write job output file: {}", e);
290 None
291 }
292 }
293 }
294
295 pub fn cleanup_files(&mut self) {
297 if let Some(path) = self.output_file.take() {
298 if let Err(e) = std::fs::remove_file(&path) {
299 if e.kind() != io::ErrorKind::NotFound {
301 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
302 }
303 }
304 }
305 }
306
307 pub fn try_result(&self) -> Option<&ExecResult> {
309 self.result.as_ref()
310 }
311
312 pub fn try_poll(&mut self) -> bool {
317 if self.result.is_some() {
318 return true;
319 }
320
321 if let Some(rx) = self.result_rx.as_mut() {
323 match rx.try_recv() {
324 Ok(result) => {
325 self.result = Some(result);
326 self.result_rx = None;
327 return true;
328 }
329 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
330 return false;
332 }
333 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
334 self.result = Some(ExecResult::failure(1, "job channel closed"));
336 self.result_rx = None;
337 return true;
338 }
339 }
340 }
341
342 if let Some(handle) = self.handle.as_mut()
344 && handle.is_finished() {
345 let Some(mut handle) = self.handle.take() else {
347 return false;
348 };
349 let waker = std::task::Waker::noop();
351 let mut cx = std::task::Context::from_waker(waker);
352 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
353 std::task::Poll::Ready(Ok(r)) => r,
354 std::task::Poll::Ready(Err(e)) => {
355 ExecResult::failure(1, format!("job panicked: {}", e))
356 }
357 std::task::Poll::Pending => return false, };
359 self.result = Some(result);
360 return true;
361 }
362
363 false
364 }
365}
366
367static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
373
374pub struct JobManager {
376 session_id: u64,
378 next_id: AtomicU64,
380 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
382 persist_output_files: std::sync::atomic::AtomicBool,
388}
389
390impl JobManager {
391 pub fn new() -> Self {
393 Self {
394 session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
395 next_id: AtomicU64::new(1),
396 jobs: Arc::new(Mutex::new(HashMap::new())),
397 persist_output_files: std::sync::atomic::AtomicBool::new(true),
398 }
399 }
400
401 pub fn set_persist_output_files(&self, on: bool) {
411 self.persist_output_files.store(on, Ordering::Relaxed);
412 }
413
414 pub fn persist_output_files(&self) -> bool {
416 self.persist_output_files.load(Ordering::Relaxed)
417 }
418
419 pub fn spawn<F>(&self, command: String, future: F) -> JobId
424 where
425 F: std::future::Future<Output = ExecResult> + Send + 'static,
426 {
427 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
428 let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
431 let mut job = Job::new(id, self.session_id, command, handle);
432 job.persist_output = self.persist_output_files();
433
434 let mut job_opt = Some(job);
438 loop {
439 match self.jobs.try_lock() {
440 Ok(mut guard) => {
441 if let Some(j) = job_opt.take() {
442 guard.insert(id, j);
443 }
444 break;
445 }
446 Err(_) => {
447 std::hint::spin_loop();
448 }
449 }
450 }
451
452 id
453 }
454
455 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
457 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
458 let mut job = Job::from_channel(id, self.session_id, command, rx);
459 job.persist_output = self.persist_output_files();
460
461 let mut jobs = self.jobs.lock().await;
462 jobs.insert(id, job);
463
464 id
465 }
466
467 pub async fn register_with_streams(
471 &self,
472 command: String,
473 rx: oneshot::Receiver<ExecResult>,
474 stdout: Arc<BoundedStream>,
475 stderr: Arc<BoundedStream>,
476 ) -> JobId {
477 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
478 let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
479 job.persist_output = self.persist_output_files();
480
481 let mut jobs = self.jobs.lock().await;
482 jobs.insert(id, job);
483
484 id
485 }
486
487 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
489 let mut jobs = self.jobs.lock().await;
490 if let Some(job) = jobs.get_mut(&id) {
491 Some(job.wait().await)
492 } else {
493 None
494 }
495 }
496
497 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
499 let mut results = Vec::new();
500
501 let ids: Vec<JobId> = {
503 let jobs = self.jobs.lock().await;
504 jobs.keys().copied().collect()
505 };
506
507 for id in ids {
508 if let Some(result) = self.wait(id).await {
509 results.push((id, result));
510 }
511 }
512
513 results
514 }
515
516 pub async fn list(&self) -> Vec<JobInfo> {
518 let mut jobs = self.jobs.lock().await;
519 jobs.values_mut()
520 .map(|job| JobInfo {
521 id: job.id,
522 command: job.command.clone(),
523 status: job.status(),
524 output_file: job.output_file.clone(),
525 pid: job.pid,
526 })
527 .collect()
528 }
529
530 pub async fn running_count(&self) -> usize {
532 let mut jobs = self.jobs.lock().await;
533 let mut count = 0;
534 for job in jobs.values_mut() {
535 if !job.is_done() {
536 count += 1;
537 }
538 }
539 count
540 }
541
542 pub async fn cleanup(&self) {
544 let mut jobs = self.jobs.lock().await;
545 jobs.retain(|_, job| {
546 if job.is_done() {
547 job.cleanup_files();
548 false
549 } else {
550 true
551 }
552 });
553 }
554
555 pub async fn exists(&self, id: JobId) -> bool {
557 let jobs = self.jobs.lock().await;
558 jobs.contains_key(&id)
559 }
560
561 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
563 let mut jobs = self.jobs.lock().await;
564 jobs.get_mut(&id).map(|job| JobInfo {
565 id: job.id,
566 command: job.command.clone(),
567 status: job.status(),
568 output_file: job.output_file.clone(),
569 pid: job.pid,
570 })
571 }
572
573 pub async fn get_command(&self, id: JobId) -> Option<String> {
575 let jobs = self.jobs.lock().await;
576 jobs.get(&id).map(|job| job.command.clone())
577 }
578
579 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
581 let mut jobs = self.jobs.lock().await;
582 jobs.get_mut(&id).map(|job| job.status_string())
583 }
584
585 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
589 let jobs = self.jobs.lock().await;
590 if let Some(job) = jobs.get(&id)
591 && let Some(stream) = job.stdout_stream() {
592 return Some(stream.read().await);
593 }
594 None
595 }
596
597 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
601 let jobs = self.jobs.lock().await;
602 if let Some(job) = jobs.get(&id)
603 && let Some(stream) = job.stderr_stream() {
604 return Some(stream.read().await);
605 }
606 None
607 }
608
609 pub async fn list_ids(&self) -> Vec<JobId> {
611 let jobs = self.jobs.lock().await;
612 jobs.keys().copied().collect()
613 }
614
615 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
617 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
618 let job = Job::stopped(id, self.session_id, command, pid, pgid);
619 let mut jobs = self.jobs.lock().await;
620 jobs.insert(id, job);
621 id
622 }
623
624 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
626 let mut jobs = self.jobs.lock().await;
627 if let Some(job) = jobs.get_mut(&id) {
628 job.stopped = true;
629 job.pid = Some(pid);
630 job.pgid = Some(pgid);
631 }
632 }
633
634 pub async fn resume_job(&self, id: JobId) {
636 let mut jobs = self.jobs.lock().await;
637 if let Some(job) = jobs.get_mut(&id) {
638 job.stopped = false;
639 }
640 }
641
642 pub async fn last_stopped(&self) -> Option<JobId> {
644 let mut jobs = self.jobs.lock().await;
645 let mut best: Option<JobId> = None;
647 for job in jobs.values_mut() {
648 if job.stopped {
649 match best {
650 None => best = Some(job.id),
651 Some(b) if job.id.0 > b.0 => best = Some(job.id),
652 _ => {}
653 }
654 }
655 }
656 best
657 }
658
659 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
661 let jobs = self.jobs.lock().await;
662 jobs.get(&id).and_then(|job| {
663 match (job.pid, job.pgid) {
664 (Some(pid), Some(pgid)) => Some((pid, pgid)),
665 _ => None,
666 }
667 })
668 }
669
670 pub async fn remove(&self, id: JobId) {
672 let mut jobs = self.jobs.lock().await;
673 if let Some(mut job) = jobs.remove(&id) {
674 job.cleanup_files();
675 }
676 }
677}
678
679impl Default for JobManager {
680 fn default() -> Self {
681 Self::new()
682 }
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Output-file path currently recorded on job `id`, read straight from the job table.
    async fn recorded_output_file(manager: &JobManager, id: JobId) -> Option<std::path::PathBuf> {
        let table = manager.jobs.lock().await;
        table.get(&id).and_then(|job| job.output_file().cloned())
    }

    #[tokio::test]
    async fn test_no_host_output_file_when_persistence_disabled() {
        let manager = JobManager::new();
        assert!(manager.persist_output_files(), "default is to persist");
        manager.set_persist_output_files(false);
        assert!(!manager.persist_output_files());

        let id = manager.spawn("leaky".to_string(), async {
            ExecResult::success("output that must not hit host disk")
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(manager.wait(id).await.is_some());

        let output_file = recorded_output_file(&manager, id).await;
        assert!(
            output_file.is_none(),
            "no host output file should be written when persistence is disabled, got {output_file:?}"
        );
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();
        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // Start waiting while the job is still running.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let waited = manager.wait(id).await;
        assert!(waited.is_some());
        let outcome = waited.unwrap();
        assert!(outcome.ok());
        assert_eq!(&*outcome.text_out(), "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();
        for (name, delay_ms, out) in [("job1", 10, "one"), ("job2", 5, "two")] {
            manager.spawn(name.to_string(), async move {
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                ExecResult::success(out)
            });
        }

        tokio::time::sleep(Duration::from_millis(5)).await;

        assert_eq!(manager.wait_all().await.len(), 2);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();
        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let listing = manager.list().await;
        assert_eq!(listing.len(), 1);
        let only = &listing[0];
        assert_eq!(only.command, "test job");
        assert_eq!(only.status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();
        let id = manager.spawn("quick".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();
        let id = manager.spawn("done".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        assert_eq!(manager.list().await.len(), 1);
        manager.cleanup().await;
        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let manager = JobManager::new();
        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(manager.wait(id).await.is_some());

        let path = recorded_output_file(&manager, id)
            .await
            .expect("job with output should have written a temp file");
        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());

        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();
        let id = manager.register("channel job".to_string(), rx).await;

        tx.send(ExecResult::success("from channel")).unwrap();

        let waited = manager.wait(id).await;
        assert!(waited.is_some());
        assert_eq!(&*waited.unwrap().text_out(), "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let manager = JobManager::new();
        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        // No yield between spawn() and these checks: registration is synchronous.
        assert!(manager.exists(id).await, "job should be immediately available after spawn()");
        assert!(
            manager.get(id).await.is_some(),
            "job info should be available immediately"
        );
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        assert!(manager.wait(JobId(999)).await.is_none());
    }
}