1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21pub struct Job {
23 pub id: JobId,
25 pub command: String,
27 handle: Option<JoinHandle<ExecResult>>,
29 result_rx: Option<oneshot::Receiver<ExecResult>>,
31 result: Option<ExecResult>,
33 output_file: Option<PathBuf>,
35 stdout_stream: Option<Arc<BoundedStream>>,
37 stderr_stream: Option<Arc<BoundedStream>>,
39 pid: Option<u32>,
41 pgid: Option<u32>,
43 stopped: bool,
45}
46
47impl Job {
48 pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
50 Self {
51 id,
52 command,
53 handle: Some(handle),
54 result_rx: None,
55 result: None,
56 output_file: None,
57 stdout_stream: None,
58 stderr_stream: None,
59 pid: None,
60 pgid: None,
61 stopped: false,
62 }
63 }
64
65 pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
67 Self {
68 id,
69 command,
70 handle: None,
71 result_rx: Some(rx),
72 result: None,
73 output_file: None,
74 stdout_stream: None,
75 stderr_stream: None,
76 pid: None,
77 pgid: None,
78 stopped: false,
79 }
80 }
81
82 pub fn with_streams(
86 id: JobId,
87 command: String,
88 rx: oneshot::Receiver<ExecResult>,
89 stdout: Arc<BoundedStream>,
90 stderr: Arc<BoundedStream>,
91 ) -> Self {
92 Self {
93 id,
94 command,
95 handle: None,
96 result_rx: Some(rx),
97 result: None,
98 output_file: None,
99 stdout_stream: Some(stdout),
100 stderr_stream: Some(stderr),
101 pid: None,
102 pgid: None,
103 stopped: false,
104 }
105 }
106
107 pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
109 Self {
110 id,
111 command,
112 handle: None,
113 result_rx: None,
114 result: None,
115 output_file: None,
116 stdout_stream: None,
117 stderr_stream: None,
118 pid: Some(pid),
119 pgid: Some(pgid),
120 stopped: true,
121 }
122 }
123
124 pub fn output_file(&self) -> Option<&PathBuf> {
126 self.output_file.as_ref()
127 }
128
129 pub fn is_done(&mut self) -> bool {
133 if self.stopped {
134 return false;
135 }
136 self.try_poll();
137 self.result.is_some()
138 }
139
140 pub fn status(&mut self) -> JobStatus {
142 if self.stopped {
143 return JobStatus::Stopped;
144 }
145 self.try_poll();
146 match &self.result {
147 Some(r) if r.ok() => JobStatus::Done,
148 Some(_) => JobStatus::Failed,
149 None => JobStatus::Running,
150 }
151 }
152
153 pub fn status_string(&mut self) -> String {
160 self.try_poll();
161 match &self.result {
162 Some(r) if r.ok() => "done:0".to_string(),
163 Some(r) => format!("failed:{}", r.code),
164 None => "running".to_string(),
165 }
166 }
167
168 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170 self.stdout_stream.as_ref()
171 }
172
173 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175 self.stderr_stream.as_ref()
176 }
177
178 pub async fn wait(&mut self) -> ExecResult {
182 if let Some(result) = self.result.take() {
183 self.result = Some(result.clone());
184 return result;
185 }
186
187 let result = if let Some(handle) = self.handle.take() {
188 match handle.await {
189 Ok(r) => r,
190 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191 }
192 } else if let Some(rx) = self.result_rx.take() {
193 match rx.await {
194 Ok(r) => r,
195 Err(_) => ExecResult::failure(1, "job channel closed"),
196 }
197 } else {
198 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200 };
201
202 if self.output_file.is_none()
204 && let Some(path) = self.write_output_file(&result) {
205 self.output_file = Some(path);
206 }
207
208 self.result = Some(result.clone());
209 result
210 }
211
212 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214 let text = result.text_out();
216 if text.is_empty() && result.err.is_empty() {
217 return None;
218 }
219
220 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
221 if std::fs::create_dir_all(&tmp_dir).is_err() {
222 tracing::warn!("Failed to create job output directory");
223 return None;
224 }
225
226 let filename = format!("job_{}.txt", self.id.0);
227 let path = tmp_dir.join(filename);
228
229 let mut content = String::new();
230 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
231 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
232
233 if !text.is_empty() {
234 content.push_str("## STDOUT\n");
235 content.push_str(&text);
236 if !text.ends_with('\n') {
237 content.push('\n');
238 }
239 }
240
241 if !result.err.is_empty() {
242 content.push_str("\n## STDERR\n");
243 content.push_str(&result.err);
244 if !result.err.ends_with('\n') {
245 content.push('\n');
246 }
247 }
248
249 match std::fs::write(&path, content) {
250 Ok(()) => Some(path),
251 Err(e) => {
252 tracing::warn!("Failed to write job output file: {}", e);
253 None
254 }
255 }
256 }
257
258 pub fn cleanup_files(&mut self) {
260 if let Some(path) = self.output_file.take() {
261 if let Err(e) = std::fs::remove_file(&path) {
262 if e.kind() != io::ErrorKind::NotFound {
264 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
265 }
266 }
267 }
268 }
269
270 pub fn try_result(&self) -> Option<&ExecResult> {
272 self.result.as_ref()
273 }
274
275 pub fn try_poll(&mut self) -> bool {
280 if self.result.is_some() {
281 return true;
282 }
283
284 if let Some(rx) = self.result_rx.as_mut() {
286 match rx.try_recv() {
287 Ok(result) => {
288 self.result = Some(result);
289 self.result_rx = None;
290 return true;
291 }
292 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
293 return false;
295 }
296 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
297 self.result = Some(ExecResult::failure(1, "job channel closed"));
299 self.result_rx = None;
300 return true;
301 }
302 }
303 }
304
305 if let Some(handle) = self.handle.as_mut()
307 && handle.is_finished() {
308 let Some(mut handle) = self.handle.take() else {
310 return false;
311 };
312 let waker = std::task::Waker::noop();
314 let mut cx = std::task::Context::from_waker(waker);
315 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
316 std::task::Poll::Ready(Ok(r)) => r,
317 std::task::Poll::Ready(Err(e)) => {
318 ExecResult::failure(1, format!("job panicked: {}", e))
319 }
320 std::task::Poll::Pending => return false, };
322 self.result = Some(result);
323 return true;
324 }
325
326 false
327 }
328}
329
330pub struct JobManager {
332 next_id: AtomicU64,
334 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
336}
337
338impl JobManager {
339 pub fn new() -> Self {
341 Self {
342 next_id: AtomicU64::new(1),
343 jobs: Arc::new(Mutex::new(HashMap::new())),
344 }
345 }
346
347 pub fn spawn<F>(&self, command: String, future: F) -> JobId
352 where
353 F: std::future::Future<Output = ExecResult> + Send + 'static,
354 {
355 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
356 let handle = tokio::spawn(future);
357 let job = Job::new(id, command, handle);
358
359 let mut job_opt = Some(job);
363 loop {
364 match self.jobs.try_lock() {
365 Ok(mut guard) => {
366 if let Some(j) = job_opt.take() {
367 guard.insert(id, j);
368 }
369 break;
370 }
371 Err(_) => {
372 std::hint::spin_loop();
373 }
374 }
375 }
376
377 id
378 }
379
380 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
382 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
383 let job = Job::from_channel(id, command, rx);
384
385 let mut jobs = self.jobs.lock().await;
386 jobs.insert(id, job);
387
388 id
389 }
390
391 pub async fn register_with_streams(
395 &self,
396 command: String,
397 rx: oneshot::Receiver<ExecResult>,
398 stdout: Arc<BoundedStream>,
399 stderr: Arc<BoundedStream>,
400 ) -> JobId {
401 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
402 let job = Job::with_streams(id, command, rx, stdout, stderr);
403
404 let mut jobs = self.jobs.lock().await;
405 jobs.insert(id, job);
406
407 id
408 }
409
410 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
412 let mut jobs = self.jobs.lock().await;
413 if let Some(job) = jobs.get_mut(&id) {
414 Some(job.wait().await)
415 } else {
416 None
417 }
418 }
419
420 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
422 let mut results = Vec::new();
423
424 let ids: Vec<JobId> = {
426 let jobs = self.jobs.lock().await;
427 jobs.keys().copied().collect()
428 };
429
430 for id in ids {
431 if let Some(result) = self.wait(id).await {
432 results.push((id, result));
433 }
434 }
435
436 results
437 }
438
439 pub async fn list(&self) -> Vec<JobInfo> {
441 let mut jobs = self.jobs.lock().await;
442 jobs.values_mut()
443 .map(|job| JobInfo {
444 id: job.id,
445 command: job.command.clone(),
446 status: job.status(),
447 output_file: job.output_file.clone(),
448 pid: job.pid,
449 })
450 .collect()
451 }
452
453 pub async fn running_count(&self) -> usize {
455 let mut jobs = self.jobs.lock().await;
456 let mut count = 0;
457 for job in jobs.values_mut() {
458 if !job.is_done() {
459 count += 1;
460 }
461 }
462 count
463 }
464
465 pub async fn cleanup(&self) {
467 let mut jobs = self.jobs.lock().await;
468 jobs.retain(|_, job| {
469 if job.is_done() {
470 job.cleanup_files();
471 false
472 } else {
473 true
474 }
475 });
476 }
477
478 pub async fn exists(&self, id: JobId) -> bool {
480 let jobs = self.jobs.lock().await;
481 jobs.contains_key(&id)
482 }
483
484 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
486 let mut jobs = self.jobs.lock().await;
487 jobs.get_mut(&id).map(|job| JobInfo {
488 id: job.id,
489 command: job.command.clone(),
490 status: job.status(),
491 output_file: job.output_file.clone(),
492 pid: job.pid,
493 })
494 }
495
496 pub async fn get_command(&self, id: JobId) -> Option<String> {
498 let jobs = self.jobs.lock().await;
499 jobs.get(&id).map(|job| job.command.clone())
500 }
501
502 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
504 let mut jobs = self.jobs.lock().await;
505 jobs.get_mut(&id).map(|job| job.status_string())
506 }
507
508 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
512 let jobs = self.jobs.lock().await;
513 if let Some(job) = jobs.get(&id)
514 && let Some(stream) = job.stdout_stream() {
515 return Some(stream.read().await);
516 }
517 None
518 }
519
520 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
524 let jobs = self.jobs.lock().await;
525 if let Some(job) = jobs.get(&id)
526 && let Some(stream) = job.stderr_stream() {
527 return Some(stream.read().await);
528 }
529 None
530 }
531
532 pub async fn list_ids(&self) -> Vec<JobId> {
534 let jobs = self.jobs.lock().await;
535 jobs.keys().copied().collect()
536 }
537
538 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
540 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
541 let job = Job::stopped(id, command, pid, pgid);
542 let mut jobs = self.jobs.lock().await;
543 jobs.insert(id, job);
544 id
545 }
546
547 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
549 let mut jobs = self.jobs.lock().await;
550 if let Some(job) = jobs.get_mut(&id) {
551 job.stopped = true;
552 job.pid = Some(pid);
553 job.pgid = Some(pgid);
554 }
555 }
556
557 pub async fn resume_job(&self, id: JobId) {
559 let mut jobs = self.jobs.lock().await;
560 if let Some(job) = jobs.get_mut(&id) {
561 job.stopped = false;
562 }
563 }
564
565 pub async fn last_stopped(&self) -> Option<JobId> {
567 let mut jobs = self.jobs.lock().await;
568 let mut best: Option<JobId> = None;
570 for job in jobs.values_mut() {
571 if job.stopped {
572 match best {
573 None => best = Some(job.id),
574 Some(b) if job.id.0 > b.0 => best = Some(job.id),
575 _ => {}
576 }
577 }
578 }
579 best
580 }
581
582 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
584 let jobs = self.jobs.lock().await;
585 jobs.get(&id).and_then(|job| {
586 match (job.pid, job.pgid) {
587 (Some(pid), Some(pgid)) => Some((pid, pgid)),
588 _ => None,
589 }
590 })
591 }
592
593 pub async fn remove(&self, id: JobId) {
595 let mut jobs = self.jobs.lock().await;
596 if let Some(mut job) = jobs.remove(&id) {
597 job.cleanup_files();
598 }
599 }
600}
601
602impl Default for JobManager {
603 fn default() -> Self {
604 Self::new()
605 }
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sleeps for `ms` milliseconds on the tokio timer.
    async fn pause(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let mgr = JobManager::new();
        let id = mgr.spawn("test".to_string(), async {
            pause(10).await;
            ExecResult::success("done")
        });

        pause(5).await;

        let outcome = mgr.wait(id).await.expect("spawned job should be waitable");
        assert!(outcome.ok());
        assert_eq!(outcome.out, "done");
    }

    #[tokio::test]
    async fn test_wait_all() {
        let mgr = JobManager::new();
        for (name, delay, out) in [("job1", 10, "one"), ("job2", 5, "two")] {
            mgr.spawn(name.to_string(), async move {
                pause(delay).await;
                ExecResult::success(out)
            });
        }

        pause(5).await;

        assert_eq!(mgr.wait_all().await.len(), 2);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let mgr = JobManager::new();
        mgr.spawn("test job".to_string(), async {
            pause(50).await;
            ExecResult::success("")
        });

        pause(5).await;

        let listing = mgr.list().await;
        assert_eq!(listing.len(), 1);
        let only = &listing[0];
        assert_eq!(only.command, "test job");
        assert_eq!(only.status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let mgr = JobManager::new();
        let id = mgr.spawn("quick".to_string(), async { ExecResult::success("") });

        pause(10).await;
        let _ = mgr.wait(id).await;

        let info = mgr.get(id).await.expect("finished job should still be listed");
        assert_eq!(info.status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let mgr = JobManager::new();
        let id = mgr.spawn("done".to_string(), async { ExecResult::success("") });

        pause(10).await;
        let _ = mgr.wait(id).await;

        // A finished job stays in the table until cleanup runs.
        assert_eq!(mgr.list().await.len(), 1);
        mgr.cleanup().await;
        assert!(mgr.list().await.is_empty());
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let mgr = JobManager::new();
        let id = mgr.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        pause(10).await;
        assert!(mgr.wait(id).await.is_some());

        // Peek at the private table; the guard is dropped at the end of this statement.
        let written = mgr
            .jobs
            .lock()
            .await
            .get(&id)
            .and_then(|job| job.output_file().cloned());

        mgr.cleanup().await;

        // Writing the file is best-effort, so only check removal when one was written.
        if let Some(path) = written {
            assert!(
                !path.exists(),
                "temp file should be removed after cleanup: {}",
                path.display()
            );
        }
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let mgr = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = mgr.register("channel job".to_string(), rx).await;
        tx.send(ExecResult::success("from channel")).unwrap();

        let received = mgr.wait(id).await.expect("registered job should be waitable");
        assert_eq!(received.out, "from channel");
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let mgr = JobManager::new();
        let id = mgr.spawn("instant".to_string(), async {
            pause(100).await;
            ExecResult::success("done")
        });

        // No await between spawn() and these checks beyond the lookups themselves.
        assert!(mgr.exists(id).await, "job should be immediately available after spawn()");
        assert!(mgr.get(id).await.is_some(), "job info should be available immediately");
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let mgr = JobManager::new();
        assert!(mgr.wait(JobId(999)).await.is_none());
    }
}