1use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18 Snapshot,
19};
20
/// Options accepted by [`execute`] for launching a command as a supervised job.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command and its arguments; `execute` fails if this is empty.
    pub command: Vec<String>,
    /// Optional jobs-root override, handed to `resolve_root`.
    pub root: Option<&'a str>,
    /// Milliseconds to poll before taking the initial snapshot. Capped at
    /// `MAX_SNAPSHOT_AFTER_MS`; `0` skips the snapshot; ignored when `wait` is set.
    pub snapshot_after: u64,
    /// Line limit passed to the log-tail reader when building snapshots.
    pub tail_lines: u64,
    /// Byte limit passed to the log-tail reader when building snapshots.
    pub max_bytes: u64,
    /// Forwarded to the supervisor as `--timeout` when greater than zero.
    pub timeout_ms: u64,
    /// Forwarded to the supervisor as `--kill-after` when greater than zero.
    pub kill_after_ms: u64,
    /// Working directory for the job; forwarded as `--cwd` and recorded
    /// (resolved) in the job metadata.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries, each forwarded as `--env`.
    pub env_vars: Vec<String>,
    /// Env-file paths, each forwarded as `--env-file`.
    pub env_files: Vec<String>,
    /// When `false`, `--no-inherit-env` is passed to the supervisor.
    pub inherit_env: bool,
    /// Keys whose values are replaced with `***` in the metadata and the response.
    pub mask: Vec<String>,
    /// Optional path used instead of the job directory's full log (`--full-log`).
    pub log: Option<&'a str>,
    /// Forwarded to the supervisor as `--progress-every` when greater than zero.
    pub progress_every_ms: u64,
    /// When `true`, `execute` blocks until the job state is no longer `Running`.
    pub wait: bool,
    /// Poll interval in milliseconds used while waiting (clamped to at least 1).
    pub wait_poll_ms: u64,
    /// Notification command argv; JSON-encoded and forwarded as `--notify-command`.
    pub notify_command: Option<Vec<String>>,
    /// Notification file path; forwarded as `--notify-file`.
    pub notify_file: Option<String>,
}
62
63impl<'a> Default for RunOpts<'a> {
64 fn default() -> Self {
65 RunOpts {
66 command: vec![],
67 root: None,
68 snapshot_after: 10_000,
69 tail_lines: 50,
70 max_bytes: 65536,
71 timeout_ms: 0,
72 kill_after_ms: 0,
73 cwd: None,
74 env_vars: vec![],
75 env_files: vec![],
76 inherit_env: true,
77 mask: vec![],
78 log: None,
79 progress_every_ms: 0,
80 wait: false,
81 wait_poll_ms: 200,
82 notify_command: None,
83 notify_file: None,
84 }
85 }
86}
87
/// Upper bound, in milliseconds, that `execute` applies to `RunOpts::snapshot_after`.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
90
/// Start `opts.command` as a background job and print a `run` response.
///
/// The function creates the job directory and metadata, re-invokes the current
/// executable with the `_supervise` sub-command (detached stdio), records the
/// initial state, optionally polls for an early snapshot and/or waits for a
/// terminal state, and finally prints the response.
///
/// # Errors
///
/// Fails when the command is empty, when the jobs root, job directory or log
/// files cannot be created, when the current executable cannot be resolved,
/// when the supervisor cannot be spawned, or when the initial state cannot be
/// written. On Windows it also fails if the supervisor reports a `Failed`
/// state during the handshake.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }

    // Measures the whole call; reported as `elapsed_ms`.
    let elapsed_start = std::time::Instant::now();

    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;

    let job_id = Ulid::new().to_string();
    let created_at = now_rfc3339();

    // Only the part before the first '=' of each entry is recorded as a key.
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();

    // Masked copies are what end up in the metadata and in the response;
    // the supervisor still receives the unmasked values below.
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);

    let effective_cwd = resolve_effective_cwd(opts.cwd);

    // Notification settings are recorded only when at least one sink is set.
    let notification = if opts.notify_command.is_some() || opts.notify_file.is_some() {
        Some(crate::schema::NotificationConfig {
            notify_command: opts.notify_command.clone(),
            notify_file: opts.notify_file.clone(),
        })
    } else {
        None
    };

    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
        notification,
    };

    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    info!(job_id = %job_id, "created job directory");

    // An explicit `log` path replaces the job directory's default full log.
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };

    // Make sure the log files exist before the supervisor is started
    // (presumably so early tail reads find them — confirm against JobDir).
    for log_path in [
        job_dir.stdout_path(),
        job_dir.stderr_path(),
        job_dir.full_log_path(),
    ] {
        std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)
            .with_context(|| format!("pre-create log file {}", log_path.display()))?;
    }

    // The supervisor is this same binary, invoked with `_supervise`.
    let exe = std::env::current_exe().context("resolve current exe")?;
    let mut supervisor_cmd = Command::new(&exe);
    supervisor_cmd
        .arg("_supervise")
        .arg("--job-id")
        .arg(&job_id)
        .arg("--root")
        .arg(root.display().to_string())
        .arg("--full-log")
        .arg(&full_log_path);

    // Optional flags are only passed when they differ from "disabled".
    if opts.timeout_ms > 0 {
        supervisor_cmd
            .arg("--timeout")
            .arg(opts.timeout_ms.to_string());
    }
    if opts.kill_after_ms > 0 {
        supervisor_cmd
            .arg("--kill-after")
            .arg(opts.kill_after_ms.to_string());
    }
    if let Some(cwd) = opts.cwd {
        supervisor_cmd.arg("--cwd").arg(cwd);
    }
    for env_file in &opts.env_files {
        supervisor_cmd.arg("--env-file").arg(env_file);
    }
    for env_var in &opts.env_vars {
        supervisor_cmd.arg("--env").arg(env_var);
    }
    if !opts.inherit_env {
        supervisor_cmd.arg("--no-inherit-env");
    }
    if opts.progress_every_ms > 0 {
        supervisor_cmd
            .arg("--progress-every")
            .arg(opts.progress_every_ms.to_string());
    }
    if let Some(ref nc) = opts.notify_command {
        // The argv is passed as a single JSON-encoded argument.
        let json = serde_json::to_string(nc).unwrap_or_default();
        supervisor_cmd.arg("--notify-command").arg(&json);
    }
    if let Some(ref nf) = opts.notify_file {
        supervisor_cmd.arg("--notify-file").arg(nf);
    }

    // Everything after `--` is the user command; the supervisor gets no stdio.
    supervisor_cmd
        .arg("--")
        .args(&opts.command)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;

    let supervisor_pid = supervisor.id();
    debug!(supervisor_pid, "supervisor spawned");

    // NOTE(review): the supervisor is already running when this initial state
    // is written; whether the two writers can race on the state file depends
    // on JobDir — confirm.
    job_dir.init_state(supervisor_pid, &created_at)?;

    #[cfg(windows)]
    {
        // Wait up to 5 s for the supervisor to replace the initial state:
        // either the recorded pid changes or the status becomes `Failed`.
        let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            if let Ok(current_state) = job_dir.read_state() {
                let supervisor_updated = current_state
                    .pid
                    .map(|p| p != supervisor_pid)
                    .unwrap_or(false)
                    || *current_state.status() == JobStatus::Failed;
                if supervisor_updated {
                    if *current_state.status() == JobStatus::Failed {
                        anyhow::bail!(
                            "supervisor failed to assign child process to Job Object \
                             (Windows MUST requirement); see stderr for details"
                        );
                    }
                    debug!("supervisor confirmed Job Object assignment via state.json handshake");
                    break;
                }
            }
            if std::time::Instant::now() >= handshake_deadline {
                // A timeout is not fatal; the response is built from whatever state exists.
                debug!("supervisor handshake timed out; proceeding with initial state");
                break;
            }
        }
    }

    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    // With `wait`, the early snapshot is skipped; the final snapshot is taken
    // once the job leaves `Running` instead.
    let effective_snapshot_after = if opts.wait {
        0
    } else {
        opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
    };

    // Covers both the snapshot polling and the `wait` loop; reported as `waited_ms`.
    let wait_start = std::time::Instant::now();

    let snapshot = if effective_snapshot_after > 0 {
        debug!(ms = effective_snapshot_after, "polling for snapshot");
        let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
        let poll_interval = std::time::Duration::from_millis(15);
        // Poll until the deadline, leaving early once the job is no longer running.
        loop {
            std::thread::sleep(poll_interval);
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                debug!("snapshot poll: job no longer running, exiting early");
                break;
            }
            if std::time::Instant::now() >= deadline {
                debug!("snapshot poll: deadline reached");
                break;
            }
        }
        let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
        Some(snap)
    } else {
        None
    };

    let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
        debug!("--wait: polling for terminal state");
        let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
        // This loop has no deadline of its own: it ends only when a readable
        // state reports something other than `Running`.
        loop {
            std::thread::sleep(poll);
            if let Ok(st) = job_dir.read_state()
                && *st.status() != JobStatus::Running
            {
                let snap = build_snapshot(&job_dir, opts.tail_lines, opts.max_bytes);
                let ec = st.exit_code();
                let fa = st.finished_at.clone();
                let state_str = st.status().as_str().to_string();
                break (state_str, ec, fa, Some(snap));
            }
        }
    } else {
        // Without `wait` the response always reports `Running`.
        (JobStatus::Running.as_str().to_string(), None, None, None)
    };

    let waited_ms = wait_start.elapsed().as_millis() as u64;

    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            env_vars: masked_env_vars,
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );
    response.print();
    Ok(())
}
374
375fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
376 let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
377 let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
378 Snapshot {
379 truncated: stdout.truncated || stderr.truncated,
380 encoding: "utf-8-lossy".to_string(),
381 stdout_observed_bytes: stdout.observed_bytes,
382 stderr_observed_bytes: stderr.observed_bytes,
383 stdout_included_bytes: stdout.included_bytes,
384 stderr_included_bytes: stderr.included_bytes,
385 stdout_tail: stdout.tail,
386 stderr_tail: stderr.tail,
387 }
388}
389
/// Options accepted by [`supervise`], the process that runs and monitors the
/// job's child command.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job whose directory is opened under `root`.
    pub job_id: &'a str,
    /// Jobs root containing the job directory.
    pub root: &'a Path,
    /// Child argv: element 0 is the program, the rest are its arguments.
    /// `supervise` fails if this is empty.
    pub command: &'a [String],
    /// Optional path for the combined log; defaults to the job directory's full log.
    pub full_log: Option<&'a str>,
    /// When greater than zero, the child is signalled once this many
    /// milliseconds have elapsed (signals are only sent on Unix).
    pub timeout_ms: u64,
    /// Milliseconds between SIGTERM and SIGKILL after a timeout; `0` means
    /// SIGKILL follows SIGTERM immediately.
    pub kill_after_ms: u64,
    /// Working directory for the child, if any.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries applied to the child's environment after the env files.
    pub env_vars: Vec<String>,
    /// Env-file paths loaded, in order, into the child's environment.
    pub env_files: Vec<String>,
    /// When `false`, the child's environment is cleared before variables are applied.
    pub inherit_env: bool,
    /// When greater than zero, the state file's `updated_at` is refreshed at
    /// roughly this interval (milliseconds) while the child runs.
    pub progress_every_ms: u64,
    /// Argv of a command that receives the completion event on stdin.
    pub notify_command: Option<Vec<String>>,
    /// Path of a file to which the completion event is appended as one line.
    pub notify_file: Option<String>,
}
421
/// Turn an optional working-directory override into a displayable path string.
///
/// The override (or the process's current directory when there is none) is
/// canonicalized. If canonicalization fails, an absolute path is returned
/// unchanged and a relative one is joined onto the current directory. When the
/// current directory itself cannot be read, `"."` stands in for it.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    /// Current directory of this process, or `"."` if it cannot be determined.
    fn process_cwd() -> PathBuf {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
    }

    let requested = cwd_override.map(PathBuf::from).unwrap_or_else(process_cwd);

    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Canonicalization failed (e.g. the path does not exist): fall back to a
    // best-effort absolute form without touching the filesystem again.
    let fallback = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    fallback.display().to_string()
}
449
450fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
453 if mask_keys.is_empty() {
454 return env_vars.to_vec();
455 }
456 env_vars
457 .iter()
458 .map(|s| {
459 let (key, _val) = parse_env_var(s);
460 if mask_keys.iter().any(|k| k == &key) {
461 format!("{key}=***")
462 } else {
463 s.clone()
464 }
465 })
466 .collect()
467}
468
/// Split a `KEY=VALUE` string at the first `=`.
///
/// Without an `=`, the whole input is the key and the value is empty.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
477
478fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
481 let contents =
482 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
483 let mut vars = Vec::new();
484 for line in contents.lines() {
485 let line = line.trim();
486 if line.is_empty() || line.starts_with('#') {
487 continue;
488 }
489 vars.push(parse_env_var(line));
490 }
491 Ok(vars)
492}
493
494fn stream_to_logs<R>(
508 stream: R,
509 log_path: &std::path::Path,
510 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
511 label: &str,
512) where
513 R: std::io::Read,
514{
515 use std::io::Write;
516 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
517 let mut stream = stream;
518 let mut buf = [0u8; 8192];
519 let mut line_buf: Vec<u8> = Vec::new();
521 loop {
522 match stream.read(&mut buf) {
523 Ok(0) => break, Ok(n) => {
525 let chunk = &buf[..n];
526 let _ = log_file.write_all(chunk);
528 for &b in chunk {
530 if b == b'\n' {
531 let line = String::from_utf8_lossy(&line_buf);
532 if let Ok(mut fl) = full_log.lock() {
533 let ts = now_rfc3339();
534 let _ = writeln!(fl, "{ts} [{label}] {line}");
535 }
536 line_buf.clear();
537 } else {
538 line_buf.push(b);
539 }
540 }
541 }
542 Err(_) => break,
543 }
544 }
545 if !line_buf.is_empty() {
547 let line = String::from_utf8_lossy(&line_buf);
548 if let Ok(mut fl) = full_log.lock() {
549 let ts = now_rfc3339();
550 let _ = writeln!(fl, "{ts} [{label}] {line}");
551 }
552 }
553}
554
555pub fn supervise(opts: SuperviseOpts) -> Result<()> {
565 use std::sync::{Arc, Mutex};
566
567 let job_id = opts.job_id;
568 let root = opts.root;
569 let command = opts.command;
570
571 if command.is_empty() {
572 anyhow::bail!("supervisor: no command");
573 }
574
575 let job_dir = JobDir::open(root, job_id)?;
576
577 let meta = job_dir.read_meta()?;
579 let started_at = meta.created_at.clone();
580
581 let full_log_path = if let Some(p) = opts.full_log {
583 std::path::PathBuf::from(p)
584 } else {
585 job_dir.full_log_path()
586 };
587
588 if let Some(parent) = full_log_path.parent() {
591 std::fs::create_dir_all(parent)
592 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
593 }
594 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
595 let full_log = Arc::new(Mutex::new(full_log_file));
596
597 let mut child_cmd = Command::new(&command[0]);
599 child_cmd.args(&command[1..]);
600
601 if opts.inherit_env {
602 } else {
604 child_cmd.env_clear();
605 }
606
607 for env_file in &opts.env_files {
609 let vars = load_env_file(env_file)?;
610 for (k, v) in vars {
611 child_cmd.env(&k, &v);
612 }
613 }
614
615 for env_var in &opts.env_vars {
617 let (k, v) = parse_env_var(env_var);
618 child_cmd.env(&k, &v);
619 }
620
621 if let Some(cwd) = opts.cwd {
623 child_cmd.current_dir(cwd);
624 }
625
626 let mut child = child_cmd
628 .stdin(std::process::Stdio::null())
629 .stdout(std::process::Stdio::piped())
630 .stderr(std::process::Stdio::piped())
631 .spawn()
632 .context("supervisor: spawn child")?;
633
634 let pid = child.id();
635 info!(job_id, pid, "child process started");
636
637 #[cfg(windows)]
644 let windows_job_name = {
645 match assign_to_job_object(job_id, pid) {
646 Ok(name) => Some(name),
647 Err(e) => {
648 let kill_err = child.kill();
652 let _ = child.wait(); let failed_state = JobState {
655 job: JobStateJob {
656 id: job_id.to_string(),
657 status: JobStatus::Failed,
658 started_at: started_at.clone(),
659 },
660 result: JobStateResult {
661 exit_code: None,
662 signal: None,
663 duration_ms: None,
664 },
665 pid: Some(pid),
666 finished_at: Some(now_rfc3339()),
667 updated_at: now_rfc3339(),
668 windows_job_name: None,
669 };
670 let _ = job_dir.write_state(&failed_state);
673
674 if opts.notify_command.is_some() || opts.notify_file.is_some() {
678 let finished_at_ts =
679 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
680 let stdout_log = job_dir.stdout_path().display().to_string();
681 let stderr_log = job_dir.stderr_path().display().to_string();
682 let fail_event = crate::schema::CompletionEvent {
683 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
684 event_type: "job.finished".to_string(),
685 job_id: job_id.to_string(),
686 state: JobStatus::Failed.as_str().to_string(),
687 command: meta.command.clone(),
688 cwd: meta.cwd.clone(),
689 started_at: started_at.clone(),
690 finished_at: finished_at_ts,
691 duration_ms: None,
692 exit_code: None,
693 signal: None,
694 stdout_log_path: stdout_log,
695 stderr_log_path: stderr_log,
696 };
697 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
698 let fail_event_path = job_dir.completion_event_path().display().to_string();
699 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
700 Vec::new();
701 if let Err(we) = job_dir.write_completion_event_atomic(
702 &crate::schema::CompletionEventRecord {
703 event: fail_event.clone(),
704 delivery_results: vec![],
705 },
706 ) {
707 warn!(
708 job_id,
709 error = %we,
710 "failed to write initial completion_event.json for failed job"
711 );
712 }
713 if let Some(ref argv) = opts.notify_command {
714 fail_delivery_results.push(dispatch_command_sink(
715 argv,
716 &fail_event_json,
717 job_id,
718 &fail_event_path,
719 ));
720 }
721 if let Some(ref file_path) = opts.notify_file {
722 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
723 }
724 if let Err(we) = job_dir.write_completion_event_atomic(
725 &crate::schema::CompletionEventRecord {
726 event: fail_event,
727 delivery_results: fail_delivery_results,
728 },
729 ) {
730 warn!(
731 job_id,
732 error = %we,
733 "failed to update completion_event.json with delivery results for failed job"
734 );
735 }
736 }
737
738 if let Err(ke) = kill_err {
739 return Err(anyhow::anyhow!(
740 "supervisor: failed to assign pid {pid} to Job Object \
741 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
742 ));
743 }
744 return Err(anyhow::anyhow!(
745 "supervisor: failed to assign pid {pid} to Job Object \
746 (Windows MUST requirement); child process was killed; \
747 consider running outside a nested Job Object environment: {e}"
748 ));
749 }
750 }
751 };
752 #[cfg(not(windows))]
753 let windows_job_name: Option<String> = None;
754
755 let state = JobState {
760 job: JobStateJob {
761 id: job_id.to_string(),
762 status: JobStatus::Running,
763 started_at: started_at.clone(),
764 },
765 result: JobStateResult {
766 exit_code: None,
767 signal: None,
768 duration_ms: None,
769 },
770 pid: Some(pid),
771 finished_at: None,
772 updated_at: now_rfc3339(),
773 windows_job_name,
774 };
775 job_dir.write_state(&state)?;
776
777 let child_start_time = std::time::Instant::now();
778
779 let child_stdout = child.stdout.take().expect("child stdout piped");
781 let child_stderr = child.stderr.take().expect("child stderr piped");
782
783 let stdout_log_path = job_dir.stdout_path();
785 let full_log_stdout = Arc::clone(&full_log);
786 let t_stdout = std::thread::spawn(move || {
787 stream_to_logs(child_stdout, &stdout_log_path, full_log_stdout, "STDOUT");
788 });
789
790 let stderr_log_path = job_dir.stderr_path();
792 let full_log_stderr = Arc::clone(&full_log);
793 let t_stderr = std::thread::spawn(move || {
794 stream_to_logs(child_stderr, &stderr_log_path, full_log_stderr, "STDERR");
795 });
796
797 let timeout_ms = opts.timeout_ms;
800 let kill_after_ms = opts.kill_after_ms;
801 let progress_every_ms = opts.progress_every_ms;
802 let state_path = job_dir.state_path();
803 let job_id_str = job_id.to_string();
804
805 use std::sync::atomic::{AtomicBool, Ordering};
807 let child_done = Arc::new(AtomicBool::new(false));
808
809 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
810 let state_path_clone = state_path.clone();
811 let child_done_clone = Arc::clone(&child_done);
812 Some(std::thread::spawn(move || {
813 let start = std::time::Instant::now();
814 let timeout_dur = if timeout_ms > 0 {
815 Some(std::time::Duration::from_millis(timeout_ms))
816 } else {
817 None
818 };
819 let progress_dur = if progress_every_ms > 0 {
820 Some(std::time::Duration::from_millis(progress_every_ms))
821 } else {
822 None
823 };
824
825 let poll_interval = std::time::Duration::from_millis(100);
826
827 loop {
828 std::thread::sleep(poll_interval);
829
830 if child_done_clone.load(Ordering::Relaxed) {
832 break;
833 }
834
835 let elapsed = start.elapsed();
836
837 if let Some(td) = timeout_dur
839 && elapsed >= td
840 {
841 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM");
842 #[cfg(unix)]
844 {
845 unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
846 }
847 if kill_after_ms > 0 {
849 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
850 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL");
851 #[cfg(unix)]
852 {
853 unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
854 }
855 } else {
856 #[cfg(unix)]
858 {
859 unsafe { libc::kill(pid as libc::pid_t, libc::SIGKILL) };
860 }
861 }
862 break;
863 }
864
865 if let Some(pd) = progress_dur {
867 let elapsed_ms = elapsed.as_millis() as u64;
868 let pd_ms = pd.as_millis() as u64;
869 let poll_ms = poll_interval.as_millis() as u64;
870 if elapsed_ms % pd_ms < poll_ms {
871 if let Ok(raw) = std::fs::read(&state_path_clone)
873 && let Ok(mut st) =
874 serde_json::from_slice::<crate::schema::JobState>(&raw)
875 {
876 st.updated_at = now_rfc3339();
877 if let Ok(s) = serde_json::to_string_pretty(&st) {
878 let _ = std::fs::write(&state_path_clone, s);
879 }
880 }
881 }
882 }
883 }
884 }))
885 } else {
886 None
887 };
888
889 let exit_status = child.wait().context("wait for child")?;
891
892 child_done.store(true, Ordering::Relaxed);
894
895 let _ = t_stdout.join();
897 let _ = t_stderr.join();
898
899 if let Some(w) = watcher {
901 let _ = w.join();
902 }
903
904 let duration_ms = child_start_time.elapsed().as_millis() as u64;
905 let exit_code = exit_status.code();
906 let finished_at = now_rfc3339();
907
908 #[cfg(unix)]
910 let (terminal_status, signal_name) = {
911 use std::os::unix::process::ExitStatusExt;
912 if let Some(sig) = exit_status.signal() {
913 (JobStatus::Killed, Some(sig.to_string()))
914 } else {
915 (JobStatus::Exited, None)
916 }
917 };
918 #[cfg(not(unix))]
919 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
920
921 let state = JobState {
922 job: JobStateJob {
923 id: job_id.to_string(),
924 status: terminal_status.clone(),
925 started_at: started_at.clone(),
926 },
927 result: JobStateResult {
928 exit_code,
929 signal: signal_name.clone(),
930 duration_ms: Some(duration_ms),
931 },
932 pid: Some(pid),
933 finished_at: Some(finished_at.clone()),
934 updated_at: now_rfc3339(),
935 windows_job_name: None, };
937 job_dir.write_state(&state)?;
938 info!(job_id, ?exit_code, "child process finished");
939
940 let has_notification = opts.notify_command.is_some() || opts.notify_file.is_some();
943 if has_notification {
944 let stdout_log = job_dir.stdout_path().display().to_string();
945 let stderr_log = job_dir.stderr_path().display().to_string();
946 let event = crate::schema::CompletionEvent {
947 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
948 event_type: "job.finished".to_string(),
949 job_id: job_id.to_string(),
950 state: terminal_status.as_str().to_string(),
951 command: meta.command.clone(),
952 cwd: meta.cwd.clone(),
953 started_at,
954 finished_at,
955 duration_ms: Some(duration_ms),
956 exit_code,
957 signal: signal_name,
958 stdout_log_path: stdout_log,
959 stderr_log_path: stderr_log,
960 };
961
962 let event_json = serde_json::to_string(&event).unwrap_or_default();
963 let event_path = job_dir.completion_event_path().display().to_string();
964 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
965
966 if let Err(e) =
968 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
969 event: event.clone(),
970 delivery_results: vec![],
971 })
972 {
973 warn!(job_id, error = %e, "failed to write initial completion_event.json");
974 }
975
976 if let Some(ref argv) = opts.notify_command {
977 delivery_results.push(dispatch_command_sink(
978 argv,
979 &event_json,
980 job_id,
981 &event_path,
982 ));
983 }
984 if let Some(ref file_path) = opts.notify_file {
985 delivery_results.push(dispatch_file_sink(file_path, &event_json));
986 }
987
988 if let Err(e) =
990 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
991 event,
992 delivery_results,
993 })
994 {
995 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
996 }
997 }
998
999 Ok(())
1000}
1001
1002fn dispatch_command_sink(
1006 argv: &[String],
1007 event_json: &str,
1008 job_id: &str,
1009 event_path: &str,
1010) -> crate::schema::SinkDeliveryResult {
1011 use std::io::Write;
1012 let attempted_at = now_rfc3339();
1013 let target = serde_json::to_string(argv).unwrap_or_default();
1014
1015 if argv.is_empty() {
1016 return crate::schema::SinkDeliveryResult {
1017 sink_type: "command".to_string(),
1018 target,
1019 success: false,
1020 error: Some("empty argv".to_string()),
1021 attempted_at,
1022 };
1023 }
1024
1025 let mut cmd = Command::new(&argv[0]);
1026 cmd.args(&argv[1..]);
1027 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1028 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1029 cmd.env("AGENT_EXEC_EVENT_TYPE", "job.finished");
1030 cmd.stdin(std::process::Stdio::piped());
1031 cmd.stdout(std::process::Stdio::null());
1032 cmd.stderr(std::process::Stdio::null());
1033
1034 match cmd.spawn() {
1035 Ok(mut child) => {
1036 if let Some(mut stdin) = child.stdin.take() {
1037 let _ = stdin.write_all(event_json.as_bytes());
1038 }
1039 match child.wait() {
1040 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1041 sink_type: "command".to_string(),
1042 target,
1043 success: true,
1044 error: None,
1045 attempted_at,
1046 },
1047 Ok(status) => crate::schema::SinkDeliveryResult {
1048 sink_type: "command".to_string(),
1049 target,
1050 success: false,
1051 error: Some(format!("exited with status {status}")),
1052 attempted_at,
1053 },
1054 Err(e) => crate::schema::SinkDeliveryResult {
1055 sink_type: "command".to_string(),
1056 target,
1057 success: false,
1058 error: Some(format!("wait error: {e}")),
1059 attempted_at,
1060 },
1061 }
1062 }
1063 Err(e) => crate::schema::SinkDeliveryResult {
1064 sink_type: "command".to_string(),
1065 target,
1066 success: false,
1067 error: Some(format!("spawn error: {e}")),
1068 attempted_at,
1069 },
1070 }
1071}
1072
1073fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1076 use std::io::Write;
1077 let attempted_at = now_rfc3339();
1078 let path = std::path::Path::new(file_path);
1079
1080 if let Some(parent) = path.parent()
1081 && let Err(e) = std::fs::create_dir_all(parent)
1082 {
1083 return crate::schema::SinkDeliveryResult {
1084 sink_type: "file".to_string(),
1085 target: file_path.to_string(),
1086 success: false,
1087 error: Some(format!("create parent dir: {e}")),
1088 attempted_at,
1089 };
1090 }
1091
1092 match std::fs::OpenOptions::new()
1093 .create(true)
1094 .append(true)
1095 .open(path)
1096 {
1097 Ok(mut f) => match writeln!(f, "{event_json}") {
1098 Ok(_) => crate::schema::SinkDeliveryResult {
1099 sink_type: "file".to_string(),
1100 target: file_path.to_string(),
1101 success: true,
1102 error: None,
1103 attempted_at,
1104 },
1105 Err(e) => crate::schema::SinkDeliveryResult {
1106 sink_type: "file".to_string(),
1107 target: file_path.to_string(),
1108 success: false,
1109 error: Some(format!("write error: {e}")),
1110 attempted_at,
1111 },
1112 },
1113 Err(e) => crate::schema::SinkDeliveryResult {
1114 sink_type: "file".to_string(),
1115 target: file_path.to_string(),
1116 success: false,
1117 error: Some(format!("open error: {e}")),
1118 attempted_at,
1119 },
1120 }
1121}
1122
/// Public wrapper around the module-private `now_rfc3339`: returns the current
/// UTC time as `YYYY-MM-DDTHH:MM:SSZ`.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1127
1128fn now_rfc3339() -> String {
1129 let d = std::time::SystemTime::now()
1131 .duration_since(std::time::UNIX_EPOCH)
1132 .unwrap_or_default();
1133 format_rfc3339(d.as_secs())
1134}
1135
1136fn format_rfc3339(secs: u64) -> String {
1137 let mut s = secs;
1139 let seconds = s % 60;
1140 s /= 60;
1141 let minutes = s % 60;
1142 s /= 60;
1143 let hours = s % 24;
1144 s /= 24;
1145
1146 let mut days = s;
1148 let mut year = 1970u64;
1149 loop {
1150 let days_in_year = if is_leap(year) { 366 } else { 365 };
1151 if days < days_in_year {
1152 break;
1153 }
1154 days -= days_in_year;
1155 year += 1;
1156 }
1157
1158 let leap = is_leap(year);
1159 let month_days: [u64; 12] = [
1160 31,
1161 if leap { 29 } else { 28 },
1162 31,
1163 30,
1164 31,
1165 30,
1166 31,
1167 31,
1168 30,
1169 31,
1170 30,
1171 31,
1172 ];
1173 let mut month = 0usize;
1174 for (i, &d) in month_days.iter().enumerate() {
1175 if days < d {
1176 month = i;
1177 break;
1178 }
1179 days -= d;
1180 }
1181 let day = days + 1;
1182
1183 format!(
1184 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1185 year,
1186 month + 1,
1187 day,
1188 hours,
1189 minutes,
1190 seconds
1191 )
1192}
1193
/// Gregorian leap-year rule: every fourth year, except century years that are
/// not divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1197
/// Create a named Windows Job Object for the job and assign the child process
/// `pid` to it.
///
/// The Job Object is named `AgentExec-{job_id}`; that name is returned on
/// success.
///
/// # Errors
///
/// Fails if the process cannot be opened, the Job Object cannot be created, or
/// the assignment is rejected. Handles opened up to the failing step are
/// closed before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY (NOTE(review): confirm against the `windows` crate contracts):
    // the handles passed to the calls below are the ones returned by
    // `OpenProcess` / `CreateJobObjectW` in this block, and each is closed at
    // most once.
    unsafe {
        // The process is opened with terminate and set-quota access only.
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            // Release both handles before reporting the failure.
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        let _ = CloseHandle(proc_handle);
        // The job handle is deliberately never closed here.
        // NOTE(review): presumably this keeps the named Job Object alive for
        // the lifetime of the supervisor; whether `mem::forget` has any effect
        // depends on the handle type having a destructor — confirm.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1263
#[cfg(test)]
mod tests {
    use super::*;

    /// Second zero renders as midnight on the first day of 1970.
    #[test]
    fn rfc3339_epoch() {
        let rendered = format_rfc3339(0);
        assert_eq!(rendered, "1970-01-01T00:00:00Z");
    }

    /// A fixed timestamp in a leap year: the first second of 2024 (UTC).
    #[test]
    fn rfc3339_known_date() {
        let start_of_2024: u64 = 1704067200;
        let rendered = format_rfc3339(start_of_2024);
        assert_eq!(rendered, "2024-01-01T00:00:00Z");
    }
}