1use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18 Snapshot,
19};
20
/// Options accepted by [`execute`], the `run` entry point.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command and arguments to run; `execute` rejects an empty list.
    pub command: Vec<String>,
    /// Optional jobs-root override, handed to `resolve_root`.
    pub root: Option<&'a str>,
    /// Milliseconds to poll before capturing the initial snapshot. `execute` caps it at
    /// `MAX_SNAPSHOT_AFTER_MS`, skips the snapshot when it is 0, and ignores it when `wait` is set.
    pub snapshot_after: u64,
    /// Forwarded to the log-tail reader when building snapshots
    /// (presumably the maximum number of trailing lines — confirm against `JobDir`).
    pub tail_lines: u64,
    /// Forwarded to the log-tail reader when building snapshots
    /// (presumably a byte cap on the returned tail — confirm against `JobDir`).
    pub max_bytes: u64,
    /// Forwarded to the supervisor as `--timeout` when greater than 0.
    pub timeout_ms: u64,
    /// Forwarded to the supervisor as `--kill-after` when greater than 0.
    pub kill_after_ms: u64,
    /// Optional working directory; forwarded as `--cwd` and recorded (resolved) in the job metadata.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries forwarded as `--env`; the keys are recorded in the job metadata.
    pub env_vars: Vec<String>,
    /// Env-file paths forwarded as `--env-file`.
    pub env_files: Vec<String>,
    /// When `false`, `--no-inherit-env` is passed to the supervisor.
    pub inherit_env: bool,
    /// Keys whose values are replaced by `***` in the stored metadata and in the response.
    pub mask: Vec<String>,
    /// Optional path that replaces the job directory's default full-log path.
    pub log: Option<&'a str>,
    /// Forwarded to the supervisor as `--progress-every` when greater than 0.
    pub progress_every_ms: u64,
    /// When `true`, `execute` blocks until the job state is no longer `Running`.
    pub wait: bool,
    /// Poll interval in milliseconds used while waiting (values below 1 are raised to 1).
    pub wait_poll_ms: u64,
    /// Optional shell command forwarded as `--notify-command` and recorded in the metadata.
    pub notify_command: Option<String>,
    /// Optional file path forwarded as `--notify-file` and recorded in the metadata.
    pub notify_file: Option<String>,
    /// Shell wrapper argv; serialized to JSON and forwarded as `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
}
66
67impl<'a> Default for RunOpts<'a> {
68 fn default() -> Self {
69 RunOpts {
70 command: vec![],
71 root: None,
72 snapshot_after: 10_000,
73 tail_lines: 50,
74 max_bytes: 65536,
75 timeout_ms: 0,
76 kill_after_ms: 0,
77 cwd: None,
78 env_vars: vec![],
79 env_files: vec![],
80 inherit_env: true,
81 mask: vec![],
82 log: None,
83 progress_every_ms: 0,
84 wait: false,
85 wait_poll_ms: 200,
86 notify_command: None,
87 notify_file: None,
88 shell_wrapper: crate::config::default_shell_wrapper(),
89 }
90 }
91}
92
/// Upper bound, in milliseconds, that `execute` applies to `RunOpts::snapshot_after`.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
95
/// Starts `opts.command` as a supervised background job and prints a `run` response.
///
/// Steps, in order: create the jobs root and a new job directory (ULID id) with its
/// metadata, pre-create the log files, re-invoke the current executable with the
/// `_supervise` subcommand, record the initial state, optionally poll for an early
/// snapshot, optionally wait for a terminal state, then print the response.
///
/// # Errors
///
/// Fails when the command is empty, when the jobs root / job directory / log files cannot
/// be created, when the current executable cannot be resolved, when the supervisor cannot
/// be spawned, or when the initial state cannot be written. On Windows it also fails when
/// the supervisor reports a `Failed` state during the start-up handshake.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }

    // Measures the whole call; reported as `elapsed_ms`.
    let elapsed_start = std::time::Instant::now();

    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;

    let job_id = Ulid::new().to_string();
    let created_at = now_rfc3339();

    // Only the part before the first `=` is kept as the key.
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();

    // The masked form is what gets persisted and echoed back; the supervisor still
    // receives the unmasked `opts.env_vars` below.
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);

    let effective_cwd = resolve_effective_cwd(opts.cwd);

    // A notification section is recorded only when at least one sink is configured.
    let notification = if opts.notify_command.is_some() || opts.notify_file.is_some() {
        Some(crate::schema::NotificationConfig {
            notify_command: opts.notify_command.clone(),
            notify_file: opts.notify_file.clone(),
        })
    } else {
        None
    };

    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
        notification,
    };

    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    info!(job_id = %job_id, "created job directory");

    // An explicit `log` option overrides the job directory's default full-log location.
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };

    // Create the log files up front (append mode leaves existing content untouched) so
    // they exist before the supervisor starts writing.
    for log_path in [
        job_dir.stdout_path(),
        job_dir.stderr_path(),
        job_dir.full_log_path(),
    ] {
        std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)
            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
    }

    // The supervisor is this same binary, started with the `_supervise` subcommand.
    let exe = std::env::current_exe().context("resolve current exe")?;
    let mut supervisor_cmd = Command::new(&exe);
    supervisor_cmd
        .arg("_supervise")
        .arg("--job-id")
        .arg(&job_id)
        .arg("--root")
        .arg(root.display().to_string())
        .arg("--full-log")
        .arg(&full_log_path);

    // Optional flags are only forwarded when they differ from their "off" value.
    if opts.timeout_ms > 0 {
        supervisor_cmd
            .arg("--timeout")
            .arg(opts.timeout_ms.to_string());
    }
    if opts.kill_after_ms > 0 {
        supervisor_cmd
            .arg("--kill-after")
            .arg(opts.kill_after_ms.to_string());
    }
    if let Some(cwd) = opts.cwd {
        supervisor_cmd.arg("--cwd").arg(cwd);
    }
    for env_file in &opts.env_files {
        supervisor_cmd.arg("--env-file").arg(env_file);
    }
    for env_var in &opts.env_vars {
        supervisor_cmd.arg("--env").arg(env_var);
    }
    if !opts.inherit_env {
        supervisor_cmd.arg("--no-inherit-env");
    }
    if opts.progress_every_ms > 0 {
        supervisor_cmd
            .arg("--progress-every")
            .arg(opts.progress_every_ms.to_string());
    }
    if let Some(ref nc) = opts.notify_command {
        supervisor_cmd.arg("--notify-command").arg(nc);
    }
    if let Some(ref nf) = opts.notify_file {
        supervisor_cmd.arg("--notify-file").arg(nf);
    }
    // The wrapper argv travels as a single JSON-encoded argument.
    let wrapper_json = serde_json::to_string(&opts.shell_wrapper)
        .context("serialize shell wrapper")?;
    supervisor_cmd
        .arg("--shell-wrapper-resolved")
        .arg(&wrapper_json);

    // Everything after `--` is the user command; the supervisor's own stdio is detached.
    supervisor_cmd
        .arg("--")
        .args(&opts.command)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;

    let supervisor_pid = supervisor.id();
    debug!(supervisor_pid, "supervisor spawned");

    // NOTE(review): the initial state is written after the supervisor is already running;
    // presumably it records the supervisor pid until the supervisor overwrites it — confirm
    // against `JobDir::init_state`.
    job_dir.init_state(supervisor_pid, &created_at)?;

    #[cfg(windows)]
    {
        // Wait up to 5 s for the supervisor to rewrite the state: either the recorded pid
        // no longer equals the supervisor pid, or the status became `Failed`.
        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            if let Ok(current_state) = job_dir.read_state() {
                let supervisor_updated = current_state
                    .pid
                    .map(|p| p != supervisor_pid)
                    .unwrap_or(false)
                    || *current_state.status() == JobStatus::Failed;
                if supervisor_updated {
                    if *current_state.status() == JobStatus::Failed {
                        anyhow::bail!(
                            "supervisor failed to assign child process to Job Object \
                             (Windows MUST requirement); see stderr for details"
                        );
                    }
                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
                    break;
                }
            }
            // A timed-out handshake is not an error; the initial state is used as-is.
            if std::time::Instant::now() >= handshake_deadline {
                debug!("supervisor handshake timed out; proceeding with initial state");
                break;
            }
        }
    }

    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    // With `wait`, the early snapshot is skipped; a final snapshot is taken instead.
    let effective_snapshot_after = if opts.wait {
        0
    } else {
        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
    };

    // Covers both the snapshot poll and the `wait` loop; reported as `waited_ms`.
    let wait_start = std::time::Instant::now();

    let snapshot = if effective_snapshot_after > 0 {
        debug!(ms = effective_snapshot_after, "polling for snapshot");
        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
        let poll_interval = std::time::Duration::from_millis(15);
        // Poll until the job leaves `Running` or the deadline passes, whichever is first.
        loop {
            std::thread::sleep(poll_interval);
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                debug!("snapshot poll: job no longer running, exiting early");
                break;
            }
            if std::time::Instant::now() >= deadline {
                debug!("snapshot poll: deadline reached");
                break;
            }
        }
        let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
        Some(snap)
    } else {
        None
    };

    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
        debug!("--wait: polling for terminal state");
        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
        // NOTE(review): this loop has no deadline of its own; it ends only once a readable
        // state reports a status other than `Running`.
        loop {
            std::thread::sleep(poll);
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
                let ec = st.exit_code();
                let fa = st.finished_at.clone();
                let state_str = st.status().as_str().to_string();
                break (state_str, ec, fa, Some(snap));
            }
        }
    } else {
        // Without `wait` the job is always reported as running.
        (JobStatus::Running.as_str().to_string(), None, None, None)
    };

    let waited_ms = wait_start.elapsed().as_millis() as u64;

    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            env_vars: masked_env_vars,
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );
    response.print();
    Ok(())
}
385
386fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
387 let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
388 let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
389 Snapshot {
390 truncated: stdout.truncated || stderr.truncated,
391 encoding: "utf-8-lossy".to_string(),
392 stdout_observed_bytes: stdout.observed_bytes,
393 stderr_observed_bytes: stderr.observed_bytes,
394 stdout_included_bytes: stdout.included_bytes,
395 stderr_included_bytes: stderr.included_bytes,
396 stdout_tail: stdout.tail,
397 stderr_tail: stderr.tail,
398 }
399}
400
/// Options accepted by [`supervise`], the supervisor side of a job.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job whose directory is opened under `root`.
    pub job_id: &'a str,
    /// Jobs root directory that contains the job directory.
    pub root: &'a Path,
    /// Command and arguments to run; `supervise` rejects an empty slice.
    pub command: &'a [String],
    /// Optional path of the combined log; defaults to the job directory's full-log path.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds after which the child is signalled; 0 disables it.
    pub timeout_ms: u64,
    /// Milliseconds between the termination signal and the kill signal; 0 means no grace period.
    pub kill_after_ms: u64,
    /// Optional working directory for the child process.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries set on the child after any env-file entries.
    pub env_vars: Vec<String>,
    /// Env-file paths whose entries are set on the child, in order.
    pub env_files: Vec<String>,
    /// When `false`, the child's environment is cleared before variables are applied.
    pub inherit_env: bool,
    /// Interval in milliseconds for refreshing `updated_at` in the state file; 0 disables it.
    pub progress_every_ms: u64,
    /// Optional shell command that receives the completion event when the job finishes.
    pub notify_command: Option<String>,
    /// Optional file the completion event is appended to when the job finishes.
    pub notify_file: Option<String>,
    /// Shell wrapper argv used to launch the command (and the notify command); must not be empty.
    pub shell_wrapper: Vec<String>,
}
435
/// Resolves the working directory a job will effectively use, as a display string.
///
/// The starting point is `cwd_override` when given, otherwise the process's current
/// directory (or `"."` if that cannot be determined). The canonical form is returned when
/// canonicalization succeeds. When it fails, an absolute path is returned unchanged and a
/// relative path is joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    /// Current directory of this process, or `"."` when it is unavailable.
    fn process_cwd() -> PathBuf {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
    }

    let requested = cwd_override.map_or_else(process_cwd, PathBuf::from);

    if let Ok(canonical) = requested.canonicalize() {
        return canonical.display().to_string();
    }

    // Canonicalization failed (e.g. the path does not exist): fall back to a best-effort
    // absolute path without touching the filesystem again.
    let fallback = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    fallback.display().to_string()
}
463
464fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
467 if mask_keys.is_empty() {
468 return env_vars.to_vec();
469 }
470 env_vars
471 .iter()
472 .map(|s| {
473 let (key, _val) = parse_env_var(s);
474 if mask_keys.iter().any(|k| k == &key) {
475 format!("{key}=***")
476 } else {
477 s.clone()
478 }
479 })
480 .collect()
481}
482
/// Splits a `KEY=VALUE` entry at the first `=` into `(key, value)`.
///
/// Later `=` characters stay in the value. An entry without any `=` yields the whole
/// input as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
491
492fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
495 let contents =
496 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
497 let mut vars = Vec::new();
498 for line in contents.lines() {
499 let line = line.trim();
500 if line.is_empty() || line.starts_with('#') {
501 continue;
502 }
503 vars.push(parse_env_var(line));
504 }
505 Ok(vars)
506}
507
508fn stream_to_logs<R>(
522 stream: R,
523 log_path: &std::path::Path,
524 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
525 label: &str,
526) where
527 R: std::io::Read,
528{
529 use std::io::Write;
530 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
531 let mut stream = stream;
532 let mut buf = [0u8; 8192];
533 let mut line_buf: Vec<u8> = Vec::new();
535 loop {
536 match stream.read(&mut buf) {
537 Ok(0) => break, Ok(n) => {
539 let chunk = &buf[..n];
540 let _ = log_file.write_all(chunk);
542 for &b in chunk {
544 if b == b'\n' {
545 let line = String::from_utf8_lossy(&line_buf);
546 if let Ok(mut fl) = full_log.lock() {
547 let ts = now_rfc3339();
548 let _ = writeln!(fl, "{ts} [{label}] {line}");
549 }
550 line_buf.clear();
551 } else {
552 line_buf.push(b);
553 }
554 }
555 }
556 Err(_) => break,
557 }
558 }
559 if !line_buf.is_empty() {
561 let line = String::from_utf8_lossy(&line_buf);
562 if let Ok(mut fl) = full_log.lock() {
563 let ts = now_rfc3339();
564 let _ = writeln!(fl, "{ts} [{label}] {line}");
565 }
566 }
567}
568
569pub fn supervise(opts: SuperviseOpts) -> Result<()> {
579 use std::sync::{Arc, Mutex};
580
581 let job_id = opts.job_id;
582 let root = opts.root;
583 let command = opts.command;
584
585 if command.is_empty() {
586 anyhow::bail!("supervisor: no command");
587 }
588
589 let job_dir = JobDir::open(root, job_id)?;
590
591 let meta = job_dir.read_meta()?;
593 let started_at = meta.created_at.clone();
594
595 let full_log_path = if let Some(p) = opts.full_log {
597 std::path::PathBuf::from(p)
598 } else {
599 job_dir.full_log_path()
600 };
601
602 if let Some(parent) = full_log_path.parent() {
605 std::fs::create_dir_all(parent)
606 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
607 }
608 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
609 let full_log = Arc::new(Mutex::new(full_log_file));
610
611 let cmd_str = build_cmd_str(command);
619 if opts.shell_wrapper.is_empty() {
620 anyhow::bail!("supervisor: shell wrapper must not be empty");
621 }
622 let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
623 child_cmd.args(&opts.shell_wrapper[1..]).arg(&cmd_str);
624
625 if opts.inherit_env {
626 } else {
628 child_cmd.env_clear();
629 }
630
631 for env_file in &opts.env_files {
633 let vars = load_env_file(env_file)?;
634 for (k, v) in vars {
635 child_cmd.env(&k, &v);
636 }
637 }
638
639 for env_var in &opts.env_vars {
641 let (k, v) = parse_env_var(env_var);
642 child_cmd.env(&k, &v);
643 }
644
645 if let Some(cwd) = opts.cwd {
647 child_cmd.current_dir(cwd);
648 }
649
650 let mut child = child_cmd
652 .stdin(std::process::Stdio::null())
653 .stdout(std::process::Stdio::piped())
654 .stderr(std::process::Stdio::piped())
655 .spawn()
656 .context("supervisor: spawn child")?;
657
658 let pid = child.id();
659 info!(job_id, pid, "child process started");
660
661 #[cfg(windows)]
668 let windows_job_name = {
669 match assign_to_job_object(job_id, pid) {
670 Ok(name) => Some(name),
671 Err(e) => {
672 let kill_err = child.kill();
676 let _ = child.wait(); let failed_state = JobState {
679 job: JobStateJob {
680 id: job_id.to_string(),
681 status: JobStatus::Failed,
682 started_at: started_at.clone(),
683 },
684 result: JobStateResult {
685 exit_code: None,
686 signal: None,
687 duration_ms: None,
688 },
689 pid: Some(pid),
690 finished_at: Some(now_rfc3339()),
691 updated_at: now_rfc3339(),
692 windows_job_name: None,
693 };
694 let _ = job_dir.write_state(&failed_state);
697
698 if opts.notify_command.is_some() || opts.notify_file.is_some() {
702 let finished_at_ts =
703 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
704 let stdout_log = job_dir.stdout_path().display().to_string();
705 let stderr_log = job_dir.stderr_path().display().to_string();
706 let fail_event = crate::schema::CompletionEvent {
707 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
708 event_type: "job.finished".to_string(),
709 job_id: job_id.to_string(),
710 state: JobStatus::Failed.as_str().to_string(),
711 command: meta.command.clone(),
712 cwd: meta.cwd.clone(),
713 started_at: started_at.clone(),
714 finished_at: finished_at_ts,
715 duration_ms: None,
716 exit_code: None,
717 signal: None,
718 stdout_log_path: stdout_log,
719 stderr_log_path: stderr_log,
720 };
721 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
722 let fail_event_path = job_dir.completion_event_path().display().to_string();
723 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
724 Vec::new();
725 if let Err(we) = job_dir.write_completion_event_atomic(
726 &crate::schema::CompletionEventRecord {
727 event: fail_event.clone(),
728 delivery_results: vec![],
729 },
730 ) {
731 warn!(
732 job_id,
733 error = %we,
734 "failed to write initial completion_event.json for failed job"
735 );
736 }
737 if let Some(ref shell_cmd) = opts.notify_command {
738 fail_delivery_results.push(dispatch_command_sink(
739 shell_cmd,
740 &fail_event_json,
741 job_id,
742 &fail_event_path,
743 &opts.shell_wrapper,
744 ));
745 }
746 if let Some(ref file_path) = opts.notify_file {
747 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
748 }
749 if let Err(we) = job_dir.write_completion_event_atomic(
750 &crate::schema::CompletionEventRecord {
751 event: fail_event,
752 delivery_results: fail_delivery_results,
753 },
754 ) {
755 warn!(
756 job_id,
757 error = %we,
758 "failed to update completion_event.json with delivery results for failed job"
759 );
760 }
761 }
762
763 if let Err(ke) = kill_err {
764 return Err(anyhow::anyhow!(
765 "supervisor: failed to assign pid {pid} to Job Object \
766 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
767 ));
768 }
769 return Err(anyhow::anyhow!(
770 "supervisor: failed to assign pid {pid} to Job Object \
771 (Windows MUST requirement); child process was killed; \
772 consider running outside a nested Job Object environment: {e}"
773 ));
774 }
775 }
776 };
777 #[cfg(not(windows))]
778 let windows_job_name: Option<String> = None;
779
780 let state = JobState {
785 job: JobStateJob {
786 id: job_id.to_string(),
787 status: JobStatus::Running,
788 started_at: started_at.clone(),
789 },
790 result: JobStateResult {
791 exit_code: None,
792 signal: None,
793 duration_ms: None,
794 },
795 pid: Some(pid),
796 finished_at: None,
797 updated_at: now_rfc3339(),
798 windows_job_name,
799 };
800 job_dir.write_state(&state)?;
801
802 let child_start_time = std::time::Instant::now();
803
804 let child_stdout = child.stdout.take().expect("child stdout piped");
806 let child_stderr = child.stderr.take().expect("child stderr piped");
807
808 let stdout_log_path = job_dir.stdout_path();
810 let full_log_stdout = Arc::clone(&full_log);
811 let t_stdout = std::thread::spawn(move || {
812 stream_to_logs(child_stdout, &stdout_log_path, full_log_stdout, "STDOUT");
813 });
814
815 let stderr_log_path = job_dir.stderr_path();
817 let full_log_stderr = Arc::clone(&full_log);
818 let t_stderr = std::thread::spawn(move || {
819 stream_to_logs(child_stderr, &stderr_log_path, full_log_stderr, "STDERR");
820 });
821
822 let timeout_ms = opts.timeout_ms;
825 let kill_after_ms = opts.kill_after_ms;
826 let progress_every_ms = opts.progress_every_ms;
827 let state_path = job_dir.state_path();
828 let job_id_str = job_id.to_string();
829
830 use std::sync::atomic::{AtomicBool, Ordering};
832 let child_done = Arc::new(AtomicBool::new(false));
833
834 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
835 let state_path_clone = state_path.clone();
836 let child_done_clone = Arc::clone(&child_done);
837 Some(std::thread::spawn(move || {
838 let start = std::time::Instant::now();
839 let timeout_dur = if timeout_ms > 0 {
840 Some(std::time::Duration::from_millis(timeout_ms))
841 } else {
842 None
843 };
844 let progress_dur = if progress_every_ms > 0 {
845 Some(std::time::Duration::from_millis(progress_every_ms))
846 } else {
847 None
848 };
849
850 let poll_interval = std::time::Duration::from_millis(100);
851
852 loop {
853 std::thread::sleep(poll_interval);
854
855 if child_done_clone.load(Ordering::Relaxed) {
857 break;
858 }
859
860 let elapsed = start.elapsed();
861
862 if let Some(td) = timeout_dur
864 && elapsed >= td
865 {
866 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM");
867 #[cfg(unix)]
869 {
870 unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
871 }
872 if kill_after_ms > 0 {
874 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
875 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL");
876 #[cfg(unix)]
877 {
878 unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
879 }
880 } else {
881 #[cfg(unix)]
883 {
884 unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
885 }
886 }
887 break;
888 }
889
890 if let Some(pd) = progress_dur {
892 let elapsed_ms = elapsed.as_millis() as u64;
893 let pd_ms = pd.as_millis() as u64;
894 let poll_ms = poll_interval.as_millis() as u64;
895 if elapsed_ms % pd_ms < poll_ms {
896 if let Ok(raw) = std::fs::read(&state_path_clone)
898 && let Ok(mut st) =
899 serde_json::from_slice::<crate::schema::JobState>(&raw)
900 {
901 st.updated_at = now_rfc3339();
902 if let Ok(s) = serde_json::to_string_pretty(&st) {
903 let _ = std::fs::write(&state_path_clone, s);
904 }
905 }
906 }
907 }
908 }
909 }))
910 } else {
911 None
912 };
913
914 let exit_status = child.wait().context("wait for child")?;
916
917 child_done.store(true, Ordering::Relaxed);
919
920 let _ = t_stdout.join();
922 let _ = t_stderr.join();
923
924 if let Some(w) = watcher {
926 let _ = w.join();
927 }
928
929 let duration_ms = child_start_time.elapsed().as_millis() as u64;
930 let exit_code = exit_status.code();
931 let finished_at = now_rfc3339();
932
933 #[cfg(unix)]
935 let (terminal_status, signal_name) = {
936 use std::os::unix::process::ExitStatusExt;
937 if let Some(sig) = exit_status.signal() {
938 (JobStatus::Killed, Some(sig.to_string()))
939 } else {
940 (JobStatus::Exited, None)
941 }
942 };
943 #[cfg(not(unix))]
944 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
945
946 let state = JobState {
947 job: JobStateJob {
948 id: job_id.to_string(),
949 status: terminal_status.clone(),
950 started_at: started_at.clone(),
951 },
952 result: JobStateResult {
953 exit_code,
954 signal: signal_name.clone(),
955 duration_ms: Some(duration_ms),
956 },
957 pid: Some(pid),
958 finished_at: Some(finished_at.clone()),
959 updated_at: now_rfc3339(),
960 windows_job_name: None, };
962 job_dir.write_state(&state)?;
963 info!(job_id, ?exit_code, "child process finished");
964
965 let has_notification = opts.notify_command.is_some() || opts.notify_file.is_some();
968 if has_notification {
969 let stdout_log = job_dir.stdout_path().display().to_string();
970 let stderr_log = job_dir.stderr_path().display().to_string();
971 let event = crate::schema::CompletionEvent {
972 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
973 event_type: "job.finished".to_string(),
974 job_id: job_id.to_string(),
975 state: terminal_status.as_str().to_string(),
976 command: meta.command.clone(),
977 cwd: meta.cwd.clone(),
978 started_at,
979 finished_at,
980 duration_ms: Some(duration_ms),
981 exit_code,
982 signal: signal_name,
983 stdout_log_path: stdout_log,
984 stderr_log_path: stderr_log,
985 };
986
987 let event_json = serde_json::to_string(&event).unwrap_or_default();
988 let event_path = job_dir.completion_event_path().display().to_string();
989 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
990
991 if let Err(e) =
993 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
994 event: event.clone(),
995 delivery_results: vec![],
996 })
997 {
998 warn!(job_id, error = %e, "failed to write initial completion_event.json");
999 }
1000
1001 if let Some(ref shell_cmd) = opts.notify_command {
1002 delivery_results.push(dispatch_command_sink(
1003 shell_cmd,
1004 &event_json,
1005 job_id,
1006 &event_path,
1007 &opts.shell_wrapper,
1008 ));
1009 }
1010 if let Some(ref file_path) = opts.notify_file {
1011 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1012 }
1013
1014 if let Err(e) =
1016 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1017 event,
1018 delivery_results,
1019 })
1020 {
1021 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1022 }
1023 }
1024
1025 Ok(())
1026}
1027
1028fn dispatch_command_sink(
1035 shell_cmd: &str,
1036 event_json: &str,
1037 job_id: &str,
1038 event_path: &str,
1039 shell_wrapper: &[String],
1040) -> crate::schema::SinkDeliveryResult {
1041 use std::io::Write;
1042 let attempted_at = now_rfc3339();
1043 let target = shell_cmd.to_string();
1044
1045 if shell_cmd.trim().is_empty() {
1046 return crate::schema::SinkDeliveryResult {
1047 sink_type: "command".to_string(),
1048 target,
1049 success: false,
1050 error: Some("empty shell command".to_string()),
1051 attempted_at,
1052 };
1053 }
1054
1055 if shell_wrapper.is_empty() {
1056 return crate::schema::SinkDeliveryResult {
1057 sink_type: "command".to_string(),
1058 target,
1059 success: false,
1060 error: Some("shell wrapper must not be empty".to_string()),
1061 attempted_at,
1062 };
1063 }
1064
1065 let mut cmd = Command::new(&shell_wrapper[0]);
1066 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1067
1068 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1069 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1070 cmd.env("AGENT_EXEC_EVENT_TYPE", "job.finished");
1071 cmd.stdin(std::process::Stdio::piped());
1072 cmd.stdout(std::process::Stdio::null());
1073 cmd.stderr(std::process::Stdio::null());
1074
1075 match cmd.spawn() {
1076 Ok(mut child) => {
1077 if let Some(mut stdin) = child.stdin.take() {
1078 let _ = stdin.write_all(event_json.as_bytes());
1079 }
1080 match child.wait() {
1081 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1082 sink_type: "command".to_string(),
1083 target,
1084 success: true,
1085 error: None,
1086 attempted_at,
1087 },
1088 Ok(status) => crate::schema::SinkDeliveryResult {
1089 sink_type: "command".to_string(),
1090 target,
1091 success: false,
1092 error: Some(format!("exited with status {status}")),
1093 attempted_at,
1094 },
1095 Err(e) => crate::schema::SinkDeliveryResult {
1096 sink_type: "command".to_string(),
1097 target,
1098 success: false,
1099 error: Some(format!("wait error: {e}")),
1100 attempted_at,
1101 },
1102 }
1103 }
1104 Err(e) => crate::schema::SinkDeliveryResult {
1105 sink_type: "command".to_string(),
1106 target,
1107 success: false,
1108 error: Some(format!("spawn error: {e}")),
1109 attempted_at,
1110 },
1111 }
1112}
1113
1114fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1117 use std::io::Write;
1118 let attempted_at = now_rfc3339();
1119 let path = std::path::Path::new(file_path);
1120
1121 if let Some(parent) = path.parent()
1122 && let Err(e) = std::fs::create_dir_all(parent)
1123 {
1124 return crate::schema::SinkDeliveryResult {
1125 sink_type: "file".to_string(),
1126 target: file_path.to_string(),
1127 success: false,
1128 error: Some(format!("create parent dir: {e}")),
1129 attempted_at,
1130 };
1131 }
1132
1133 match std::fs::OpenOptions::new()
1134 .create(true)
1135 .append(true)
1136 .open(path)
1137 {
1138 Ok(mut f) => match writeln!(f, "{event_json}") {
1139 Ok(_) => crate::schema::SinkDeliveryResult {
1140 sink_type: "file".to_string(),
1141 target: file_path.to_string(),
1142 success: true,
1143 error: None,
1144 attempted_at,
1145 },
1146 Err(e) => crate::schema::SinkDeliveryResult {
1147 sink_type: "file".to_string(),
1148 target: file_path.to_string(),
1149 success: false,
1150 error: Some(format!("write error: {e}")),
1151 attempted_at,
1152 },
1153 },
1154 Err(e) => crate::schema::SinkDeliveryResult {
1155 sink_type: "file".to_string(),
1156 target: file_path.to_string(),
1157 success: false,
1158 error: Some(format!("open error: {e}")),
1159 attempted_at,
1160 },
1161 }
1162}
1163
/// Public wrapper around the private `now_rfc3339`: returns the current time as an
/// RFC 3339 UTC timestamp with second precision.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1168
1169fn now_rfc3339() -> String {
1170 let d = std::time::SystemTime::now()
1172 .duration_since(std::time::UNIX_EPOCH)
1173 .unwrap_or_default();
1174 format_rfc3339(d.as_secs())
1175}
1176
/// Formats a count of seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
///
/// The date is derived by subtracting whole years, then whole months, from the day count,
/// using Gregorian leap-year rules.
fn format_rfc3339(secs: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3_600;
    const SECS_PER_DAY: u64 = 86_400;

    // Time of day.
    let second_of_day = secs % SECS_PER_DAY;
    let hours = second_of_day / SECS_PER_HOUR;
    let minutes = (second_of_day % SECS_PER_HOUR) / SECS_PER_MINUTE;
    let seconds = second_of_day % SECS_PER_MINUTE;

    // Peel off whole years, starting at 1970.
    let year_length = |y: u64| if is_leap(y) { 366 } else { 365 };
    let mut remaining_days = secs / SECS_PER_DAY;
    let mut year = 1970u64;
    while remaining_days >= year_length(year) {
        remaining_days -= year_length(year);
        year += 1;
    }

    // Peel off whole months of that year; what is left is the zero-based day of the month.
    let february = if is_leap(year) { 29 } else { 28 };
    let month_lengths: [u64; 12] = [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let mut month = 1u64;
    for length in month_lengths {
        if remaining_days < length {
            break;
        }
        remaining_days -= length;
        month += 1;
    }
    let day = remaining_days + 1;

    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}

/// Gregorian leap-year rule: every 400th year is a leap year; otherwise a year is a leap
/// year when it is divisible by 4 but not by 100.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    year.is_multiple_of(4) && !year.is_multiple_of(100)
}
1238
/// Creates a named Windows Job Object (`AgentExec-<job_id>`) and assigns the process
/// `pid` to it, returning the Job Object's name.
///
/// # Errors
///
/// Fails when the process cannot be opened, when the Job Object cannot be created, or
/// when the assignment is rejected. Handles opened up to the point of failure are closed
/// before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY (NOTE(review): confirm against the `windows` crate docs): the calls below are
    // raw Win32 FFI. Each handle is used only after its creating call returned `Ok`, and
    // every handle that gets closed is closed exactly once on its path.
    unsafe {
        // The process is opened with the terminate and set-quota access rights.
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        let _ = CloseHandle(proc_handle);
        // The job handle is deliberately never closed on the success path.
        // NOTE(review): presumably this keeps the Job Object alive for the lifetime of the
        // supervisor process — confirm; if the handle type is `Copy`, `forget` has no effect
        // beyond documenting that intent.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1304
1305fn build_cmd_str(command: &[String]) -> String {
1313 if command.len() == 1 {
1314 command[0].clone()
1315 } else {
1316 command
1317 .iter()
1318 .map(|s| posix_single_quote(s))
1319 .collect::<Vec<_>>()
1320 .join(" ")
1321 }
1322}
1323
/// Wraps `s` in single quotes for a POSIX shell.
///
/// Each embedded single quote is emitted as `'\''`: close the quoted string, add an
/// escaped quote, reopen the quoted string.
fn posix_single_quote(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
1328
/// Unit tests for the timestamp formatter.
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero seconds must render as the Unix epoch itself.
    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    /// A fixed timestamp in a leap year: 1704067200 s is midnight UTC on 2024-01-01.
    #[test]
    fn rfc3339_known_date() {
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }
}