1use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use tokio::sync::{oneshot, Mutex};
11use tokio::task::JoinHandle;
12
13use super::stream::BoundedStream;
14use crate::interpreter::ExecResult;
15
/// Numeric identifier of a background job.
///
/// The wrapped `u64` is public; [`JobManager`] hands these out from a
/// monotonically increasing counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId(pub u64);

impl std::fmt::Display for JobId {
    /// Prints the bare decimal number with no prefix or decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "{}", raw)
    }
}
25
/// Coarse lifecycle state of a job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// No result has been observed yet.
    Running,
    /// A result was observed and it reported success.
    Done,
    /// A result was observed and it reported failure.
    Failed,
}

impl std::fmt::Display for JobStatus {
    /// Writes the variant name (`Running`, `Done` or `Failed`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            JobStatus::Running => "Running",
            JobStatus::Done => "Done",
            JobStatus::Failed => "Failed",
        };
        f.write_str(label)
    }
}
46
47#[derive(Debug, Clone)]
49pub struct JobInfo {
50 pub id: JobId,
52 pub command: String,
54 pub status: JobStatus,
56 pub output_file: Option<PathBuf>,
58}
59
60pub struct Job {
62 pub id: JobId,
64 pub command: String,
66 handle: Option<JoinHandle<ExecResult>>,
68 result_rx: Option<oneshot::Receiver<ExecResult>>,
70 result: Option<ExecResult>,
72 output_file: Option<PathBuf>,
74 stdout_stream: Option<Arc<BoundedStream>>,
76 stderr_stream: Option<Arc<BoundedStream>>,
78}
79
80impl Job {
81 pub fn new(id: JobId, command: String, handle: JoinHandle<ExecResult>) -> Self {
83 Self {
84 id,
85 command,
86 handle: Some(handle),
87 result_rx: None,
88 result: None,
89 output_file: None,
90 stdout_stream: None,
91 stderr_stream: None,
92 }
93 }
94
95 pub fn from_channel(id: JobId, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
97 Self {
98 id,
99 command,
100 handle: None,
101 result_rx: Some(rx),
102 result: None,
103 output_file: None,
104 stdout_stream: None,
105 stderr_stream: None,
106 }
107 }
108
109 pub fn with_streams(
113 id: JobId,
114 command: String,
115 rx: oneshot::Receiver<ExecResult>,
116 stdout: Arc<BoundedStream>,
117 stderr: Arc<BoundedStream>,
118 ) -> Self {
119 Self {
120 id,
121 command,
122 handle: None,
123 result_rx: Some(rx),
124 result: None,
125 output_file: None,
126 stdout_stream: Some(stdout),
127 stderr_stream: Some(stderr),
128 }
129 }
130
131 pub fn output_file(&self) -> Option<&PathBuf> {
133 self.output_file.as_ref()
134 }
135
136 pub fn is_done(&mut self) -> bool {
138 self.try_poll();
139 self.result.is_some()
140 }
141
142 pub fn status(&mut self) -> JobStatus {
144 self.try_poll();
145 match &self.result {
146 Some(r) if r.ok() => JobStatus::Done,
147 Some(_) => JobStatus::Failed,
148 None => JobStatus::Running,
149 }
150 }
151
152 pub fn status_string(&mut self) -> String {
159 self.try_poll();
160 match &self.result {
161 Some(r) if r.ok() => "done:0".to_string(),
162 Some(r) => format!("failed:{}", r.code),
163 None => "running".to_string(),
164 }
165 }
166
167 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
169 self.stdout_stream.as_ref()
170 }
171
172 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
174 self.stderr_stream.as_ref()
175 }
176
177 pub async fn wait(&mut self) -> ExecResult {
181 if let Some(result) = self.result.take() {
182 self.result = Some(result.clone());
183 return result;
184 }
185
186 let result = if let Some(handle) = self.handle.take() {
187 match handle.await {
188 Ok(r) => r,
189 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
190 }
191 } else if let Some(rx) = self.result_rx.take() {
192 match rx.await {
193 Ok(r) => r,
194 Err(_) => ExecResult::failure(1, "job channel closed"),
195 }
196 } else {
197 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
199 };
200
201 if self.output_file.is_none()
203 && let Some(path) = self.write_output_file(&result) {
204 self.output_file = Some(path);
205 }
206
207 self.result = Some(result.clone());
208 result
209 }
210
211 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
213 if result.out.is_empty() && result.err.is_empty() {
215 return None;
216 }
217
218 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
219 if std::fs::create_dir_all(&tmp_dir).is_err() {
220 tracing::warn!("Failed to create job output directory");
221 return None;
222 }
223
224 let filename = format!("job_{}.txt", self.id.0);
225 let path = tmp_dir.join(filename);
226
227 let mut content = String::new();
228 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
229 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
230
231 if !result.out.is_empty() {
232 content.push_str("## STDOUT\n");
233 content.push_str(&result.out);
234 if !result.out.ends_with('\n') {
235 content.push('\n');
236 }
237 }
238
239 if !result.err.is_empty() {
240 content.push_str("\n## STDERR\n");
241 content.push_str(&result.err);
242 if !result.err.ends_with('\n') {
243 content.push('\n');
244 }
245 }
246
247 match std::fs::write(&path, content) {
248 Ok(()) => Some(path),
249 Err(e) => {
250 tracing::warn!("Failed to write job output file: {}", e);
251 None
252 }
253 }
254 }
255
256 pub fn try_result(&self) -> Option<&ExecResult> {
258 self.result.as_ref()
259 }
260
261 pub fn try_poll(&mut self) -> bool {
266 if self.result.is_some() {
267 return true;
268 }
269
270 if let Some(rx) = self.result_rx.as_mut() {
272 match rx.try_recv() {
273 Ok(result) => {
274 self.result = Some(result);
275 self.result_rx = None;
276 return true;
277 }
278 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
279 return false;
281 }
282 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
283 self.result = Some(ExecResult::failure(1, "job channel closed"));
285 self.result_rx = None;
286 return true;
287 }
288 }
289 }
290
291 if let Some(handle) = self.handle.as_mut()
293 && handle.is_finished() {
294 let Some(handle) = self.handle.take() else {
296 return false;
297 };
298 let result = match tokio::task::block_in_place(|| {
301 tokio::runtime::Handle::current().block_on(handle)
302 }) {
303 Ok(r) => r,
304 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
305 };
306 self.result = Some(result);
307 return true;
308 }
309
310 false
311 }
312}
313
314pub struct JobManager {
316 next_id: AtomicU64,
318 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
320}
321
322impl JobManager {
323 pub fn new() -> Self {
325 Self {
326 next_id: AtomicU64::new(1),
327 jobs: Arc::new(Mutex::new(HashMap::new())),
328 }
329 }
330
331 pub fn spawn<F>(&self, command: String, future: F) -> JobId
333 where
334 F: std::future::Future<Output = ExecResult> + Send + 'static,
335 {
336 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
337 let handle = tokio::spawn(future);
338 let job = Job::new(id, command, handle);
339
340 let jobs = self.jobs.clone();
342 tokio::spawn(async move {
343 let mut jobs = jobs.lock().await;
344 jobs.insert(id, job);
345 });
346
347 id
348 }
349
350 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
352 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
353 let job = Job::from_channel(id, command, rx);
354
355 let mut jobs = self.jobs.lock().await;
356 jobs.insert(id, job);
357
358 id
359 }
360
361 pub async fn register_with_streams(
365 &self,
366 command: String,
367 rx: oneshot::Receiver<ExecResult>,
368 stdout: Arc<BoundedStream>,
369 stderr: Arc<BoundedStream>,
370 ) -> JobId {
371 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
372 let job = Job::with_streams(id, command, rx, stdout, stderr);
373
374 let mut jobs = self.jobs.lock().await;
375 jobs.insert(id, job);
376
377 id
378 }
379
380 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
382 let mut jobs = self.jobs.lock().await;
383 if let Some(job) = jobs.get_mut(&id) {
384 Some(job.wait().await)
385 } else {
386 None
387 }
388 }
389
390 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
392 let mut results = Vec::new();
393
394 let ids: Vec<JobId> = {
396 let jobs = self.jobs.lock().await;
397 jobs.keys().copied().collect()
398 };
399
400 for id in ids {
401 if let Some(result) = self.wait(id).await {
402 results.push((id, result));
403 }
404 }
405
406 results
407 }
408
409 pub async fn list(&self) -> Vec<JobInfo> {
411 let mut jobs = self.jobs.lock().await;
412 jobs.values_mut()
413 .map(|job| JobInfo {
414 id: job.id,
415 command: job.command.clone(),
416 status: job.status(),
417 output_file: job.output_file.clone(),
418 })
419 .collect()
420 }
421
422 pub async fn running_count(&self) -> usize {
424 let mut jobs = self.jobs.lock().await;
425 let mut count = 0;
426 for job in jobs.values_mut() {
427 if !job.is_done() {
428 count += 1;
429 }
430 }
431 count
432 }
433
434 pub async fn cleanup(&self) {
436 let mut jobs = self.jobs.lock().await;
437 jobs.retain(|_, job| !job.is_done());
438 }
439
440 pub async fn exists(&self, id: JobId) -> bool {
442 let jobs = self.jobs.lock().await;
443 jobs.contains_key(&id)
444 }
445
446 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
448 let mut jobs = self.jobs.lock().await;
449 jobs.get_mut(&id).map(|job| JobInfo {
450 id: job.id,
451 command: job.command.clone(),
452 status: job.status(),
453 output_file: job.output_file.clone(),
454 })
455 }
456
457 pub async fn get_command(&self, id: JobId) -> Option<String> {
459 let jobs = self.jobs.lock().await;
460 jobs.get(&id).map(|job| job.command.clone())
461 }
462
463 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
465 let mut jobs = self.jobs.lock().await;
466 jobs.get_mut(&id).map(|job| job.status_string())
467 }
468
469 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
473 let jobs = self.jobs.lock().await;
474 if let Some(job) = jobs.get(&id)
475 && let Some(stream) = job.stdout_stream() {
476 return Some(stream.read().await);
477 }
478 None
479 }
480
481 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
485 let jobs = self.jobs.lock().await;
486 if let Some(job) = jobs.get(&id)
487 && let Some(stream) = job.stderr_stream() {
488 return Some(stream.read().await);
489 }
490 None
491 }
492
493 pub async fn list_ids(&self) -> Vec<JobId> {
495 let jobs = self.jobs.lock().await;
496 jobs.keys().copied().collect()
497 }
498}
499
500impl Default for JobManager {
501 fn default() -> Self {
502 Self::new()
503 }
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sleeps for `ms` milliseconds on the Tokio timer.
    async fn pause(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    /// A spawned job can be awaited and yields the future's result.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let mgr = JobManager::new();

        let job_id = mgr.spawn(String::from("test"), async {
            pause(10).await;
            ExecResult::success("done")
        });

        // Give `spawn` time to get the job into the table.
        pause(5).await;

        let outcome = mgr.wait(job_id).await;
        assert!(outcome.is_some());
        let outcome = outcome.unwrap();
        assert!(outcome.ok());
        assert_eq!(outcome.out, "done");
    }

    /// `wait_all` returns one entry per registered job.
    #[tokio::test]
    async fn test_wait_all() {
        let mgr = JobManager::new();

        mgr.spawn(String::from("job1"), async {
            pause(10).await;
            ExecResult::success("one")
        });

        mgr.spawn(String::from("job2"), async {
            pause(5).await;
            ExecResult::success("two")
        });

        // Give `spawn` time to get both jobs into the table.
        pause(5).await;

        let all = mgr.wait_all().await;
        assert_eq!(all.len(), 2);
    }

    /// A job that is still sleeping is listed as `Running`.
    #[tokio::test]
    async fn test_list_jobs() {
        let mgr = JobManager::new();

        mgr.spawn(String::from("test job"), async {
            pause(50).await;
            ExecResult::success("")
        });

        // Give `spawn` time to get the job into the table.
        pause(5).await;

        let listed = mgr.list().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].command, "test job");
        assert_eq!(listed[0].status, JobStatus::Running);
    }

    /// After a successful job has been awaited its status is `Done`.
    #[tokio::test]
    async fn test_job_status_after_completion() {
        let mgr = JobManager::new();

        let job_id = mgr.spawn(String::from("quick"), async { ExecResult::success("") });

        pause(10).await;
        let _ = mgr.wait(job_id).await;

        let snapshot = mgr.get(job_id).await;
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().status, JobStatus::Done);
    }

    /// `cleanup` removes finished jobs from the table.
    #[tokio::test]
    async fn test_cleanup() {
        let mgr = JobManager::new();

        let job_id = mgr.spawn(String::from("done"), async { ExecResult::success("") });

        pause(10).await;
        let _ = mgr.wait(job_id).await;

        // Finished jobs stay listed until cleanup is requested.
        assert_eq!(mgr.list().await.len(), 1);

        mgr.cleanup().await;

        assert_eq!(mgr.list().await.len(), 0);
    }

    /// A channel-backed job yields whatever is sent on its channel.
    #[tokio::test]
    async fn test_register_with_channel() {
        let mgr = JobManager::new();
        let (sender, receiver) = oneshot::channel();

        let job_id = mgr.register(String::from("channel job"), receiver).await;

        sender.send(ExecResult::success("from channel")).unwrap();

        let outcome = mgr.wait(job_id).await;
        assert!(outcome.is_some());
        assert_eq!(outcome.unwrap().out, "from channel");
    }

    /// Waiting on an unknown id yields `None`.
    #[tokio::test]
    async fn test_nonexistent_job() {
        let mgr = JobManager::new();
        let outcome = mgr.wait(JobId(999)).await;
        assert!(outcome.is_none());
    }
}