1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use tokio::sync::{oneshot, Mutex};
12use tokio::task::JoinHandle;
13
14use super::stream::BoundedStream;
15use crate::interpreter::ExecResult;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
19pub struct JobId(pub u64);
20
21impl std::fmt::Display for JobId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "{}", self.0)
24 }
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum JobStatus {
30 Running,
32 Stopped,
34 Done,
36 Failed,
38}
39
40impl std::fmt::Display for JobStatus {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 JobStatus::Running => write!(f, "Running"),
44 JobStatus::Stopped => write!(f, "Stopped"),
45 JobStatus::Done => write!(f, "Done"),
46 JobStatus::Failed => write!(f, "Failed"),
47 }
48 }
49}
50
51#[derive(Debug, Clone)]
53pub struct JobInfo {
54 pub id: JobId,
56 pub command: String,
58 pub status: JobStatus,
60 pub output_file: Option<PathBuf>,
62 pub pid: Option<u32>,
64}
65
66pub struct Job {
68 pub id: JobId,
70 pub command: String,
72 handle: Option<JoinHandle<ExecResult>>,
74 result_rx: Option<oneshot::Receiver<ExecResult>>,
76 result: Option<ExecResult>,
78 output_file: Option<PathBuf>,
80 stdout_stream: Option<Arc<BoundedStream>>,
82 stderr_stream: Option<Arc<BoundedStream>>,
84 pid: Option<u32>,
86 pgid: Option<u32>,
88 stopped: bool,
90}
91
92impl Job {
93 pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
95 Self {
96 id,
97 command,
98 handle: Some(handle),
99 result_rx: None,
100 result: None,
101 output_file: None,
102 stdout_stream: None,
103 stderr_stream: None,
104 pid: None,
105 pgid: None,
106 stopped: false,
107 }
108 }
109
110 pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
112 Self {
113 id,
114 command,
115 handle: None,
116 result_rx: Some(rx),
117 result: None,
118 output_file: None,
119 stdout_stream: None,
120 stderr_stream: None,
121 pid: None,
122 pgid: None,
123 stopped: false,
124 }
125 }
126
127 pub fn with_streams(
131 id: JobId,
132 command: String,
133 rx: oneshot::Receiver<ExecResult>,
134 stdout: Arc<BoundedStream>,
135 stderr: Arc<BoundedStream>,
136 ) -> Self {
137 Self {
138 id,
139 command,
140 handle: None,
141 result_rx: Some(rx),
142 result: None,
143 output_file: None,
144 stdout_stream: Some(stdout),
145 stderr_stream: Some(stderr),
146 pid: None,
147 pgid: None,
148 stopped: false,
149 }
150 }
151
152 pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
154 Self {
155 id,
156 command,
157 handle: None,
158 result_rx: None,
159 result: None,
160 output_file: None,
161 stdout_stream: None,
162 stderr_stream: None,
163 pid: Some(pid),
164 pgid: Some(pgid),
165 stopped: true,
166 }
167 }
168
169 pub fn output_file(&self) -> Option<&PathBuf> {
171 self.output_file.as_ref()
172 }
173
174 pub fn is_done(&mut self) -> bool {
178 if self.stopped {
179 return false;
180 }
181 self.try_poll();
182 self.result.is_some()
183 }
184
185 pub fn status(&mut self) -> JobStatus {
187 if self.stopped {
188 return JobStatus::Stopped;
189 }
190 self.try_poll();
191 match &self.result {
192 Some(r) if r.ok() => JobStatus::Done,
193 Some(_) => JobStatus::Failed,
194 None => JobStatus::Running,
195 }
196 }
197
198 pub fn status_string(&mut self) -> String {
205 self.try_poll();
206 match &self.result {
207 Some(r) if r.ok() => "done:0".to_string(),
208 Some(r) => format!("failed:{}", r.code),
209 None => "running".to_string(),
210 }
211 }
212
213 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
215 self.stdout_stream.as_ref()
216 }
217
218 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
220 self.stderr_stream.as_ref()
221 }
222
223 pub async fn wait(&mut self) -> ExecResult {
227 if let Some(result) = self.result.take() {
228 self.result = Some(result.clone());
229 return result;
230 }
231
232 let result = if let Some(handle) = self.handle.take() {
233 match handle.await {
234 Ok(r) => r,
235 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
236 }
237 } else if let Some(rx) = self.result_rx.take() {
238 match rx.await {
239 Ok(r) => r,
240 Err(_) => ExecResult::failure(1, "job channel closed"),
241 }
242 } else {
243 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
245 };
246
247 if self.output_file.is_none()
249 && let Some(path) = self.write_output_file(&result) {
250 self.output_file = Some(path);
251 }
252
253 self.result = Some(result.clone());
254 result
255 }
256
257 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
259 if result.out.is_empty() && result.err.is_empty() {
261 return None;
262 }
263
264 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
265 if std::fs::create_dir_all(&tmp_dir).is_err() {
266 tracing::warn!("Failed to create job output directory");
267 return None;
268 }
269
270 let filename = format!("job_{}.txt", self.id.0);
271 let path = tmp_dir.join(filename);
272
273 let mut content = String::new();
274 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
275 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
276
277 if !result.out.is_empty() {
278 content.push_str("## STDOUT\n");
279 content.push_str(&result.out);
280 if !result.out.ends_with('\n') {
281 content.push('\n');
282 }
283 }
284
285 if !result.err.is_empty() {
286 content.push_str("\n## STDERR\n");
287 content.push_str(&result.err);
288 if !result.err.ends_with('\n') {
289 content.push('\n');
290 }
291 }
292
293 match std::fs::write(&path, content) {
294 Ok(()) => Some(path),
295 Err(e) => {
296 tracing::warn!("Failed to write job output file: {}", e);
297 None
298 }
299 }
300 }
301
302 pub fn cleanup_files(&mut self) {
304 if let Some(path) = self.output_file.take() {
305 if let Err(e) = std::fs::remove_file(&path) {
306 if e.kind() != io::ErrorKind::NotFound {
308 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
309 }
310 }
311 }
312 }
313
314 pub fn try_result(&self) -> Option<&ExecResult> {
316 self.result.as_ref()
317 }
318
319 pub fn try_poll(&mut self) -> bool {
324 if self.result.is_some() {
325 return true;
326 }
327
328 if let Some(rx) = self.result_rx.as_mut() {
330 match rx.try_recv() {
331 Ok(result) => {
332 self.result = Some(result);
333 self.result_rx = None;
334 return true;
335 }
336 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
337 return false;
339 }
340 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
341 self.result = Some(ExecResult::failure(1, "job channel closed"));
343 self.result_rx = None;
344 return true;
345 }
346 }
347 }
348
349 if let Some(handle) = self.handle.as_mut()
351 && handle.is_finished() {
352 let Some(handle) = self.handle.take() else {
354 return false;
355 };
356 let result = match tokio::task::block_in_place(|| {
359 tokio::runtime::Handle::current().block_on(handle)
360 }) {
361 Ok(r) => r,
362 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
363 };
364 self.result = Some(result);
365 return true;
366 }
367
368 false
369 }
370}
371
372pub struct JobManager {
374 next_id: AtomicU64,
376 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
378}
379
380impl JobManager {
381 pub fn new() -> Self {
383 Self {
384 next_id: AtomicU64::new(1),
385 jobs: Arc::new(Mutex::new(HashMap::new())),
386 }
387 }
388
389 pub fn spawn<F>(&self, command: String, future: F) -> JobId
391 where
392 F: std::future::Future<Output = ExecResult> + Send + 'static,
393 {
394 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
395 let handle = tokio::spawn(future);
396 let job = Job::new(id, command, handle);
397
398 let jobs = self.jobs.clone();
400 if let Ok(mut guard) = jobs.try_lock() {
401 guard.insert(id, job);
402 } else {
403 tokio::spawn(async move {
404 let mut jobs = jobs.lock().await;
405 jobs.insert(id, job);
406 });
407 }
408
409 id
410 }
411
412 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
414 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
415 let job = Job::from_channel(id, command, rx);
416
417 let mut jobs = self.jobs.lock().await;
418 jobs.insert(id, job);
419
420 id
421 }
422
423 pub async fn register_with_streams(
427 &self,
428 command: String,
429 rx: oneshot::Receiver<ExecResult>,
430 stdout: Arc<BoundedStream>,
431 stderr: Arc<BoundedStream>,
432 ) -> JobId {
433 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
434 let job = Job::with_streams(id, command, rx, stdout, stderr);
435
436 let mut jobs = self.jobs.lock().await;
437 jobs.insert(id, job);
438
439 id
440 }
441
442 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
444 let mut jobs = self.jobs.lock().await;
445 if let Some(job) = jobs.get_mut(&id) {
446 Some(job.wait().await)
447 } else {
448 None
449 }
450 }
451
452 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
454 let mut results = Vec::new();
455
456 let ids: Vec<JobId> = {
458 let jobs = self.jobs.lock().await;
459 jobs.keys().copied().collect()
460 };
461
462 for id in ids {
463 if let Some(result) = self.wait(id).await {
464 results.push((id, result));
465 }
466 }
467
468 results
469 }
470
471 pub async fn list(&self) -> Vec<JobInfo> {
473 let mut jobs = self.jobs.lock().await;
474 jobs.values_mut()
475 .map(|job| JobInfo {
476 id: job.id,
477 command: job.command.clone(),
478 status: job.status(),
479 output_file: job.output_file.clone(),
480 pid: job.pid,
481 })
482 .collect()
483 }
484
485 pub async fn running_count(&self) -> usize {
487 let mut jobs = self.jobs.lock().await;
488 let mut count = 0;
489 for job in jobs.values_mut() {
490 if !job.is_done() {
491 count += 1;
492 }
493 }
494 count
495 }
496
497 pub async fn cleanup(&self) {
499 let mut jobs = self.jobs.lock().await;
500 jobs.retain(|_, job| {
501 if job.is_done() {
502 job.cleanup_files();
503 false
504 } else {
505 true
506 }
507 });
508 }
509
510 pub async fn exists(&self, id: JobId) -> bool {
512 let jobs = self.jobs.lock().await;
513 jobs.contains_key(&id)
514 }
515
516 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
518 let mut jobs = self.jobs.lock().await;
519 jobs.get_mut(&id).map(|job| JobInfo {
520 id: job.id,
521 command: job.command.clone(),
522 status: job.status(),
523 output_file: job.output_file.clone(),
524 pid: job.pid,
525 })
526 }
527
528 pub async fn get_command(&self, id: JobId) -> Option<String> {
530 let jobs = self.jobs.lock().await;
531 jobs.get(&id).map(|job| job.command.clone())
532 }
533
534 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
536 let mut jobs = self.jobs.lock().await;
537 jobs.get_mut(&id).map(|job| job.status_string())
538 }
539
540 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
544 let jobs = self.jobs.lock().await;
545 if let Some(job) = jobs.get(&id)
546 && let Some(stream) = job.stdout_stream() {
547 return Some(stream.read().await);
548 }
549 None
550 }
551
552 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
556 let jobs = self.jobs.lock().await;
557 if let Some(job) = jobs.get(&id)
558 && let Some(stream) = job.stderr_stream() {
559 return Some(stream.read().await);
560 }
561 None
562 }
563
564 pub async fn list_ids(&self) -> Vec<JobId> {
566 let jobs = self.jobs.lock().await;
567 jobs.keys().copied().collect()
568 }
569
570 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
572 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
573 let job = Job::stopped(id, command, pid, pgid);
574 let mut jobs = self.jobs.lock().await;
575 jobs.insert(id, job);
576 id
577 }
578
579 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
581 let mut jobs = self.jobs.lock().await;
582 if let Some(job) = jobs.get_mut(&id) {
583 job.stopped = true;
584 job.pid = Some(pid);
585 job.pgid = Some(pgid);
586 }
587 }
588
589 pub async fn resume_job(&self, id: JobId) {
591 let mut jobs = self.jobs.lock().await;
592 if let Some(job) = jobs.get_mut(&id) {
593 job.stopped = false;
594 }
595 }
596
597 pub async fn last_stopped(&self) -> Option<JobId> {
599 let mut jobs = self.jobs.lock().await;
600 let mut best: Option<JobId> = None;
602 for job in jobs.values_mut() {
603 if job.stopped {
604 match best {
605 None => best = Some(job.id),
606 Some(b) if job.id.0 > b.0 => best = Some(job.id),
607 _ => {}
608 }
609 }
610 }
611 best
612 }
613
614 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
616 let jobs = self.jobs.lock().await;
617 jobs.get(&id).and_then(|job| {
618 match (job.pid, job.pgid) {
619 (Some(pid), Some(pgid)) => Some((pid, pgid)),
620 _ => None,
621 }
622 })
623 }
624
625 pub async fn remove(&self, id: JobId) {
627 let mut jobs = self.jobs.lock().await;
628 if let Some(mut job) = jobs.remove(&id) {
629 job.cleanup_files();
630 }
631 }
632}
633
634impl Default for JobManager {
635 fn default() -> Self {
636 Self::new()
637 }
638}
639
640#[cfg(test)]
641mod tests {
642 use super::*;
643 use std::time::Duration;
644
645 #[tokio::test]
646 async fn test_spawn_and_wait() {
647 let manager = JobManager::new();
648
649 let id = manager.spawn("test".to_string(), async {
650 tokio::time::sleep(Duration::from_millis(10)).await;
651 ExecResult::success("done")
652 });
653
654 tokio::time::sleep(Duration::from_millis(5)).await;
656
657 let result = manager.wait(id).await;
658 assert!(result.is_some());
659 let result = result.unwrap();
660 assert!(result.ok());
661 assert_eq!(result.out, "done");
662 }
663
664 #[tokio::test]
665 async fn test_wait_all() {
666 let manager = JobManager::new();
667
668 manager.spawn("job1".to_string(), async {
669 tokio::time::sleep(Duration::from_millis(10)).await;
670 ExecResult::success("one")
671 });
672
673 manager.spawn("job2".to_string(), async {
674 tokio::time::sleep(Duration::from_millis(5)).await;
675 ExecResult::success("two")
676 });
677
678 tokio::time::sleep(Duration::from_millis(5)).await;
680
681 let results = manager.wait_all().await;
682 assert_eq!(results.len(), 2);
683 }
684
685 #[tokio::test]
686 async fn test_list_jobs() {
687 let manager = JobManager::new();
688
689 manager.spawn("test job".to_string(), async {
690 tokio::time::sleep(Duration::from_millis(50)).await;
691 ExecResult::success("")
692 });
693
694 tokio::time::sleep(Duration::from_millis(5)).await;
696
697 let jobs = manager.list().await;
698 assert_eq!(jobs.len(), 1);
699 assert_eq!(jobs[0].command, "test job");
700 assert_eq!(jobs[0].status, JobStatus::Running);
701 }
702
703 #[tokio::test]
704 async fn test_job_status_after_completion() {
705 let manager = JobManager::new();
706
707 let id = manager.spawn("quick".to_string(), async {
708 ExecResult::success("")
709 });
710
711 tokio::time::sleep(Duration::from_millis(10)).await;
713 let _ = manager.wait(id).await;
714
715 let info = manager.get(id).await;
716 assert!(info.is_some());
717 assert_eq!(info.unwrap().status, JobStatus::Done);
718 }
719
720 #[tokio::test]
721 async fn test_cleanup() {
722 let manager = JobManager::new();
723
724 let id = manager.spawn("done".to_string(), async {
725 ExecResult::success("")
726 });
727
728 tokio::time::sleep(Duration::from_millis(10)).await;
730 let _ = manager.wait(id).await;
731
732 assert_eq!(manager.list().await.len(), 1);
734
735 manager.cleanup().await;
737
738 assert_eq!(manager.list().await.len(), 0);
740 }
741
742 #[tokio::test]
743 async fn test_cleanup_removes_temp_files() {
744 let manager = JobManager::new();
746
747 let id = manager.spawn("output job".to_string(), async {
748 ExecResult::success("some output that gets written to a temp file")
749 });
750
751 tokio::time::sleep(Duration::from_millis(10)).await;
753 let result = manager.wait(id).await;
754 assert!(result.is_some());
755
756 let output_file = {
758 let jobs = manager.jobs.lock().await;
759 jobs.get(&id).and_then(|j| j.output_file().cloned())
760 };
761
762 manager.cleanup().await;
764
765 if let Some(path) = output_file {
767 assert!(
768 !path.exists(),
769 "temp file should be removed after cleanup: {}",
770 path.display()
771 );
772 }
773 }
774
775 #[tokio::test]
776 async fn test_register_with_channel() {
777 let manager = JobManager::new();
778 let (tx, rx) = oneshot::channel();
779
780 let id = manager.register("channel job".to_string(), rx).await;
781
782 tx.send(ExecResult::success("from channel")).unwrap();
784
785 let result = manager.wait(id).await;
786 assert!(result.is_some());
787 assert_eq!(result.unwrap().out, "from channel");
788 }
789
790 #[tokio::test]
791 async fn test_spawn_immediately_available() {
792 let manager = JobManager::new();
794
795 let id = manager.spawn("instant".to_string(), async {
796 tokio::time::sleep(Duration::from_millis(100)).await;
797 ExecResult::success("done")
798 });
799
800 let exists = manager.exists(id).await;
802 assert!(exists, "job should be immediately available after spawn()");
803
804 let info = manager.get(id).await;
805 assert!(info.is_some(), "job info should be available immediately");
806 }
807
808 #[tokio::test]
809 async fn test_nonexistent_job() {
810 let manager = JobManager::new();
811 let result = manager.wait(JobId(999)).await;
812 assert!(result.is_none());
813 }
814}