1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21pub struct Job {
23 pub id: JobId,
25 session_id: u64,
29 pub command: String,
31 handle: Option<JoinHandle<ExecResult>>,
33 result_rx: Option<oneshot::Receiver<ExecResult>>,
35 result: Option<ExecResult>,
37 output_file: Option<PathBuf>,
39 persist_output: bool,
47 stdout_stream: Option<Arc<BoundedStream>>,
49 stderr_stream: Option<Arc<BoundedStream>>,
51 pid: Option<u32>,
53 pgid: Option<u32>,
55 stopped: bool,
57 cancel: Option<tokio_util::sync::CancellationToken>,
63 pgids: Vec<u32>,
68}
69
70impl Job {
71 pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73 Self {
74 id,
75 session_id,
76 command,
77 handle: Some(handle),
78 result_rx: None,
79 result: None,
80 output_file: None,
81 persist_output: true,
82 stdout_stream: None,
83 stderr_stream: None,
84 pid: None,
85 pgid: None,
86 stopped: false,
87 cancel: None,
88 pgids: Vec::new(),
89 }
90 }
91
92 pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94 Self {
95 id,
96 session_id,
97 command,
98 handle: None,
99 result_rx: Some(rx),
100 result: None,
101 output_file: None,
102 persist_output: true,
103 stdout_stream: None,
104 stderr_stream: None,
105 pid: None,
106 pgid: None,
107 stopped: false,
108 cancel: None,
109 pgids: Vec::new(),
110 }
111 }
112
113 pub fn with_streams(
117 id: JobId,
118 session_id: u64,
119 command: String,
120 rx: oneshot::Receiver<ExecResult>,
121 stdout: Arc<BoundedStream>,
122 stderr: Arc<BoundedStream>,
123 ) -> Self {
124 Self {
125 id,
126 session_id,
127 command,
128 handle: None,
129 result_rx: Some(rx),
130 result: None,
131 output_file: None,
132 persist_output: true,
133 stdout_stream: Some(stdout),
134 stderr_stream: Some(stderr),
135 pid: None,
136 pgid: None,
137 stopped: false,
138 cancel: None,
139 pgids: Vec::new(),
140 }
141 }
142
143 pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145 Self {
146 id,
147 session_id,
148 command,
149 handle: None,
150 result_rx: None,
151 result: None,
152 output_file: None,
153 persist_output: true,
154 stdout_stream: None,
155 stderr_stream: None,
156 pid: Some(pid),
157 pgid: Some(pgid),
158 stopped: true,
159 cancel: None,
160 pgids: Vec::new(),
161 }
162 }
163
164 pub fn output_file(&self) -> Option<&PathBuf> {
166 self.output_file.as_ref()
167 }
168
169 pub fn is_done(&mut self) -> bool {
173 if self.stopped {
174 return false;
175 }
176 self.try_poll();
177 self.result.is_some()
178 }
179
180 pub fn status(&mut self) -> JobStatus {
182 if self.stopped {
183 return JobStatus::Stopped;
184 }
185 self.try_poll();
186 match &self.result {
187 Some(r) if r.ok() => JobStatus::Done,
188 Some(_) => JobStatus::Failed,
189 None => JobStatus::Running,
190 }
191 }
192
193 pub fn status_string(&mut self) -> String {
200 self.try_poll();
201 match &self.result {
202 Some(r) if r.ok() => "done:0".to_string(),
203 Some(r) => format!("failed:{}", r.code),
204 None => "running".to_string(),
205 }
206 }
207
208 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210 self.stdout_stream.as_ref()
211 }
212
213 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215 self.stderr_stream.as_ref()
216 }
217
218 pub async fn wait(&mut self) -> ExecResult {
222 if let Some(result) = self.result.take() {
223 self.result = Some(result.clone());
224 return result;
225 }
226
227 let result = if let Some(handle) = self.handle.take() {
228 match handle.await {
229 Ok(r) => r,
230 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231 }
232 } else if let Some(rx) = self.result_rx.take() {
233 match rx.await {
234 Ok(r) => r,
235 Err(_) => ExecResult::failure(1, "job channel closed"),
236 }
237 } else {
238 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240 };
241
242 if self.persist_output
246 && self.output_file.is_none()
247 && let Some(path) = self.write_output_file(&result) {
248 self.output_file = Some(path);
249 }
250
251 self.result = Some(result.clone());
252 result
253 }
254
255 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257 let text = result.text_out();
259 if text.is_empty() && result.err.is_empty() {
260 return None;
261 }
262
263 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
264 if std::fs::create_dir_all(&tmp_dir).is_err() {
265 tracing::warn!("Failed to create job output directory");
266 return None;
267 }
268
269 let filename = format!(
278 "session_{}_job_{}.{}.txt",
279 self.session_id,
280 self.id.0,
281 std::process::id()
282 );
283 let path = tmp_dir.join(filename);
284
285 let mut content = String::new();
286 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
287 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
288
289 if !text.is_empty() {
290 content.push_str("## STDOUT\n");
291 content.push_str(&text);
292 if !text.ends_with('\n') {
293 content.push('\n');
294 }
295 }
296
297 if !result.err.is_empty() {
298 content.push_str("\n## STDERR\n");
299 content.push_str(&result.err);
300 if !result.err.ends_with('\n') {
301 content.push('\n');
302 }
303 }
304
305 match std::fs::write(&path, content) {
306 Ok(()) => Some(path),
307 Err(e) => {
308 tracing::warn!("Failed to write job output file: {}", e);
309 None
310 }
311 }
312 }
313
314 pub fn cleanup_files(&mut self) {
316 if let Some(path) = self.output_file.take() {
317 if let Err(e) = std::fs::remove_file(&path) {
318 if e.kind() != io::ErrorKind::NotFound {
320 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
321 }
322 }
323 }
324 }
325
326 pub fn try_result(&self) -> Option<&ExecResult> {
328 self.result.as_ref()
329 }
330
331 pub fn try_poll(&mut self) -> bool {
336 if self.result.is_some() {
337 return true;
338 }
339
340 if let Some(rx) = self.result_rx.as_mut() {
342 match rx.try_recv() {
343 Ok(result) => {
344 self.result = Some(result);
345 self.result_rx = None;
346 return true;
347 }
348 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
349 return false;
351 }
352 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
353 self.result = Some(ExecResult::failure(1, "job channel closed"));
355 self.result_rx = None;
356 return true;
357 }
358 }
359 }
360
361 if let Some(handle) = self.handle.as_mut()
363 && handle.is_finished() {
364 let Some(mut handle) = self.handle.take() else {
366 return false;
367 };
368 let waker = std::task::Waker::noop();
370 let mut cx = std::task::Context::from_waker(waker);
371 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
372 std::task::Poll::Ready(Ok(r)) => r,
373 std::task::Poll::Ready(Err(e)) => {
374 ExecResult::failure(1, format!("job panicked: {}", e))
375 }
376 std::task::Poll::Pending => return false, };
378 self.result = Some(result);
379 return true;
380 }
381
382 false
383 }
384}
385
/// Process-wide counter that hands each `JobManager` a distinct session id in
/// `JobManager::new`. The id is embedded in job output file names.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
392
393pub struct JobManager {
395 session_id: u64,
397 next_id: AtomicU64,
399 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
401 persist_output_files: std::sync::atomic::AtomicBool,
407}
408
409impl JobManager {
410 pub fn new() -> Self {
412 Self {
413 session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
414 next_id: AtomicU64::new(1),
415 jobs: Arc::new(Mutex::new(HashMap::new())),
416 persist_output_files: std::sync::atomic::AtomicBool::new(true),
417 }
418 }
419
420 pub fn set_persist_output_files(&self, on: bool) {
430 self.persist_output_files.store(on, Ordering::Relaxed);
431 }
432
433 pub fn persist_output_files(&self) -> bool {
435 self.persist_output_files.load(Ordering::Relaxed)
436 }
437
438 pub fn spawn<F>(&self, command: String, future: F) -> JobId
443 where
444 F: std::future::Future<Output = ExecResult> + Send + 'static,
445 {
446 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
447 let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
450 let mut job = Job::new(id, self.session_id, command, handle);
451 job.persist_output = self.persist_output_files();
452
453 let mut job_opt = Some(job);
457 loop {
458 match self.jobs.try_lock() {
459 Ok(mut guard) => {
460 if let Some(j) = job_opt.take() {
461 guard.insert(id, j);
462 }
463 break;
464 }
465 Err(_) => {
466 std::hint::spin_loop();
467 }
468 }
469 }
470
471 id
472 }
473
474 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
476 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
477 let mut job = Job::from_channel(id, self.session_id, command, rx);
478 job.persist_output = self.persist_output_files();
479
480 let mut jobs = self.jobs.lock().await;
481 jobs.insert(id, job);
482
483 id
484 }
485
486 pub async fn register_with_streams(
490 &self,
491 command: String,
492 rx: oneshot::Receiver<ExecResult>,
493 stdout: Arc<BoundedStream>,
494 stderr: Arc<BoundedStream>,
495 ) -> JobId {
496 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
497 let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
498 job.persist_output = self.persist_output_files();
499
500 let mut jobs = self.jobs.lock().await;
501 jobs.insert(id, job);
502
503 id
504 }
505
506 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
508 let mut jobs = self.jobs.lock().await;
509 if let Some(job) = jobs.get_mut(&id) {
510 Some(job.wait().await)
511 } else {
512 None
513 }
514 }
515
516 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
518 let mut results = Vec::new();
519
520 let ids: Vec<JobId> = {
522 let jobs = self.jobs.lock().await;
523 jobs.keys().copied().collect()
524 };
525
526 for id in ids {
527 if let Some(result) = self.wait(id).await {
528 results.push((id, result));
529 }
530 }
531
532 results
533 }
534
535 pub async fn list(&self) -> Vec<JobInfo> {
537 let mut jobs = self.jobs.lock().await;
538 jobs.values_mut()
539 .map(|job| JobInfo {
540 id: job.id,
541 command: job.command.clone(),
542 status: job.status(),
543 output_file: job.output_file.clone(),
544 pid: job.pid,
545 })
546 .collect()
547 }
548
549 pub async fn running_count(&self) -> usize {
551 let mut jobs = self.jobs.lock().await;
552 let mut count = 0;
553 for job in jobs.values_mut() {
554 if !job.is_done() {
555 count += 1;
556 }
557 }
558 count
559 }
560
561 pub async fn cleanup(&self) {
563 let mut jobs = self.jobs.lock().await;
564 jobs.retain(|_, job| {
565 if job.is_done() {
566 job.cleanup_files();
567 false
568 } else {
569 true
570 }
571 });
572 }
573
574 pub async fn exists(&self, id: JobId) -> bool {
576 let jobs = self.jobs.lock().await;
577 jobs.contains_key(&id)
578 }
579
580 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
582 let mut jobs = self.jobs.lock().await;
583 jobs.get_mut(&id).map(|job| JobInfo {
584 id: job.id,
585 command: job.command.clone(),
586 status: job.status(),
587 output_file: job.output_file.clone(),
588 pid: job.pid,
589 })
590 }
591
592 pub async fn get_command(&self, id: JobId) -> Option<String> {
594 let jobs = self.jobs.lock().await;
595 jobs.get(&id).map(|job| job.command.clone())
596 }
597
598 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
600 let mut jobs = self.jobs.lock().await;
601 jobs.get_mut(&id).map(|job| job.status_string())
602 }
603
604 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
608 let jobs = self.jobs.lock().await;
609 if let Some(job) = jobs.get(&id)
610 && let Some(stream) = job.stdout_stream() {
611 return Some(stream.read().await);
612 }
613 None
614 }
615
616 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
620 let jobs = self.jobs.lock().await;
621 if let Some(job) = jobs.get(&id)
622 && let Some(stream) = job.stderr_stream() {
623 return Some(stream.read().await);
624 }
625 None
626 }
627
628 pub async fn list_ids(&self) -> Vec<JobId> {
630 let jobs = self.jobs.lock().await;
631 jobs.keys().copied().collect()
632 }
633
634 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
636 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
637 let job = Job::stopped(id, self.session_id, command, pid, pgid);
638 let mut jobs = self.jobs.lock().await;
639 jobs.insert(id, job);
640 id
641 }
642
643 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
645 let mut jobs = self.jobs.lock().await;
646 if let Some(job) = jobs.get_mut(&id) {
647 job.stopped = true;
648 job.pid = Some(pid);
649 job.pgid = Some(pgid);
650 }
651 }
652
653 pub async fn resume_job(&self, id: JobId) {
655 let mut jobs = self.jobs.lock().await;
656 if let Some(job) = jobs.get_mut(&id) {
657 job.stopped = false;
658 }
659 }
660
661 pub async fn last_stopped(&self) -> Option<JobId> {
663 let mut jobs = self.jobs.lock().await;
664 let mut best: Option<JobId> = None;
666 for job in jobs.values_mut() {
667 if job.stopped {
668 match best {
669 None => best = Some(job.id),
670 Some(b) if job.id.0 > b.0 => best = Some(job.id),
671 _ => {}
672 }
673 }
674 }
675 best
676 }
677
678 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
680 let jobs = self.jobs.lock().await;
681 jobs.get(&id).and_then(|job| {
682 match (job.pid, job.pgid) {
683 (Some(pid), Some(pgid)) => Some((pid, pgid)),
684 _ => None,
685 }
686 })
687 }
688
689 pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
693 let mut jobs = self.jobs.lock().await;
694 if let Some(job) = jobs.get_mut(&id) {
695 job.cancel = Some(token);
696 }
697 }
698
699 pub async fn cancel(&self, id: JobId) -> bool {
703 let jobs = self.jobs.lock().await;
704 match jobs.get(&id).and_then(|job| job.cancel.clone()) {
705 Some(token) => {
706 token.cancel();
707 true
708 }
709 None => false,
710 }
711 }
712
713 pub async fn add_pgid(&self, id: JobId, pgid: u32) {
717 let mut jobs = self.jobs.lock().await;
718 if let Some(job) = jobs.get_mut(&id) {
719 if !job.pgids.contains(&pgid) {
720 job.pgids.push(pgid);
721 }
722 }
723 }
724
725 pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
729 let jobs = self.jobs.lock().await;
730 jobs.get(&id)
731 .map(|job| {
732 let mut v = job.pgids.clone();
733 if let Some(pg) = job.pgid {
734 if !v.contains(&pg) {
735 v.push(pg);
736 }
737 }
738 v
739 })
740 .unwrap_or_default()
741 }
742
743 pub async fn remove(&self, id: JobId) {
745 let mut jobs = self.jobs.lock().await;
746 if let Some(mut job) = jobs.remove(&id) {
747 job.cleanup_files();
748 }
749 }
750}
751
752impl Default for JobManager {
753 fn default() -> Self {
754 Self::new()
755 }
756}
757
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// With persistence switched off, waiting on a job must not create a host output file.
    #[tokio::test]
    async fn test_no_host_output_file_when_persistence_disabled() {
        let manager = JobManager::new();
        assert!(manager.persist_output_files(), "default is to persist");
        manager.set_persist_output_files(false);
        assert!(!manager.persist_output_files());

        let id = manager.spawn("leaky".to_string(), async {
            ExecResult::success("output that must not hit host disk")
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        let output_file = {
            // Inspect the job map directly; the guard is dropped at the end of this block.
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        assert!(
            output_file.is_none(),
            "no host output file should be written when persistence is disabled, got {output_file:?}"
        );
    }

    /// A spawned job's result is returned by `wait`.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // Give the task a chance to start; `wait` must still block until it finishes.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let result = manager.wait(id).await;
        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.ok());
        assert_eq!(&*result.text_out(), "done");
    }

    /// `wait_all` collects one result per registered job.
    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);
    }

    /// A job still sleeping shows up in `list` as Running with its command.
    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        // Relies on 5ms being well short of the job's 50ms sleep.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    /// After a successful job is waited on, `get` reports it as Done.
    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async {
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    /// `cleanup` removes finished jobs from the registry.
    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async {
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        // Finished but still registered until cleanup.
        assert_eq!(manager.list().await.len(), 1);

        manager.cleanup().await;

        assert_eq!(manager.list().await.len(), 0);
    }

    /// `cleanup` also deletes the output file written by `wait`.
    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        let output_file = {
            // Read the path directly from the job map; the guard is dropped at the end of
            // this block, before `cleanup` needs the lock.
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        let path = output_file.expect("job with output should have written a temp file");
        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());

        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    /// A job registered with a oneshot receiver yields the value sent on the channel.
    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await;
        assert!(result.is_some());
        assert_eq!(&*result.unwrap().text_out(), "from channel");
    }

    /// The job is visible right after the synchronous `spawn` returns.
    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        let exists = manager.exists(id).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(id).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    /// Waiting on an unknown id returns None.
    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        let result = manager.wait(JobId(999)).await;
        assert!(result.is_none());
    }

    /// `cancel` trips the token attached with `set_cancel_token`.
    #[tokio::test]
    async fn test_cancel_token_fires() {
        let manager = JobManager::new();
        let token = tokio_util::sync::CancellationToken::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        manager.set_cancel_token(id, token.clone()).await;

        assert!(!token.is_cancelled());
        assert!(manager.cancel(id).await, "cancel should report success");
        assert!(token.is_cancelled(), "the job's token must be tripped");
    }

    /// `cancel` reports false for a job without a token and for an unknown id.
    #[tokio::test]
    async fn test_cancel_without_token_returns_false() {
        let manager = JobManager::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        // Known job, but no token attached.
        assert!(!manager.cancel(id).await);
        // Unknown job id.
        assert!(!manager.cancel(JobId(999)).await);
    }

    /// Extra pgids are recorded in insertion order with duplicates ignored.
    #[tokio::test]
    async fn test_pgids_recorded_and_deduped() {
        let manager = JobManager::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        assert!(manager.job_pgids(id).await.is_empty());

        manager.add_pgid(id, 4242).await;
        manager.add_pgid(id, 4243).await;
        // Duplicate: must not be recorded twice.
        manager.add_pgid(id, 4242).await;
        assert_eq!(manager.job_pgids(id).await, vec![4242, 4243]);

        // Unknown job ids yield an empty list.
        assert!(manager.job_pgids(JobId(999)).await.is_empty());
    }
}