1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
/// A background job tracked by the job manager.
///
/// A job obtains its result from exactly one of two sources: a task join
/// handle (`handle`) or a oneshot channel (`result_rx`). Jobs created in the
/// stopped state have neither and only carry process information.
pub struct Job {
    /// Identifier of this job.
    pub id: JobId,
    /// Session number of the owning manager; becomes part of the output file name.
    session_id: u64,
    /// Command text; exposed through `JobInfo` and written into the output file header.
    pub command: String,
    /// Join handle of the spawned task, for jobs created with `Job::new`.
    /// Consumed when the result is collected.
    handle: Option<JoinHandle<ExecResult>>,
    /// Receiving end of the result channel, for jobs created with
    /// `Job::from_channel` / `Job::with_streams`. Consumed when the result is collected.
    result_rx: Option<oneshot::Receiver<ExecResult>>,
    /// Cached result once the job is known to have finished.
    result: Option<ExecResult>,
    /// Path of the text log written by `wait`, if one was written.
    output_file: Option<PathBuf>,
    /// Whether `wait` is allowed to write the text log. Constructors set this
    /// to `true`; the manager overwrites it with its own setting.
    persist_output: bool,
    /// Live stdout stream, present only for jobs created with `Job::with_streams`.
    stdout_stream: Option<Arc<BoundedStream>>,
    /// Live stderr stream, present only for jobs created with `Job::with_streams`.
    stderr_stream: Option<Arc<BoundedStream>>,
    /// Process id recorded when the job was registered as, or marked, stopped.
    pid: Option<u32>,
    /// Process group id recorded together with `pid`.
    pgid: Option<u32>,
    /// While `true`, the job reports `JobStatus::Stopped` and is never considered done.
    stopped: bool,
    /// Cancellation token attached through the manager, if any.
    cancel: Option<tokio_util::sync::CancellationToken>,
    /// Additional process group ids recorded for this job (kept free of duplicates
    /// by the manager's `add_pgid`).
    pgids: Vec<u32>,
}
69
70impl Job {
71 pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73 Self {
74 id,
75 session_id,
76 command,
77 handle: Some(handle),
78 result_rx: None,
79 result: None,
80 output_file: None,
81 persist_output: true,
82 stdout_stream: None,
83 stderr_stream: None,
84 pid: None,
85 pgid: None,
86 stopped: false,
87 cancel: None,
88 pgids: Vec::new(),
89 }
90 }
91
92 pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94 Self {
95 id,
96 session_id,
97 command,
98 handle: None,
99 result_rx: Some(rx),
100 result: None,
101 output_file: None,
102 persist_output: true,
103 stdout_stream: None,
104 stderr_stream: None,
105 pid: None,
106 pgid: None,
107 stopped: false,
108 cancel: None,
109 pgids: Vec::new(),
110 }
111 }
112
113 pub fn with_streams(
117 id: JobId,
118 session_id: u64,
119 command: String,
120 rx: oneshot::Receiver<ExecResult>,
121 stdout: Arc<BoundedStream>,
122 stderr: Arc<BoundedStream>,
123 ) -> Self {
124 Self {
125 id,
126 session_id,
127 command,
128 handle: None,
129 result_rx: Some(rx),
130 result: None,
131 output_file: None,
132 persist_output: true,
133 stdout_stream: Some(stdout),
134 stderr_stream: Some(stderr),
135 pid: None,
136 pgid: None,
137 stopped: false,
138 cancel: None,
139 pgids: Vec::new(),
140 }
141 }
142
143 pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145 Self {
146 id,
147 session_id,
148 command,
149 handle: None,
150 result_rx: None,
151 result: None,
152 output_file: None,
153 persist_output: true,
154 stdout_stream: None,
155 stderr_stream: None,
156 pid: Some(pid),
157 pgid: Some(pgid),
158 stopped: true,
159 cancel: None,
160 pgids: Vec::new(),
161 }
162 }
163
164 pub fn output_file(&self) -> Option<&PathBuf> {
166 self.output_file.as_ref()
167 }
168
169 pub fn is_done(&mut self) -> bool {
173 if self.stopped {
174 return false;
175 }
176 self.try_poll();
177 self.result.is_some()
178 }
179
180 pub fn status(&mut self) -> JobStatus {
182 if self.stopped {
183 return JobStatus::Stopped;
184 }
185 self.try_poll();
186 match &self.result {
187 Some(r) if r.ok() => JobStatus::Done,
188 Some(_) => JobStatus::Failed,
189 None => JobStatus::Running,
190 }
191 }
192
193 pub fn status_string(&mut self) -> String {
200 self.try_poll();
201 match &self.result {
202 Some(r) if r.ok() => "done:0".to_string(),
203 Some(r) => format!("failed:{}", r.code),
204 None => "running".to_string(),
205 }
206 }
207
208 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210 self.stdout_stream.as_ref()
211 }
212
213 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215 self.stderr_stream.as_ref()
216 }
217
218 pub async fn wait(&mut self) -> ExecResult {
222 if let Some(result) = self.result.take() {
223 self.result = Some(result.clone());
224 return result;
225 }
226
227 let result = if let Some(handle) = self.handle.take() {
228 match handle.await {
229 Ok(r) => r,
230 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231 }
232 } else if let Some(rx) = self.result_rx.take() {
233 match rx.await {
234 Ok(r) => r,
235 Err(_) => ExecResult::failure(1, "job channel closed"),
236 }
237 } else {
238 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240 };
241
242 if self.persist_output
246 && self.output_file.is_none()
247 && let Some(path) = self.write_output_file(&result) {
248 self.output_file = Some(path);
249 }
250
251 self.result = Some(result.clone());
252 result
253 }
254
255 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257 let is_bytes = result.is_bytes();
261 let text = if is_bytes {
262 std::borrow::Cow::Borrowed("")
263 } else {
264 result.text_out()
265 };
266 if !is_bytes && text.is_empty() && result.err.is_empty() {
267 return None;
268 }
269
270 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
271 if std::fs::create_dir_all(&tmp_dir).is_err() {
272 tracing::warn!("Failed to create job output directory");
273 return None;
274 }
275
276 let filename = format!(
285 "session_{}_job_{}.{}.txt",
286 self.session_id,
287 self.id.0,
288 std::process::id()
289 );
290 let path = tmp_dir.join(filename);
291
292 let mut content = String::new();
293 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
294 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
295
296 if is_bytes {
297 let n = result.out_bytes().map(|b| b.len()).unwrap_or(0);
298 content.push_str(&format!(
299 "## STDOUT\n[binary output: {n} bytes — omitted from this text log]\n"
300 ));
301 } else if !text.is_empty() {
302 content.push_str("## STDOUT\n");
303 content.push_str(&text);
304 if !text.ends_with('\n') {
305 content.push('\n');
306 }
307 }
308
309 if !result.err.is_empty() {
310 content.push_str("\n## STDERR\n");
311 content.push_str(&result.err);
312 if !result.err.ends_with('\n') {
313 content.push('\n');
314 }
315 }
316
317 match std::fs::write(&path, content) {
318 Ok(()) => Some(path),
319 Err(e) => {
320 tracing::warn!("Failed to write job output file: {}", e);
321 None
322 }
323 }
324 }
325
326 pub fn cleanup_files(&mut self) {
328 if let Some(path) = self.output_file.take() {
329 if let Err(e) = std::fs::remove_file(&path) {
330 if e.kind() != io::ErrorKind::NotFound {
332 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
333 }
334 }
335 }
336 }
337
338 pub fn try_result(&self) -> Option<&ExecResult> {
340 self.result.as_ref()
341 }
342
343 pub fn try_poll(&mut self) -> bool {
348 if self.result.is_some() {
349 return true;
350 }
351
352 if let Some(rx) = self.result_rx.as_mut() {
354 match rx.try_recv() {
355 Ok(result) => {
356 self.result = Some(result);
357 self.result_rx = None;
358 return true;
359 }
360 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
361 return false;
363 }
364 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
365 self.result = Some(ExecResult::failure(1, "job channel closed"));
367 self.result_rx = None;
368 return true;
369 }
370 }
371 }
372
373 if let Some(handle) = self.handle.as_mut()
375 && handle.is_finished() {
376 let Some(mut handle) = self.handle.take() else {
378 return false;
379 };
380 let waker = std::task::Waker::noop();
382 let mut cx = std::task::Context::from_waker(waker);
383 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
384 std::task::Poll::Ready(Ok(r)) => r,
385 std::task::Poll::Ready(Err(e)) => {
386 ExecResult::failure(1, format!("job panicked: {}", e))
387 }
388 std::task::Poll::Pending => return false, };
390 self.result = Some(result);
391 return true;
392 }
393
394 false
395 }
396}
397
/// Process-wide counter handing out a distinct session number to every
/// `JobManager` created in this process (starting at 0).
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
404
/// Best-effort removal of job output logs whose owning process is gone.
///
/// Only does anything on Linux. It scans `<temp>/kaish/jobs/`, reads the
/// process id from names of the form `<anything>.<pid>.txt`, and deletes the
/// file when that pid is neither the current process nor present under
/// `/proc`. Files with other names, an unreadable directory and failed
/// deletions are all silently skipped.
fn prune_orphaned_job_files() {
    #[cfg(target_os = "linux")]
    {
        let own_pid = std::process::id();
        let dir = std::env::temp_dir().join("kaish").join("jobs");
        let listing = match std::fs::read_dir(&dir) {
            Ok(listing) => listing,
            // Nothing to prune when the directory is missing or unreadable.
            Err(_) => return,
        };

        for entry in listing.flatten() {
            let raw_name = entry.file_name();
            let display_name = raw_name.to_string_lossy();
            // The pid is the last dot-separated component before ".txt".
            let owner = display_name
                .strip_suffix(".txt")
                .and_then(|stem| stem.rsplit_once('.'))
                .and_then(|(_, digits)| digits.parse::<u32>().ok());

            match owner {
                Some(pid) if pid != own_pid => {
                    let proc_entry = format!("/proc/{}", pid);
                    if !std::path::Path::new(&proc_entry).exists() {
                        let _ = std::fs::remove_file(entry.path());
                    }
                }
                // Our own files and files we cannot attribute are left alone.
                _ => {}
            }
        }
    }
}
449
/// Registry of background jobs for one session.
///
/// Hands out job ids, stores the jobs in a mutex-protected table, and offers
/// lookup, waiting, cancellation and cleanup on top of it.
pub struct JobManager {
    /// Session number of this manager, taken from the process-wide counter;
    /// passed to every job it creates.
    session_id: u64,
    /// Next job id to hand out; starts at 1.
    next_id: AtomicU64,
    /// Table of all known jobs, keyed by id.
    jobs: Arc<Mutex<HashMap<JobId, Job>>>,
    /// Whether newly created jobs may write their output log to disk
    /// (defaults to `true`).
    persist_output_files: std::sync::atomic::AtomicBool,
}
465
466impl JobManager {
467 pub fn new() -> Self {
482 static PRUNE_ONCE: std::sync::Once = std::sync::Once::new();
487 PRUNE_ONCE.call_once(prune_orphaned_job_files);
488 Self {
489 session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
490 next_id: AtomicU64::new(1),
491 jobs: Arc::new(Mutex::new(HashMap::new())),
492 persist_output_files: std::sync::atomic::AtomicBool::new(true),
493 }
494 }
495
496 pub fn set_persist_output_files(&self, on: bool) {
506 self.persist_output_files.store(on, Ordering::Relaxed);
507 }
508
509 pub fn persist_output_files(&self) -> bool {
511 self.persist_output_files.load(Ordering::Relaxed)
512 }
513
514 pub fn spawn<F>(&self, command: String, future: F) -> JobId
519 where
520 F: std::future::Future<Output = ExecResult> + Send + 'static,
521 {
522 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
523 let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
526 let mut job = Job::new(id, self.session_id, command, handle);
527 job.persist_output = self.persist_output_files();
528
529 let mut job_opt = Some(job);
533 loop {
534 match self.jobs.try_lock() {
535 Ok(mut guard) => {
536 if let Some(j) = job_opt.take() {
537 guard.insert(id, j);
538 }
539 break;
540 }
541 Err(_) => {
542 std::hint::spin_loop();
543 }
544 }
545 }
546
547 id
548 }
549
550 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
552 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
553 let mut job = Job::from_channel(id, self.session_id, command, rx);
554 job.persist_output = self.persist_output_files();
555
556 let mut jobs = self.jobs.lock().await;
557 jobs.insert(id, job);
558
559 id
560 }
561
562 pub async fn register_with_streams(
566 &self,
567 command: String,
568 rx: oneshot::Receiver<ExecResult>,
569 stdout: Arc<BoundedStream>,
570 stderr: Arc<BoundedStream>,
571 ) -> JobId {
572 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
573 let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
574 job.persist_output = self.persist_output_files();
575
576 let mut jobs = self.jobs.lock().await;
577 jobs.insert(id, job);
578
579 id
580 }
581
582 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
584 let mut jobs = self.jobs.lock().await;
585 if let Some(job) = jobs.get_mut(&id) {
586 Some(job.wait().await)
587 } else {
588 None
589 }
590 }
591
592 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
594 let mut results = Vec::new();
595
596 let ids: Vec<JobId> = {
598 let jobs = self.jobs.lock().await;
599 jobs.keys().copied().collect()
600 };
601
602 for id in ids {
603 if let Some(result) = self.wait(id).await {
604 results.push((id, result));
605 }
606 }
607
608 results
609 }
610
611 pub async fn list(&self) -> Vec<JobInfo> {
613 let mut jobs = self.jobs.lock().await;
614 jobs.values_mut()
615 .map(|job| JobInfo {
616 id: job.id,
617 command: job.command.clone(),
618 status: job.status(),
619 output_file: job.output_file.clone(),
620 pid: job.pid,
621 })
622 .collect()
623 }
624
625 pub async fn running_count(&self) -> usize {
627 let mut jobs = self.jobs.lock().await;
628 let mut count = 0;
629 for job in jobs.values_mut() {
630 if !job.is_done() {
631 count += 1;
632 }
633 }
634 count
635 }
636
637 pub async fn cleanup(&self) {
639 let mut jobs = self.jobs.lock().await;
640 jobs.retain(|_, job| {
641 if job.is_done() {
642 job.cleanup_files();
643 false
644 } else {
645 true
646 }
647 });
648 }
649
650 pub async fn exists(&self, id: JobId) -> bool {
652 let jobs = self.jobs.lock().await;
653 jobs.contains_key(&id)
654 }
655
656 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
658 let mut jobs = self.jobs.lock().await;
659 jobs.get_mut(&id).map(|job| JobInfo {
660 id: job.id,
661 command: job.command.clone(),
662 status: job.status(),
663 output_file: job.output_file.clone(),
664 pid: job.pid,
665 })
666 }
667
668 pub async fn get_command(&self, id: JobId) -> Option<String> {
670 let jobs = self.jobs.lock().await;
671 jobs.get(&id).map(|job| job.command.clone())
672 }
673
674 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
676 let mut jobs = self.jobs.lock().await;
677 jobs.get_mut(&id).map(|job| job.status_string())
678 }
679
680 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
684 let jobs = self.jobs.lock().await;
685 if let Some(job) = jobs.get(&id)
686 && let Some(stream) = job.stdout_stream() {
687 return Some(stream.read().await);
688 }
689 None
690 }
691
692 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
696 let jobs = self.jobs.lock().await;
697 if let Some(job) = jobs.get(&id)
698 && let Some(stream) = job.stderr_stream() {
699 return Some(stream.read().await);
700 }
701 None
702 }
703
704 pub async fn list_ids(&self) -> Vec<JobId> {
706 let jobs = self.jobs.lock().await;
707 jobs.keys().copied().collect()
708 }
709
710 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
712 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
713 let job = Job::stopped(id, self.session_id, command, pid, pgid);
714 let mut jobs = self.jobs.lock().await;
715 jobs.insert(id, job);
716 id
717 }
718
719 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
721 let mut jobs = self.jobs.lock().await;
722 if let Some(job) = jobs.get_mut(&id) {
723 job.stopped = true;
724 job.pid = Some(pid);
725 job.pgid = Some(pgid);
726 }
727 }
728
729 pub async fn resume_job(&self, id: JobId) {
731 let mut jobs = self.jobs.lock().await;
732 if let Some(job) = jobs.get_mut(&id) {
733 job.stopped = false;
734 }
735 }
736
737 pub async fn last_stopped(&self) -> Option<JobId> {
739 let mut jobs = self.jobs.lock().await;
740 let mut best: Option<JobId> = None;
742 for job in jobs.values_mut() {
743 if job.stopped {
744 match best {
745 None => best = Some(job.id),
746 Some(b) if job.id.0 > b.0 => best = Some(job.id),
747 _ => {}
748 }
749 }
750 }
751 best
752 }
753
754 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
756 let jobs = self.jobs.lock().await;
757 jobs.get(&id).and_then(|job| {
758 match (job.pid, job.pgid) {
759 (Some(pid), Some(pgid)) => Some((pid, pgid)),
760 _ => None,
761 }
762 })
763 }
764
765 pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
769 let mut jobs = self.jobs.lock().await;
770 if let Some(job) = jobs.get_mut(&id) {
771 job.cancel = Some(token);
772 }
773 }
774
775 pub async fn cancel(&self, id: JobId) -> bool {
779 let jobs = self.jobs.lock().await;
780 match jobs.get(&id).and_then(|job| job.cancel.clone()) {
781 Some(token) => {
782 token.cancel();
783 true
784 }
785 None => false,
786 }
787 }
788
789 pub async fn add_pgid(&self, id: JobId, pgid: u32) {
793 let mut jobs = self.jobs.lock().await;
794 if let Some(job) = jobs.get_mut(&id) {
795 if !job.pgids.contains(&pgid) {
796 job.pgids.push(pgid);
797 }
798 }
799 }
800
801 pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
805 let jobs = self.jobs.lock().await;
806 jobs.get(&id)
807 .map(|job| {
808 let mut v = job.pgids.clone();
809 if let Some(pg) = job.pgid {
810 if !v.contains(&pg) {
811 v.push(pg);
812 }
813 }
814 v
815 })
816 .unwrap_or_default()
817 }
818
819 pub async fn remove(&self, id: JobId) {
821 let mut jobs = self.jobs.lock().await;
822 if let Some(mut job) = jobs.remove(&id) {
823 job.cleanup_files();
824 }
825 }
826}
827
828impl Default for JobManager {
829 fn default() -> Self {
830 Self::new()
831 }
832}
833
// Unit tests for the job manager. Several of them rely on short sleeps to
// let spawned tasks make progress before the manager is queried.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // With persistence switched off, waiting on a job that produced output
    // must not leave an output file path on the job.
    #[tokio::test]
    async fn test_no_host_output_file_when_persistence_disabled() {
        let manager = JobManager::new();
        assert!(manager.persist_output_files(), "default is to persist");
        manager.set_persist_output_files(false);
        assert!(!manager.persist_output_files());

        let id = manager.spawn("leaky".to_string(), async {
            ExecResult::success("output that must not hit host disk")
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        assert!(
            output_file.is_none(),
            "no host output file should be written when persistence is disabled, got {output_file:?}"
        );
    }

    // A spawned job can be waited on and yields the task's result.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let manager = JobManager::new();

        let id = manager.spawn("test".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("done")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let result = manager.wait(id).await;
        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.ok());
        assert_eq!(&*result.text_out(), "done");
    }

    // `wait_all` returns one result per registered job.
    #[tokio::test]
    async fn test_wait_all() {
        let manager = JobManager::new();

        manager.spawn("job1".to_string(), async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ExecResult::success("one")
        });

        manager.spawn("job2".to_string(), async {
            tokio::time::sleep(Duration::from_millis(5)).await;
            ExecResult::success("two")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let results = manager.wait_all().await;
        assert_eq!(results.len(), 2);
    }

    // A job that is still sleeping is listed as running.
    #[tokio::test]
    async fn test_list_jobs() {
        let manager = JobManager::new();

        manager.spawn("test job".to_string(), async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(5)).await;

        let jobs = manager.list().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].command, "test job");
        assert_eq!(jobs[0].status, JobStatus::Running);
    }

    // After a successful job has been waited on, its status is `Done`.
    #[tokio::test]
    async fn test_job_status_after_completion() {
        let manager = JobManager::new();

        let id = manager.spawn("quick".to_string(), async {
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        let info = manager.get(id).await;
        assert!(info.is_some());
        assert_eq!(info.unwrap().status, JobStatus::Done);
    }

    // `cleanup` drops finished jobs from the table.
    #[tokio::test]
    async fn test_cleanup() {
        let manager = JobManager::new();

        let id = manager.spawn("done".to_string(), async {
            ExecResult::success("")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = manager.wait(id).await;

        assert_eq!(manager.list().await.len(), 1);

        manager.cleanup().await;

        assert_eq!(manager.list().await.len(), 0);
    }

    // `cleanup` also deletes the output file written by `wait`.
    #[tokio::test]
    async fn test_cleanup_removes_temp_files() {
        let manager = JobManager::new();

        let id = manager.spawn("output job".to_string(), async {
            ExecResult::success("some output that gets written to a temp file")
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        let result = manager.wait(id).await;
        assert!(result.is_some());

        let output_file = {
            let jobs = manager.jobs.lock().await;
            jobs.get(&id).and_then(|j| j.output_file().cloned())
        };
        let path = output_file.expect("job with output should have written a temp file");
        assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());

        manager.cleanup().await;

        assert!(
            !path.exists(),
            "temp file should be removed after cleanup: {}",
            path.display()
        );
    }

    // A channel-backed job delivers the value sent on its channel.
    #[tokio::test]
    async fn test_register_with_channel() {
        let manager = JobManager::new();
        let (tx, rx) = oneshot::channel();

        let id = manager.register("channel job".to_string(), rx).await;

        tx.send(ExecResult::success("from channel")).unwrap();

        let result = manager.wait(id).await;
        assert!(result.is_some());
        assert_eq!(&*result.unwrap().text_out(), "from channel");
    }

    // The job is visible in the table as soon as `spawn` returns.
    #[tokio::test]
    async fn test_spawn_immediately_available() {
        let manager = JobManager::new();

        let id = manager.spawn("instant".to_string(), async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            ExecResult::success("done")
        });

        let exists = manager.exists(id).await;
        assert!(exists, "job should be immediately available after spawn()");

        let info = manager.get(id).await;
        assert!(info.is_some(), "job info should be available immediately");
    }

    // Waiting on an unknown id yields `None`.
    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = JobManager::new();
        let result = manager.wait(JobId(999)).await;
        assert!(result.is_none());
    }

    // `cancel` trips the token that was attached to the job.
    #[tokio::test]
    async fn test_cancel_token_fires() {
        let manager = JobManager::new();
        let token = tokio_util::sync::CancellationToken::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        manager.set_cancel_token(id, token.clone()).await;

        assert!(!token.is_cancelled());
        assert!(manager.cancel(id).await, "cancel should report success");
        assert!(token.is_cancelled(), "the job's token must be tripped");
    }

    // `cancel` reports `false` for a job without a token and for an unknown id.
    #[tokio::test]
    async fn test_cancel_without_token_returns_false() {
        let manager = JobManager::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        assert!(!manager.cancel(id).await);
        assert!(!manager.cancel(JobId(999)).await);
    }

    // Process group ids are kept in insertion order without duplicates;
    // an unknown job has none.
    #[tokio::test]
    async fn test_pgids_recorded_and_deduped() {
        let manager = JobManager::new();
        let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
        assert!(manager.job_pgids(id).await.is_empty());

        manager.add_pgid(id, 4242).await;
        manager.add_pgid(id, 4243).await;
        manager.add_pgid(id, 4242).await;
        assert_eq!(manager.job_pgids(id).await, vec![4242, 4243]);

        assert!(manager.job_pgids(JobId(999)).await.is_empty());
    }
}