1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21pub struct Job {
23 pub id: JobId,
25 pub command: String,
27 handle: Option<JoinHandle<ExecResult>>,
29 result_rx: Option<oneshot::Receiver<ExecResult>>,
31 result: Option<ExecResult>,
33 output_file: Option<PathBuf>,
35 stdout_stream: Option<Arc<BoundedStream>>,
37 stderr_stream: Option<Arc<BoundedStream>>,
39 pid: Option<u32>,
41 pgid: Option<u32>,
43 stopped: bool,
45}
46
47impl Job {
48 pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
50 Self {
51 id,
52 command,
53 handle: Some(handle),
54 result_rx: None,
55 result: None,
56 output_file: None,
57 stdout_stream: None,
58 stderr_stream: None,
59 pid: None,
60 pgid: None,
61 stopped: false,
62 }
63 }
64
65 pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
67 Self {
68 id,
69 command,
70 handle: None,
71 result_rx: Some(rx),
72 result: None,
73 output_file: None,
74 stdout_stream: None,
75 stderr_stream: None,
76 pid: None,
77 pgid: None,
78 stopped: false,
79 }
80 }
81
82 pub fn with_streams(
86 id: JobId,
87 command: String,
88 rx: oneshot::Receiver<ExecResult>,
89 stdout: Arc<BoundedStream>,
90 stderr: Arc<BoundedStream>,
91 ) -> Self {
92 Self {
93 id,
94 command,
95 handle: None,
96 result_rx: Some(rx),
97 result: None,
98 output_file: None,
99 stdout_stream: Some(stdout),
100 stderr_stream: Some(stderr),
101 pid: None,
102 pgid: None,
103 stopped: false,
104 }
105 }
106
107 pub fn stopped(id: JobId, command: String, pid: u32, pgid: u32) -> Self {
109 Self {
110 id,
111 command,
112 handle: None,
113 result_rx: None,
114 result: None,
115 output_file: None,
116 stdout_stream: None,
117 stderr_stream: None,
118 pid: Some(pid),
119 pgid: Some(pgid),
120 stopped: true,
121 }
122 }
123
124 pub fn output_file(&self) -> Option<&PathBuf> {
126 self.output_file.as_ref()
127 }
128
129 pub fn is_done(&mut self) -> bool {
133 if self.stopped {
134 return false;
135 }
136 self.try_poll();
137 self.result.is_some()
138 }
139
140 pub fn status(&mut self) -> JobStatus {
142 if self.stopped {
143 return JobStatus::Stopped;
144 }
145 self.try_poll();
146 match &self.result {
147 Some(r) if r.ok() => JobStatus::Done,
148 Some(_) => JobStatus::Failed,
149 None => JobStatus::Running,
150 }
151 }
152
153 pub fn status_string(&mut self) -> String {
160 self.try_poll();
161 match &self.result {
162 Some(r) if r.ok() => "done:0".to_string(),
163 Some(r) => format!("failed:{}", r.code),
164 None => "running".to_string(),
165 }
166 }
167
168 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170 self.stdout_stream.as_ref()
171 }
172
173 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175 self.stderr_stream.as_ref()
176 }
177
178 pub async fn wait(&mut self) -> ExecResult {
182 if let Some(result) = self.result.take() {
183 self.result = Some(result.clone());
184 return result;
185 }
186
187 let result = if let Some(handle) = self.handle.take() {
188 match handle.await {
189 Ok(r) => r,
190 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191 }
192 } else if let Some(rx) = self.result_rx.take() {
193 match rx.await {
194 Ok(r) => r,
195 Err(_) => ExecResult::failure(1, "job channel closed"),
196 }
197 } else {
198 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200 };
201
202 if self.output_file.is_none()
204 && let Some(path) = self.write_output_file(&result) {
205 self.output_file = Some(path);
206 }
207
208 self.result = Some(result.clone());
209 result
210 }
211
212 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214 if result.out.is_empty() && result.err.is_empty() {
216 return None;
217 }
218
219 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
220 if std::fs::create_dir_all(&tmp_dir).is_err() {
221 tracing::warn!("Failed to create job output directory");
222 return None;
223 }
224
225 let filename = format!("job_{}.txt", self.id.0);
226 let path = tmp_dir.join(filename);
227
228 let mut content = String::new();
229 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
230 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
231
232 if !result.out.is_empty() {
233 content.push_str("## STDOUT\n");
234 content.push_str(&result.out);
235 if !result.out.ends_with('\n') {
236 content.push('\n');
237 }
238 }
239
240 if !result.err.is_empty() {
241 content.push_str("\n## STDERR\n");
242 content.push_str(&result.err);
243 if !result.err.ends_with('\n') {
244 content.push('\n');
245 }
246 }
247
248 match std::fs::write(&path, content) {
249 Ok(()) => Some(path),
250 Err(e) => {
251 tracing::warn!("Failed to write job output file: {}", e);
252 None
253 }
254 }
255 }
256
257 pub fn cleanup_files(&mut self) {
259 if let Some(path) = self.output_file.take() {
260 if let Err(e) = std::fs::remove_file(&path) {
261 if e.kind() != io::ErrorKind::NotFound {
263 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
264 }
265 }
266 }
267 }
268
269 pub fn try_result(&self) -> Option<&ExecResult> {
271 self.result.as_ref()
272 }
273
274 pub fn try_poll(&mut self) -> bool {
279 if self.result.is_some() {
280 return true;
281 }
282
283 if let Some(rx) = self.result_rx.as_mut() {
285 match rx.try_recv() {
286 Ok(result) => {
287 self.result = Some(result);
288 self.result_rx = None;
289 return true;
290 }
291 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
292 return false;
294 }
295 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
296 self.result = Some(ExecResult::failure(1, "job channel closed"));
298 self.result_rx = None;
299 return true;
300 }
301 }
302 }
303
304 if let Some(handle) = self.handle.as_mut()
306 && handle.is_finished() {
307 let Some(mut handle) = self.handle.take() else {
309 return false;
310 };
311 let waker = std::task::Waker::noop();
313 let mut cx = std::task::Context::from_waker(waker);
314 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
315 std::task::Poll::Ready(Ok(r)) => r,
316 std::task::Poll::Ready(Err(e)) => {
317 ExecResult::failure(1, format!("job panicked: {}", e))
318 }
319 std::task::Poll::Pending => return false, };
321 self.result = Some(result);
322 return true;
323 }
324
325 false
326 }
327}
328
329pub struct JobManager {
331 next_id: AtomicU64,
333 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
335}
336
337impl JobManager {
338 pub fn new() -> Self {
340 Self {
341 next_id: AtomicU64::new(1),
342 jobs: Arc::new(Mutex::new(HashMap::new())),
343 }
344 }
345
346 pub fn spawn<F>(&self, command: String, future: F) -> JobId
351 where
352 F: std::future::Future<Output = ExecResult> + Send + 'static,
353 {
354 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
355 let handle = tokio::spawn(future);
356 let job = Job::new(id, command, handle);
357
358 let mut job_opt = Some(job);
362 loop {
363 match self.jobs.try_lock() {
364 Ok(mut guard) => {
365 if let Some(j) = job_opt.take() {
366 guard.insert(id, j);
367 }
368 break;
369 }
370 Err(_) => {
371 std::hint::spin_loop();
372 }
373 }
374 }
375
376 id
377 }
378
379 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
381 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
382 let job = Job::from_channel(id, command, rx);
383
384 let mut jobs = self.jobs.lock().await;
385 jobs.insert(id, job);
386
387 id
388 }
389
390 pub async fn register_with_streams(
394 &self,
395 command: String,
396 rx: oneshot::Receiver<ExecResult>,
397 stdout: Arc<BoundedStream>,
398 stderr: Arc<BoundedStream>,
399 ) -> JobId {
400 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
401 let job = Job::with_streams(id, command, rx, stdout, stderr);
402
403 let mut jobs = self.jobs.lock().await;
404 jobs.insert(id, job);
405
406 id
407 }
408
409 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
411 let mut jobs = self.jobs.lock().await;
412 if let Some(job) = jobs.get_mut(&id) {
413 Some(job.wait().await)
414 } else {
415 None
416 }
417 }
418
419 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
421 let mut results = Vec::new();
422
423 let ids: Vec<JobId> = {
425 let jobs = self.jobs.lock().await;
426 jobs.keys().copied().collect()
427 };
428
429 for id in ids {
430 if let Some(result) = self.wait(id).await {
431 results.push((id, result));
432 }
433 }
434
435 results
436 }
437
438 pub async fn list(&self) -> Vec<JobInfo> {
440 let mut jobs = self.jobs.lock().await;
441 jobs.values_mut()
442 .map(|job| JobInfo {
443 id: job.id,
444 command: job.command.clone(),
445 status: job.status(),
446 output_file: job.output_file.clone(),
447 pid: job.pid,
448 })
449 .collect()
450 }
451
452 pub async fn running_count(&self) -> usize {
454 let mut jobs = self.jobs.lock().await;
455 let mut count = 0;
456 for job in jobs.values_mut() {
457 if !job.is_done() {
458 count += 1;
459 }
460 }
461 count
462 }
463
464 pub async fn cleanup(&self) {
466 let mut jobs = self.jobs.lock().await;
467 jobs.retain(|_, job| {
468 if job.is_done() {
469 job.cleanup_files();
470 false
471 } else {
472 true
473 }
474 });
475 }
476
477 pub async fn exists(&self, id: JobId) -> bool {
479 let jobs = self.jobs.lock().await;
480 jobs.contains_key(&id)
481 }
482
483 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
485 let mut jobs = self.jobs.lock().await;
486 jobs.get_mut(&id).map(|job| JobInfo {
487 id: job.id,
488 command: job.command.clone(),
489 status: job.status(),
490 output_file: job.output_file.clone(),
491 pid: job.pid,
492 })
493 }
494
495 pub async fn get_command(&self, id: JobId) -> Option<String> {
497 let jobs = self.jobs.lock().await;
498 jobs.get(&id).map(|job| job.command.clone())
499 }
500
501 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
503 let mut jobs = self.jobs.lock().await;
504 jobs.get_mut(&id).map(|job| job.status_string())
505 }
506
507 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
511 let jobs = self.jobs.lock().await;
512 if let Some(job) = jobs.get(&id)
513 && let Some(stream) = job.stdout_stream() {
514 return Some(stream.read().await);
515 }
516 None
517 }
518
519 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
523 let jobs = self.jobs.lock().await;
524 if let Some(job) = jobs.get(&id)
525 && let Some(stream) = job.stderr_stream() {
526 return Some(stream.read().await);
527 }
528 None
529 }
530
531 pub async fn list_ids(&self) -> Vec<JobId> {
533 let jobs = self.jobs.lock().await;
534 jobs.keys().copied().collect()
535 }
536
537 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
539 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
540 let job = Job::stopped(id, command, pid, pgid);
541 let mut jobs = self.jobs.lock().await;
542 jobs.insert(id, job);
543 id
544 }
545
546 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
548 let mut jobs = self.jobs.lock().await;
549 if let Some(job) = jobs.get_mut(&id) {
550 job.stopped = true;
551 job.pid = Some(pid);
552 job.pgid = Some(pgid);
553 }
554 }
555
556 pub async fn resume_job(&self, id: JobId) {
558 let mut jobs = self.jobs.lock().await;
559 if let Some(job) = jobs.get_mut(&id) {
560 job.stopped = false;
561 }
562 }
563
564 pub async fn last_stopped(&self) -> Option<JobId> {
566 let mut jobs = self.jobs.lock().await;
567 let mut best: Option<JobId> = None;
569 for job in jobs.values_mut() {
570 if job.stopped {
571 match best {
572 None => best = Some(job.id),
573 Some(b) if job.id.0 > b.0 => best = Some(job.id),
574 _ => {}
575 }
576 }
577 }
578 best
579 }
580
581 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
583 let jobs = self.jobs.lock().await;
584 jobs.get(&id).and_then(|job| {
585 match (job.pid, job.pgid) {
586 (Some(pid), Some(pgid)) => Some((pid, pgid)),
587 _ => None,
588 }
589 })
590 }
591
592 pub async fn remove(&self, id: JobId) {
594 let mut jobs = self.jobs.lock().await;
595 if let Some(mut job) = jobs.remove(&id) {
596 job.cleanup_files();
597 }
598 }
599}
600
601impl Default for JobManager {
602 fn default() -> Self {
603 Self::new()
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // Let the task start; wait() must work whether or not it has finished.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let result = manager.wait(id).await.expect("spawned job should exist");
        assert!(result.ok());
        assert_eq!(result.out, "done");

        // Delete the output file this job wrote so tests don't litter the temp dir.
        manager.remove(id).await;
    }

    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);

        manager.cleanup().await;
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        // Far longer than the test runs, so "Running" doesn't depend on scheduler
        // timing; the task is dropped when the test runtime shuts down.
        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_secs(60)).await;
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await.expect("job should still be registered");
        assert_eq!(info.status, JobStatus::Done);
    }

    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        // Finished jobs stay listed until cleanup() runs.
        assert_eq!(manager.list().await.len(), 1);

        manager.cleanup().await;

        assert_eq!(manager.list().await.len(), 0);
    }

    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let manager = JobManager::new();
        // Output files are keyed by job id and every manager starts at id 1, so
        // tests running in parallel would otherwise share (and race on) one path.
        manager.next_id.store(9_000_001, Ordering::SeqCst);

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(manager.wait(id).await.is_some());

        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };

        // Writing the file is best-effort (the temp dir may be unwritable), so only
        // check the path when one was recorded.
        if let Some(path) = &output_file {
            assert!(
                path.exists(),
                "temp file should exist before cleanup: {}",
                path.display()
            );
        }

        manager.cleanup().await;

        if let Some(path) = output_file {
            assert!(
                !path.exists(),
                "temp file should be removed after cleanup: {}",
                path.display()
            );
        }
    }

    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await.expect("registered job should exist");
        assert_eq!(result.out, "from channel");

        manager.remove(id).await;
    }

    #[tokio::test]
    async fn test_register_channel_closed() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel::<ExecResult>();

        let id = manager.register("dropped sender".to_string(), rx).await;
        drop(tx);

        let result = manager.wait(id).await.expect("registered job should exist");
        assert!(!result.ok(), "a closed channel must surface as a failure");

        manager.remove(id).await;
    }

    #[tokio::test]
    async fn test_stopped_jobs() {
        let manager = JobManager::new();

        let first = manager.register_stopped("vim".to_string(), 100, 100).await;
        let second = manager.register_stopped("less".to_string(), 200, 200).await;

        let info = manager.get(first).await.expect("stopped job should exist");
        assert_eq!(info.status, JobStatus::Stopped);
        assert_eq!(manager.get_process_info(first).await, Some((100, 100)));
        assert_eq!(manager.last_stopped().await.map(|id| id.0), Some(second.0));

        manager.resume_job(second).await;
        assert_eq!(manager.last_stopped().await.map(|id| id.0), Some(first.0));

        // Neither job has a result, so cleanup() must keep both.
        manager.cleanup().await;
        assert_eq!(manager.list().await.len(), 2);
    }

    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        // spawn() is synchronous: no await between it and these checks.
        assert!(
            manager.exists(id).await,
            "job should be immediately available after spawn()"
        );
        assert!(
            manager.get(id).await.is_some(),
            "job info should be available immediately"
        );
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        assert!(manager.wait(JobId(999)).await.is_none());
    }
}