1use std::collections::HashMap;
6use std::io;
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::future::Future;
10use std::sync::Arc;
11
12use tokio::sync::{oneshot, Mutex};
13use tokio::task::JoinHandle;
14
15use super::stream::BoundedStream;
16use crate::interpreter::ExecResult;
17
18pub use kaish_types::{JobId, JobInfo, JobStatus};
20
21pub struct Job {
23 pub id: JobId,
25 session_id: u64,
29 pub command: String,
31 handle: Option<JoinHandle<ExecResult>>,
33 result_rx: Option<oneshot::Receiver<ExecResult>>,
35 result: Option<ExecResult>,
37 output_file: Option<PathBuf>,
39 persist_output: bool,
47 stdout_stream: Option<Arc<BoundedStream>>,
49 stderr_stream: Option<Arc<BoundedStream>>,
51 pid: Option<u32>,
53 pgid: Option<u32>,
55 stopped: bool,
57 cancel: Option<tokio_util::sync::CancellationToken>,
63 pgids: Vec<u32>,
68}
69
70impl Job {
71 pub fn new(id: JobId, session_id: u64, command: String, handle: JoinHandle<ExecResult>) -> Self {
73 Self {
74 id,
75 session_id,
76 command,
77 handle: Some(handle),
78 result_rx: None,
79 result: None,
80 output_file: None,
81 persist_output: true,
82 stdout_stream: None,
83 stderr_stream: None,
84 pid: None,
85 pgid: None,
86 stopped: false,
87 cancel: None,
88 pgids: Vec::new(),
89 }
90 }
91
92 pub fn from_channel(id: JobId, session_id: u64, command: String, rx: oneshot::Receiver<ExecResult>) -> Self {
94 Self {
95 id,
96 session_id,
97 command,
98 handle: None,
99 result_rx: Some(rx),
100 result: None,
101 output_file: None,
102 persist_output: true,
103 stdout_stream: None,
104 stderr_stream: None,
105 pid: None,
106 pgid: None,
107 stopped: false,
108 cancel: None,
109 pgids: Vec::new(),
110 }
111 }
112
113 pub fn with_streams(
117 id: JobId,
118 session_id: u64,
119 command: String,
120 rx: oneshot::Receiver<ExecResult>,
121 stdout: Arc<BoundedStream>,
122 stderr: Arc<BoundedStream>,
123 ) -> Self {
124 Self {
125 id,
126 session_id,
127 command,
128 handle: None,
129 result_rx: Some(rx),
130 result: None,
131 output_file: None,
132 persist_output: true,
133 stdout_stream: Some(stdout),
134 stderr_stream: Some(stderr),
135 pid: None,
136 pgid: None,
137 stopped: false,
138 cancel: None,
139 pgids: Vec::new(),
140 }
141 }
142
143 pub fn stopped(id: JobId, session_id: u64, command: String, pid: u32, pgid: u32) -> Self {
145 Self {
146 id,
147 session_id,
148 command,
149 handle: None,
150 result_rx: None,
151 result: None,
152 output_file: None,
153 persist_output: true,
154 stdout_stream: None,
155 stderr_stream: None,
156 pid: Some(pid),
157 pgid: Some(pgid),
158 stopped: true,
159 cancel: None,
160 pgids: Vec::new(),
161 }
162 }
163
164 pub fn output_file(&self) -> Option<&PathBuf> {
166 self.output_file.as_ref()
167 }
168
169 pub fn is_done(&mut self) -> bool {
173 if self.stopped {
174 return false;
175 }
176 self.try_poll();
177 self.result.is_some()
178 }
179
180 pub fn status(&mut self) -> JobStatus {
182 if self.stopped {
183 return JobStatus::Stopped;
184 }
185 self.try_poll();
186 match &self.result {
187 Some(r) if r.ok() => JobStatus::Done,
188 Some(_) => JobStatus::Failed,
189 None => JobStatus::Running,
190 }
191 }
192
193 pub fn status_string(&mut self) -> String {
200 self.try_poll();
201 match &self.result {
202 Some(r) if r.ok() => "done:0".to_string(),
203 Some(r) => format!("failed:{}", r.code),
204 None => "running".to_string(),
205 }
206 }
207
208 pub fn stdout_stream(&self) -> Option<&Arc<BoundedStream>> {
210 self.stdout_stream.as_ref()
211 }
212
213 pub fn stderr_stream(&self) -> Option<&Arc<BoundedStream>> {
215 self.stderr_stream.as_ref()
216 }
217
218 pub async fn wait(&mut self) -> ExecResult {
222 if let Some(result) = self.result.take() {
223 self.result = Some(result.clone());
224 return result;
225 }
226
227 let result = if let Some(handle) = self.handle.take() {
228 match handle.await {
229 Ok(r) => r,
230 Err(e) => ExecResult::failure(1, format!("job panicked: {}", e)),
231 }
232 } else if let Some(rx) = self.result_rx.take() {
233 match rx.await {
234 Ok(r) => r,
235 Err(_) => ExecResult::failure(1, "job channel closed"),
236 }
237 } else {
238 self.result.clone().unwrap_or_else(|| ExecResult::failure(1, "no result"))
240 };
241
242 if self.persist_output
246 && self.output_file.is_none()
247 && let Some(path) = self.write_output_file(&result) {
248 self.output_file = Some(path);
249 }
250
251 self.result = Some(result.clone());
252 result
253 }
254
255 fn write_output_file(&self, result: &ExecResult) -> Option<PathBuf> {
257 let is_bytes = result.is_bytes();
261 let text = if is_bytes {
262 std::borrow::Cow::Borrowed("")
263 } else {
264 result.text_out()
265 };
266 if !is_bytes && text.is_empty() && result.err.is_empty() {
267 return None;
268 }
269
270 let tmp_dir = std::env::temp_dir().join("kaish").join("jobs");
271 if std::fs::create_dir_all(&tmp_dir).is_err() {
272 tracing::warn!("Failed to create job output directory");
273 return None;
274 }
275
276 let filename = format!(
285 "session_{}_job_{}.{}.txt",
286 self.session_id,
287 self.id.0,
288 std::process::id()
289 );
290 let path = tmp_dir.join(filename);
291
292 let mut content = String::new();
293 content.push_str(&format!("# Job {}: {}\n", self.id, self.command));
294 content.push_str(&format!("# Status: {}\n\n", if result.ok() { "Done" } else { "Failed" }));
295
296 if is_bytes {
297 let n = result.out_bytes().map(|b| b.len()).unwrap_or(0);
298 content.push_str(&format!(
299 "## STDOUT\n[binary output: {n} bytes — omitted from this text log]\n"
300 ));
301 } else if !text.is_empty() {
302 content.push_str("## STDOUT\n");
303 content.push_str(&text);
304 if !text.ends_with('\n') {
305 content.push('\n');
306 }
307 }
308
309 if !result.err.is_empty() {
310 content.push_str("\n## STDERR\n");
311 content.push_str(&result.err);
312 if !result.err.ends_with('\n') {
313 content.push('\n');
314 }
315 }
316
317 match std::fs::write(&path, content) {
318 Ok(()) => Some(path),
319 Err(e) => {
320 tracing::warn!("Failed to write job output file: {}", e);
321 None
322 }
323 }
324 }
325
326 pub fn cleanup_files(&mut self) {
328 if let Some(path) = self.output_file.take() {
329 if let Err(e) = std::fs::remove_file(&path) {
330 if e.kind() != io::ErrorKind::NotFound {
332 tracing::warn!("Failed to clean up job output file {}: {}", path.display(), e);
333 }
334 }
335 }
336 }
337
338 pub fn try_result(&self) -> Option<&ExecResult> {
340 self.result.as_ref()
341 }
342
343 pub fn try_poll(&mut self) -> bool {
348 if self.result.is_some() {
349 return true;
350 }
351
352 if let Some(rx) = self.result_rx.as_mut() {
354 match rx.try_recv() {
355 Ok(result) => {
356 self.result = Some(result);
357 self.result_rx = None;
358 return true;
359 }
360 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
361 return false;
363 }
364 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
365 self.result = Some(ExecResult::failure(1, "job channel closed"));
367 self.result_rx = None;
368 return true;
369 }
370 }
371 }
372
373 if let Some(handle) = self.handle.as_mut()
375 && handle.is_finished() {
376 let Some(mut handle) = self.handle.take() else {
378 return false;
379 };
380 let waker = std::task::Waker::noop();
382 let mut cx = std::task::Context::from_waker(waker);
383 let result = match std::pin::Pin::new(&mut handle).poll(&mut cx) {
384 std::task::Poll::Ready(Ok(r)) => r,
385 std::task::Poll::Ready(Err(e)) => {
386 ExecResult::failure(1, format!("job panicked: {}", e))
387 }
388 std::task::Poll::Pending => return false, };
390 self.result = Some(result);
391 return true;
392 }
393
394 false
395 }
396}
397
398static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(0);
404
405pub struct JobManager {
407 session_id: u64,
409 next_id: AtomicU64,
411 jobs: Arc<Mutex<HashMap<JobId, Job>>>,
413 persist_output_files: std::sync::atomic::AtomicBool,
419}
420
421impl JobManager {
422 pub fn new() -> Self {
424 Self {
425 session_id: NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst),
426 next_id: AtomicU64::new(1),
427 jobs: Arc::new(Mutex::new(HashMap::new())),
428 persist_output_files: std::sync::atomic::AtomicBool::new(true),
429 }
430 }
431
432 pub fn set_persist_output_files(&self, on: bool) {
442 self.persist_output_files.store(on, Ordering::Relaxed);
443 }
444
445 pub fn persist_output_files(&self) -> bool {
447 self.persist_output_files.load(Ordering::Relaxed)
448 }
449
450 pub fn spawn<F>(&self, command: String, future: F) -> JobId
455 where
456 F: std::future::Future<Output = ExecResult> + Send + 'static,
457 {
458 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
459 let handle = tokio::spawn(crate::telemetry::bind_current_context(future));
462 let mut job = Job::new(id, self.session_id, command, handle);
463 job.persist_output = self.persist_output_files();
464
465 let mut job_opt = Some(job);
469 loop {
470 match self.jobs.try_lock() {
471 Ok(mut guard) => {
472 if let Some(j) = job_opt.take() {
473 guard.insert(id, j);
474 }
475 break;
476 }
477 Err(_) => {
478 std::hint::spin_loop();
479 }
480 }
481 }
482
483 id
484 }
485
486 pub async fn register(&self, command: String, rx: oneshot::Receiver<ExecResult>) -> JobId {
488 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
489 let mut job = Job::from_channel(id, self.session_id, command, rx);
490 job.persist_output = self.persist_output_files();
491
492 let mut jobs = self.jobs.lock().await;
493 jobs.insert(id, job);
494
495 id
496 }
497
498 pub async fn register_with_streams(
502 &self,
503 command: String,
504 rx: oneshot::Receiver<ExecResult>,
505 stdout: Arc<BoundedStream>,
506 stderr: Arc<BoundedStream>,
507 ) -> JobId {
508 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
509 let mut job = Job::with_streams(id, self.session_id, command, rx, stdout, stderr);
510 job.persist_output = self.persist_output_files();
511
512 let mut jobs = self.jobs.lock().await;
513 jobs.insert(id, job);
514
515 id
516 }
517
518 pub async fn wait(&self, id: JobId) -> Option<ExecResult> {
520 let mut jobs = self.jobs.lock().await;
521 if let Some(job) = jobs.get_mut(&id) {
522 Some(job.wait().await)
523 } else {
524 None
525 }
526 }
527
528 pub async fn wait_all(&self) -> Vec<(JobId, ExecResult)> {
530 let mut results = Vec::new();
531
532 let ids: Vec<JobId> = {
534 let jobs = self.jobs.lock().await;
535 jobs.keys().copied().collect()
536 };
537
538 for id in ids {
539 if let Some(result) = self.wait(id).await {
540 results.push((id, result));
541 }
542 }
543
544 results
545 }
546
547 pub async fn list(&self) -> Vec<JobInfo> {
549 let mut jobs = self.jobs.lock().await;
550 jobs.values_mut()
551 .map(|job| JobInfo {
552 id: job.id,
553 command: job.command.clone(),
554 status: job.status(),
555 output_file: job.output_file.clone(),
556 pid: job.pid,
557 })
558 .collect()
559 }
560
561 pub async fn running_count(&self) -> usize {
563 let mut jobs = self.jobs.lock().await;
564 let mut count = 0;
565 for job in jobs.values_mut() {
566 if !job.is_done() {
567 count += 1;
568 }
569 }
570 count
571 }
572
573 pub async fn cleanup(&self) {
575 let mut jobs = self.jobs.lock().await;
576 jobs.retain(|_, job| {
577 if job.is_done() {
578 job.cleanup_files();
579 false
580 } else {
581 true
582 }
583 });
584 }
585
586 pub async fn exists(&self, id: JobId) -> bool {
588 let jobs = self.jobs.lock().await;
589 jobs.contains_key(&id)
590 }
591
592 pub async fn get(&self, id: JobId) -> Option<JobInfo> {
594 let mut jobs = self.jobs.lock().await;
595 jobs.get_mut(&id).map(|job| JobInfo {
596 id: job.id,
597 command: job.command.clone(),
598 status: job.status(),
599 output_file: job.output_file.clone(),
600 pid: job.pid,
601 })
602 }
603
604 pub async fn get_command(&self, id: JobId) -> Option<String> {
606 let jobs = self.jobs.lock().await;
607 jobs.get(&id).map(|job| job.command.clone())
608 }
609
610 pub async fn get_status_string(&self, id: JobId) -> Option<String> {
612 let mut jobs = self.jobs.lock().await;
613 jobs.get_mut(&id).map(|job| job.status_string())
614 }
615
616 pub async fn read_stdout(&self, id: JobId) -> Option<Vec<u8>> {
620 let jobs = self.jobs.lock().await;
621 if let Some(job) = jobs.get(&id)
622 && let Some(stream) = job.stdout_stream() {
623 return Some(stream.read().await);
624 }
625 None
626 }
627
628 pub async fn read_stderr(&self, id: JobId) -> Option<Vec<u8>> {
632 let jobs = self.jobs.lock().await;
633 if let Some(job) = jobs.get(&id)
634 && let Some(stream) = job.stderr_stream() {
635 return Some(stream.read().await);
636 }
637 None
638 }
639
640 pub async fn list_ids(&self) -> Vec<JobId> {
642 let jobs = self.jobs.lock().await;
643 jobs.keys().copied().collect()
644 }
645
646 pub async fn register_stopped(&self, command: String, pid: u32, pgid: u32) -> JobId {
648 let id = JobId(self.next_id.fetch_add(1, Ordering::SeqCst));
649 let job = Job::stopped(id, self.session_id, command, pid, pgid);
650 let mut jobs = self.jobs.lock().await;
651 jobs.insert(id, job);
652 id
653 }
654
655 pub async fn stop_job(&self, id: JobId, pid: u32, pgid: u32) {
657 let mut jobs = self.jobs.lock().await;
658 if let Some(job) = jobs.get_mut(&id) {
659 job.stopped = true;
660 job.pid = Some(pid);
661 job.pgid = Some(pgid);
662 }
663 }
664
665 pub async fn resume_job(&self, id: JobId) {
667 let mut jobs = self.jobs.lock().await;
668 if let Some(job) = jobs.get_mut(&id) {
669 job.stopped = false;
670 }
671 }
672
673 pub async fn last_stopped(&self) -> Option<JobId> {
675 let mut jobs = self.jobs.lock().await;
676 let mut best: Option<JobId> = None;
678 for job in jobs.values_mut() {
679 if job.stopped {
680 match best {
681 None => best = Some(job.id),
682 Some(b) if job.id.0 > b.0 => best = Some(job.id),
683 _ => {}
684 }
685 }
686 }
687 best
688 }
689
690 pub async fn get_process_info(&self, id: JobId) -> Option<(u32, u32)> {
692 let jobs = self.jobs.lock().await;
693 jobs.get(&id).and_then(|job| {
694 match (job.pid, job.pgid) {
695 (Some(pid), Some(pgid)) => Some((pid, pgid)),
696 _ => None,
697 }
698 })
699 }
700
701 pub async fn set_cancel_token(&self, id: JobId, token: tokio_util::sync::CancellationToken) {
705 let mut jobs = self.jobs.lock().await;
706 if let Some(job) = jobs.get_mut(&id) {
707 job.cancel = Some(token);
708 }
709 }
710
711 pub async fn cancel(&self, id: JobId) -> bool {
715 let jobs = self.jobs.lock().await;
716 match jobs.get(&id).and_then(|job| job.cancel.clone()) {
717 Some(token) => {
718 token.cancel();
719 true
720 }
721 None => false,
722 }
723 }
724
725 pub async fn add_pgid(&self, id: JobId, pgid: u32) {
729 let mut jobs = self.jobs.lock().await;
730 if let Some(job) = jobs.get_mut(&id) {
731 if !job.pgids.contains(&pgid) {
732 job.pgids.push(pgid);
733 }
734 }
735 }
736
737 pub async fn job_pgids(&self, id: JobId) -> Vec<u32> {
741 let jobs = self.jobs.lock().await;
742 jobs.get(&id)
743 .map(|job| {
744 let mut v = job.pgids.clone();
745 if let Some(pg) = job.pgid {
746 if !v.contains(&pg) {
747 v.push(pg);
748 }
749 }
750 v
751 })
752 .unwrap_or_default()
753 }
754
755 pub async fn remove(&self, id: JobId) {
757 let mut jobs = self.jobs.lock().await;
758 if let Some(mut job) = jobs.remove(&id) {
759 job.cleanup_files();
760 }
761 }
762}
763
764impl Default for JobManager {
765 fn default() -> Self {
766 Self::new()
767 }
768}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773 use std::time::Duration;
774
775 #[tokio::test]
776 async fn test_no_host_output_file_when_persistence_disabled() {
777 let manager = JobManager::new();
781 assert!(manager.persist_output_files(), "default is to persist");
782 manager.set_persist_output_files(false);
783 assert!(!manager.persist_output_files());
784
785 let id = manager.spawn("leaky".to_string(), async {
786 ExecResult::success("output that must not hit host disk")
787 });
788 tokio::time::sleep(Duration::from_millis(10)).await;
789 let result = manager.wait(id).await;
790 assert!(result.is_some());
791
792 let output_file = {
794 let jobs = manager.jobs.lock().await;
795 jobs.get(&id).and_then(|j| j.output_file().cloned())
796 };
797 assert!(
798 output_file.is_none(),
799 "no host output file should be written when persistence is disabled, got {output_file:?}"
800 );
801 }
802
803 #[tokio::test]
804 async fn test_spawn_and_wait() {
805 let manager = JobManager::new();
806
807 let id = manager.spawn("test".to_string(), async {
808 tokio::time::sleep(Duration::from_millis(10)).await;
809 ExecResult::success("done")
810 });
811
812 tokio::time::sleep(Duration::from_millis(5)).await;
814
815 let result = manager.wait(id).await;
816 assert!(result.is_some());
817 let result = result.unwrap();
818 assert!(result.ok());
819 assert_eq!(&*result.text_out(), "done");
820 }
821
822 #[tokio::test]
823 async fn test_wait_all() {
824 let manager = JobManager::new();
825
826 manager.spawn("job1".to_string(), async {
827 tokio::time::sleep(Duration::from_millis(10)).await;
828 ExecResult::success("one")
829 });
830
831 manager.spawn("job2".to_string(), async {
832 tokio::time::sleep(Duration::from_millis(5)).await;
833 ExecResult::success("two")
834 });
835
836 tokio::time::sleep(Duration::from_millis(5)).await;
838
839 let results = manager.wait_all().await;
840 assert_eq!(results.len(), 2);
841 }
842
843 #[tokio::test]
844 async fn test_list_jobs() {
845 let manager = JobManager::new();
846
847 manager.spawn("test job".to_string(), async {
848 tokio::time::sleep(Duration::from_millis(50)).await;
849 ExecResult::success("")
850 });
851
852 tokio::time::sleep(Duration::from_millis(5)).await;
854
855 let jobs = manager.list().await;
856 assert_eq!(jobs.len(), 1);
857 assert_eq!(jobs[0].command, "test job");
858 assert_eq!(jobs[0].status, JobStatus::Running);
859 }
860
861 #[tokio::test]
862 async fn test_job_status_after_completion() {
863 let manager = JobManager::new();
864
865 let id = manager.spawn("quick".to_string(), async {
866 ExecResult::success("")
867 });
868
869 tokio::time::sleep(Duration::from_millis(10)).await;
871 let _ = manager.wait(id).await;
872
873 let info = manager.get(id).await;
874 assert!(info.is_some());
875 assert_eq!(info.unwrap().status, JobStatus::Done);
876 }
877
878 #[tokio::test]
879 async fn test_cleanup() {
880 let manager = JobManager::new();
881
882 let id = manager.spawn("done".to_string(), async {
883 ExecResult::success("")
884 });
885
886 tokio::time::sleep(Duration::from_millis(10)).await;
888 let _ = manager.wait(id).await;
889
890 assert_eq!(manager.list().await.len(), 1);
892
893 manager.cleanup().await;
895
896 assert_eq!(manager.list().await.len(), 0);
898 }
899
900 #[tokio::test]
901 async fn test_cleanup_removes_temp_files() {
902 let manager = JobManager::new();
904
905 let id = manager.spawn("output job".to_string(), async {
906 ExecResult::success("some output that gets written to a temp file")
907 });
908
909 tokio::time::sleep(Duration::from_millis(10)).await;
911 let result = manager.wait(id).await;
912 assert!(result.is_some());
913
914 let output_file = {
918 let jobs = manager.jobs.lock().await;
919 jobs.get(&id).and_then(|j| j.output_file().cloned())
920 };
921 let path = output_file.expect("job with output should have written a temp file");
922 assert!(path.exists(), "temp file should exist before cleanup: {}", path.display());
923
924 manager.cleanup().await;
926
927 assert!(
928 !path.exists(),
929 "temp file should be removed after cleanup: {}",
930 path.display()
931 );
932 }
933
934 #[tokio::test]
935 async fn test_register_with_channel() {
936 let manager = JobManager::new();
937 let (tx, rx) = oneshot::channel();
938
939 let id = manager.register("channel job".to_string(), rx).await;
940
941 tx.send(ExecResult::success("from channel")).unwrap();
943
944 let result = manager.wait(id).await;
945 assert!(result.is_some());
946 assert_eq!(&*result.unwrap().text_out(), "from channel");
947 }
948
949 #[tokio::test]
950 async fn test_spawn_immediately_available() {
951 let manager = JobManager::new();
953
954 let id = manager.spawn("instant".to_string(), async {
955 tokio::time::sleep(Duration::from_millis(100)).await;
956 ExecResult::success("done")
957 });
958
959 let exists = manager.exists(id).await;
961 assert!(exists, "job should be immediately available after spawn()");
962
963 let info = manager.get(id).await;
964 assert!(info.is_some(), "job info should be available immediately");
965 }
966
967 #[tokio::test]
968 async fn test_nonexistent_job() {
969 let manager = JobManager::new();
970 let result = manager.wait(JobId(999)).await;
971 assert!(result.is_none());
972 }
973
974 #[tokio::test]
975 async fn test_cancel_token_fires() {
976 let manager = JobManager::new();
979 let token = tokio_util::sync::CancellationToken::new();
980 let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
981 manager.set_cancel_token(id, token.clone()).await;
982
983 assert!(!token.is_cancelled());
984 assert!(manager.cancel(id).await, "cancel should report success");
985 assert!(token.is_cancelled(), "the job's token must be tripped");
986 }
987
988 #[tokio::test]
989 async fn test_cancel_without_token_returns_false() {
990 let manager = JobManager::new();
991 let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
992 assert!(!manager.cancel(id).await);
994 assert!(!manager.cancel(JobId(999)).await);
996 }
997
998 #[tokio::test]
999 async fn test_pgids_recorded_and_deduped() {
1000 let manager = JobManager::new();
1001 let id = manager.spawn("bg".to_string(), async { ExecResult::success("") });
1002 assert!(manager.job_pgids(id).await.is_empty());
1003
1004 manager.add_pgid(id, 4242).await;
1005 manager.add_pgid(id, 4243).await;
1006 manager.add_pgid(id, 4242).await; assert_eq!(manager.job_pgids(id).await, vec![4242, 4243]);
1008
1009 assert!(manager.job_pgids(JobId(999)).await.is_empty());
1011 }
1012}