1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use tokio::sync::{oneshot, Mutex};
12use tokio::task::JoinHandle;
13
14use super::stream::BoundedStream;
15use crate::interpreter::ExecResult;
16
/// Identifier of a background job.
///
/// Wraps the numeric id handed out by the job manager. Ids are ordered by
/// their numeric value, so collections of ids can be sorted directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct JobId(pub u64);
20
21impl std::fmt::Display for JobId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "{}", self.0)
24 }
25}
26
/// Coarse lifecycle state of a background job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// No result has been observed for the job yet.
    Running,
    /// The job produced a result that reports success.
    Done,
    /// The job produced a result that does not report success.
    Failed,
}
37
38impl std::fmt::Display for JobStatus {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 JobStatus::Running => write!(f, "Running"),
42 JobStatus::Done => write!(f, "Done"),
43 JobStatus::Failed => write!(f, "Failed"),
44 }
45 }
46}
47
/// Snapshot of a job's public state, as returned by the job manager's
/// listing and lookup methods.
#[derive(Debug, Clone)]
pub struct JobInfo {
    /// Identifier of the job.
    pub id: JobId,
    /// Command string the job was registered with.
    pub command: String,
    /// Status at the time the snapshot was taken.
    pub status: JobStatus,
    /// Path of the file holding the job's captured output, if one was written.
    pub output_file: Option<PathBuf>,
}
60
/// A background job together with the means to obtain its result.
///
/// A job is backed either by a spawned task (`handle`) or by a oneshot
/// channel (`result_rx`); the constructors set exactly one of the two.
pub struct Job {
    /// Identifier of the job.
    pub id: JobId,
    /// Command string the job was created with.
    pub command: String,
    /// Join handle of the spawned task, for jobs created with `new`.
    handle: Option<JoinHandle<ExecResult>>,
    /// Receiver for the result, for jobs created from a channel.
    result_rx: Option<oneshot::Receiver<ExecResult>>,
    /// Cached result once the job has been observed to finish.
    result: Option<ExecResult>,
    /// Path of the output file written for this job, if any.
    output_file: Option<PathBuf>,
    /// Stdout stream supplied via `with_streams`, if any.
    stdout_stream: Option<Arc<BoundedStream>>,
    /// Stderr stream supplied via `with_streams`, if any.
    stderr_stream: Option<Arc<BoundedStream>>,
}
80
81impl Job {
82 pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
84 Self {
85 id,
86 command,
87 handle: Some(handle),
88 result_rx: None,
89 result: None,
90 output_file: None,
91 stdout_stream: None,
92 stderr_stream: None,
93 }
94 }
95
96 pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
98 Self {
99 id,
100 command,
101 handle: None,
102 result_rx: Some(rx),
103 result: None,
104 output_file: None,
105 stdout_stream: None,
106 stderr_stream: None,
107 }
108 }
109
110 pub fn with_streams(
114 id: JobId,
115 command: String,
116 rx: oneshot::Receiver<ExecResult>,
117 stdout: Arc<BoundedStream>,
118 stderr: Arc<BoundedStream>,
119 ) -> Self {
120 Self {
121 id,
122 command,
123 handle: None,
124 result_rx: Some(rx),
125 result: None,
126 output_file: None,
127 stdout_stream: Some(stdout),
128 stderr_stream: Some(stderr),
129 }
130 }
131
132 pub fn output_file(&self) -> Option<&PathBuf> {
134 self.output_file.as_ref()
135 }
136
137 pub fn is_done(&mut self) -> bool {
139 self.try_poll();
140 self.result.is_some()
141 }
142
143 pub fn status(&mut self) -> JobStatus {
145 self.try_poll();
146 match &self.result {
147 Some(r) if r.ok() => JobStatus::Done,
148 Some(_) => JobStatus::Failed,
149 None => JobStatus::Running,
150 }
151 }
152
153 pub fn status_string(&mut self) -> String {
160 self.try_poll();
161 match &self.result {
162 Some(r) if r.ok() => "done:0".to_string(),
163 Some(r) => format!("failed:{}", r.code),
164 None => "running".to_string(),
165 }
166 }
167
168 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
170 self.stdout_stream.as_ref()
171 }
172
173 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
175 self.stderr_stream.as_ref()
176 }
177
178 pub async fn wait(&mut self) -> ExecResult {
182 if let Some(result) = self.result.take() {
183 self.result = Some(result.clone());
184 return result;
185 }
186
187 let result = if let Some(handle) = self.handle.take() {
188 match handle.await {
189 Ok(r) => r,
190 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
191 }
192 } else if let Some(rx) = self.result_rx.take() {
193 match rx.await {
194 Ok(r) => r,
195 Err(_) => ExecResult::failure(1, "job channel closed"),
196 }
197 } else {
198 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
200 };
201
202 if self.output_file.is_none()
204 && let Some(path) = self.write_output_file(&result) {
205 self.output_file = Some(path);
206 }
207
208 self.result = Some(result.clone());
209 result
210 }
211
212 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
214 if result.out.is_empty() && result.err.is_empty() {
216 return None;
217 }
218
219 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
220 if std::fs::create_dir_all(&tmp_dir).is_err() {
221 tracing::warn!("Failed to create job output directory");
222 return None;
223 }
224
225 let filename = format!("job_{}.txt", self.id.0);
226 let path = tmp_dir.join(filename);
227
228 let mut content = String::new();
229 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
230 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
231
232 if !result.out.is_empty() {
233 content.push_str("## STDOUT\n");
234 content.push_str(&result.out);
235 if !result.out.ends_with('\n') {
236 content.push('\n');
237 }
238 }
239
240 if !result.err.is_empty() {
241 content.push_str("\n## STDERR\n");
242 content.push_str(&result.err);
243 if !result.err.ends_with('\n') {
244 content.push('\n');
245 }
246 }
247
248 match std::fs::write(&path, content) {
249 Ok(()) => Some(path),
250 Err(e) => {
251 tracing::warn!("Failed to write job output file: {}", e);
252 None
253 }
254 }
255 }
256
257 pub fn cleanup_files(&mut self) {
259 if let Some(path) = self.output_file.take() {
260 if let Err(e) = std::fs::remove_file(&path) {
261 if e.kind() != io::ErrorKind::NotFound {
263 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
264 }
265 }
266 }
267 }
268
269 pub fn try_result(&self) -> Option<&ExecResult> {
271 self.result.as_ref()
272 }
273
274 pub fn try_poll(&mut self) -> bool {
279 if self.result.is_some() {
280 return true;
281 }
282
283 if let Some(rx) = self.result_rx.as_mut() {
285 match rx.try_recv() {
286 Ok(result) => {
287 self.result = Some(result);
288 self.result_rx = None;
289 return true;
290 }
291 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
292 return false;
294 }
295 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
296 self.result = Some(ExecResult::failure(1, "job channel closed"));
298 self.result_rx = None;
299 return true;
300 }
301 }
302 }
303
304 if let Some(handle) = self.handle.as_mut()
306 && handle.is_finished() {
307 let Some(handle) = self.handle.take() else {
309 return false;
310 };
311 let result = match tokio::task::block_in_place(|| {
314 tokio::runtime::Handle::current().block_on(handle)
315 }) {
316 Ok(r) => r,
317 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
318 };
319 self.result = Some(result);
320 return true;
321 }
322
323 false
324 }
325}
326
/// Registry of background jobs, keyed by job id.
pub struct JobManager {
    /// Counter from which the next job id is taken.
    next_id: AtomicU64,
    /// Job table, shared behind an async mutex.
    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
}
334
335impl JobManager {
336 pub fn new() -> Self {
338 Self {
339 next_id: AtomicU64::new(1),
340 jobs: Arc::new(Mutex::new(HashMap::new())),
341 }
342 }
343
344 pub fn spawn<F>(&self, command: String, future: F) -> JobId
346 where
347 F: std::future::Future<Output = ExecResult> + Send + 'static,
348 {
349 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
350 let handle = tokio::spawn(future);
351 let job = Job::new(id, command, handle);
352
353 let jobs = self.jobs.clone();
355 if let Ok(mut guard) = jobs.try_lock() {
356 guard.insert(id, job);
357 } else {
358 tokio::spawn(async move {
359 let mut jobs = jobs.lock().await;
360 jobs.insert(id, job);
361 });
362 }
363
364 id
365 }
366
367 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
369 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
370 let job = Job::from_channel(id, command, rx);
371
372 let mut jobs = self.jobs.lock().await;
373 jobs.insert(id, job);
374
375 id
376 }
377
378 pub async fn register_with_streams(
382 &self,
383 command: String,
384 rx: oneshot::Receiver<ExecResult>,
385 stdout: Arc<BoundedStream>,
386 stderr: Arc<BoundedStream>,
387 ) -> JobId {
388 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
389 let job = Job::with_streams(id, command, rx, stdout, stderr);
390
391 let mut jobs = self.jobs.lock().await;
392 jobs.insert(id, job);
393
394 id
395 }
396
397 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
399 let mut jobs = self.jobs.lock().await;
400 if let Some(job) = jobs.get_mut(&id) {
401 Some(job.wait().await)
402 } else {
403 None
404 }
405 }
406
407 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
409 let mut results = Vec::new();
410
411 let ids: Vec<JobId> = {
413 let jobs = self.jobs.lock().await;
414 jobs.keys().copied().collect()
415 };
416
417 for id in ids {
418 if let Some(result) = self.wait(id).await {
419 results.push((id, result));
420 }
421 }
422
423 results
424 }
425
426 pub async fn list(&self) -> Vec<JobInfo> {
428 let mut jobs = self.jobs.lock().await;
429 jobs.values_mut()
430 .map(|job| JobInfo {
431 id: job.id,
432 command: job.command.clone(),
433 status: job.status(),
434 output_file: job.output_file.clone(),
435 })
436 .collect()
437 }
438
439 pub async fn running_count(&self) -> usize {
441 let mut jobs = self.jobs.lock().await;
442 let mut count = 0;
443 for job in jobs.values_mut() {
444 if !job.is_done() {
445 count += 1;
446 }
447 }
448 count
449 }
450
451 pub async fn cleanup(&self) {
453 let mut jobs = self.jobs.lock().await;
454 jobs.retain(|_, job| {
455 if job.is_done() {
456 job.cleanup_files();
457 false
458 } else {
459 true
460 }
461 });
462 }
463
464 pub async fn exists(&self, id: JobId) -> bool {
466 let jobs = self.jobs.lock().await;
467 jobs.contains_key(&id)
468 }
469
470 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
472 let mut jobs = self.jobs.lock().await;
473 jobs.get_mut(&id).map(|job| JobInfo {
474 id: job.id,
475 command: job.command.clone(),
476 status: job.status(),
477 output_file: job.output_file.clone(),
478 })
479 }
480
481 pub async fn get_command(&self, id: JobId) -> Option<String> {
483 let jobs = self.jobs.lock().await;
484 jobs.get(&id).map(|job| job.command.clone())
485 }
486
487 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
489 let mut jobs = self.jobs.lock().await;
490 jobs.get_mut(&id).map(|job| job.status_string())
491 }
492
493 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
497 let jobs = self.jobs.lock().await;
498 if let Some(job) = jobs.get(&id)
499 && let Some(stream) = job.stdout_stream() {
500 return Some(stream.read().await);
501 }
502 None
503 }
504
505 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
509 let jobs = self.jobs.lock().await;
510 if let Some(job) = jobs.get(&id)
511 && let Some(stream) = job.stderr_stream() {
512 return Some(stream.read().await);
513 }
514 None
515 }
516
517 pub async fn list_ids(&self) -> Vec<JobId> {
519 let jobs = self.jobs.lock().await;
520 jobs.keys().copied().collect()
521 }
522}
523
524impl Default for JobManager {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A spawned job can be awaited by id and yields the future's result.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        // Leave room for a deferred registration by `spawn` to complete.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let result = manager.wait(id).await;
        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.ok());
        assert_eq!(result.out, "done");
    }

    /// `wait_all` returns one result per registered job.
    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        // Leave room for a deferred registration by `spawn` to complete.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);
    }

    /// A job that is still sleeping is listed as `Running`.
    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        // Leave room for a deferred registration by `spawn` to complete.
        tokio::time::sleep(Duration::from_millis(5)).await;

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    /// After a successful job has been awaited its status is `Done`.
    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    /// `cleanup` drops finished jobs from the table.
    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async { ExecResult::success("") });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        assert_eq!(manager.list().await.len(), 1);

        manager.cleanup().await;

        assert_eq!(manager.list().await.len(), 0);
    }

    /// `cleanup` deletes the output file of a finished job.
    ///
    /// NOTE(review): output files are named after the job id only, and every
    /// test manager starts at id 1, so tests running in parallel share paths
    /// under the temp directory. The final assertion can therefore race with
    /// another test writing the same file.
    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|job| job.output_file().cloned())
        };

        // Require the file to exist first; otherwise the check below would
        // pass without ever exercising the deletion.
        let path = output_file.expect("a job that produced output should have an output file");
        assert!(
            path.exists(),
            "temp file should exist before cleanup: {}",
            path.display()
        );

        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    /// A channel-backed job yields whatever is sent on its channel.
    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().out, "from channel");
    }

    /// With an uncontended job table, `spawn` registers the job synchronously.
    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        let exists = manager.exists(id).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(id).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    /// Waiting on an unknown id yields `None`.
    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        let result = manager.wait(JobId(999)).await;
        assert!(result.is_none());
    }
}
704}