1use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18 Snapshot,
19};
20use crate::tag::dedup_tags;
21
/// Options accepted by [`execute`] when launching a new job.
///
/// Borrowed fields (`root`, `cwd`, `log`) live for `'a`; everything else is owned.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command line to run; `execute` rejects an empty vector.
    pub command: Vec<String>,
    /// Optional jobs-root override, resolved through `resolve_root`.
    pub root: Option<&'a str>,
    /// Milliseconds to poll before taking the initial snapshot. Capped at
    /// `MAX_SNAPSHOT_AFTER_MS` and treated as 0 when `wait` is set.
    pub snapshot_after: u64,
    /// Line limit handed to the tail reader when building snapshots.
    pub tail_lines: u64,
    /// Byte limit handed to the tail reader when building snapshots.
    pub max_bytes: u64,
    /// Forwarded to the supervisor as `--timeout` when greater than zero.
    pub timeout_ms: u64,
    /// Forwarded to the supervisor as `--kill-after` when greater than zero.
    pub kill_after_ms: u64,
    /// Working-directory override; forwarded as `--cwd` and also resolved to an
    /// absolute path for the job metadata.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries forwarded to the supervisor as `--env`.
    pub env_vars: Vec<String>,
    /// Env-file paths forwarded to the supervisor as `--env-file`.
    pub env_files: Vec<String>,
    /// When `false`, the supervisor is started with `--no-inherit-env`.
    pub inherit_env: bool,
    /// Keys whose values are replaced by `***` in the metadata and response.
    pub mask: Vec<String>,
    /// Job tags; passed through `dedup_tags` before being stored.
    pub tags: Vec<String>,
    /// Optional path overriding the job directory's default full-log location.
    pub log: Option<&'a str>,
    /// Forwarded to the supervisor as `--progress-every` when greater than zero.
    pub progress_every_ms: u64,
    /// When `true`, block until the job reaches a terminal state.
    pub wait: bool,
    /// Poll interval in milliseconds used while waiting (at least 1 ms is used).
    pub wait_poll_ms: u64,
    /// Completion-notification command; stored in metadata and forwarded as
    /// `--notify-command`.
    pub notify_command: Option<String>,
    /// Completion-notification file; stored in metadata and forwarded as
    /// `--notify-file`.
    pub notify_file: Option<String>,
    /// Output-match pattern, passed to `crate::notify::build_output_match_config`.
    pub output_pattern: Option<String>,
    /// Output-match type, passed to `crate::notify::build_output_match_config`.
    pub output_match_type: Option<String>,
    /// Output-match stream selector, passed to `crate::notify::build_output_match_config`.
    pub output_stream: Option<String>,
    /// Output-match command sink, passed to `crate::notify::build_output_match_config`.
    pub output_command: Option<String>,
    /// Output-match file sink, passed to `crate::notify::build_output_match_config`.
    pub output_file: Option<String>,
    /// Shell wrapper argv; serialized to JSON and handed to the supervisor via
    /// `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
}
86
87impl<'a> Default for RunOpts<'a> {
88 fn default() -> Self {
89 RunOpts {
90 command: vec![],
91 root: None,
92 snapshot_after: 10_000,
93 tail_lines: 50,
94 max_bytes: 65536,
95 timeout_ms: 0,
96 kill_after_ms: 0,
97 cwd: None,
98 env_vars: vec![],
99 env_files: vec![],
100 inherit_env: true,
101 mask: vec![],
102 tags: vec![],
103 log: None,
104 progress_every_ms: 0,
105 wait: false,
106 wait_poll_ms: 200,
107 notify_command: None,
108 notify_file: None,
109 output_pattern: None,
110 output_match_type: None,
111 output_stream: None,
112 output_command: None,
113 output_file: None,
114 shell_wrapper: crate::config::default_shell_wrapper(),
115 }
116 }
117}
118
/// Upper bound, in milliseconds, applied to `snapshot_after` in
/// `run_snapshot_wait` before polling for the initial snapshot.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
121
/// Owned parameters for [`spawn_supervisor_process`].
///
/// Each field maps onto one command-line argument of the `_supervise`
/// sub-command. Derives `Debug` so it can be logged like the sibling option
/// structs `RunOpts` and `SuperviseOpts`.
#[derive(Debug)]
pub struct SpawnSupervisorParams {
    /// Job identifier, passed as `--job-id`.
    pub job_id: String,
    /// Jobs root directory, passed as `--supervise-root`.
    pub root: std::path::PathBuf,
    /// Path of the combined log, passed as `--full-log`.
    pub full_log_path: String,
    /// Passed as `--timeout` when greater than zero.
    pub timeout_ms: u64,
    /// Passed as `--kill-after` when greater than zero.
    pub kill_after_ms: u64,
    /// Optional working directory, passed as `--cwd`.
    pub cwd: Option<String>,
    /// `KEY=VALUE` entries, each passed as `--env`.
    pub env_vars: Vec<String>,
    /// Env-file paths, each passed as `--env-file`.
    pub env_files: Vec<String>,
    /// When `false`, `--no-inherit-env` is added.
    pub inherit_env: bool,
    /// Passed as `--progress-every` when greater than zero.
    pub progress_every_ms: u64,
    /// Optional completion command, passed as `--notify-command`.
    pub notify_command: Option<String>,
    /// Optional completion file, passed as `--notify-file`.
    pub notify_file: Option<String>,
    /// Shell wrapper argv, JSON-encoded and passed as `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments placed after the `--` separator.
    pub command: Vec<String>,
}
142
143pub fn spawn_supervisor_process(
148 job_dir: &JobDir,
149 params: SpawnSupervisorParams,
150) -> Result<(u32, String)> {
151 let started_at = now_rfc3339();
152
153 let exe = std::env::current_exe().context("resolve current exe")?;
154 let mut supervisor_cmd = Command::new(&exe);
155 supervisor_cmd
156 .arg("_supervise")
157 .arg("--job-id")
158 .arg(¶ms.job_id)
159 .arg("--supervise-root")
160 .arg(params.root.display().to_string())
161 .arg("--full-log")
162 .arg(¶ms.full_log_path);
163
164 if params.timeout_ms > 0 {
165 supervisor_cmd
166 .arg("--timeout")
167 .arg(params.timeout_ms.to_string());
168 }
169 if params.kill_after_ms > 0 {
170 supervisor_cmd
171 .arg("--kill-after")
172 .arg(params.kill_after_ms.to_string());
173 }
174 if let Some(ref cwd) = params.cwd {
175 supervisor_cmd.arg("--cwd").arg(cwd);
176 }
177 for env_file in ¶ms.env_files {
178 supervisor_cmd.arg("--env-file").arg(env_file);
179 }
180 for env_var in ¶ms.env_vars {
181 supervisor_cmd.arg("--env").arg(env_var);
182 }
183 if !params.inherit_env {
184 supervisor_cmd.arg("--no-inherit-env");
185 }
186 if params.progress_every_ms > 0 {
187 supervisor_cmd
188 .arg("--progress-every")
189 .arg(params.progress_every_ms.to_string());
190 }
191 if let Some(ref nc) = params.notify_command {
192 supervisor_cmd.arg("--notify-command").arg(nc);
193 }
194 if let Some(ref nf) = params.notify_file {
195 supervisor_cmd.arg("--notify-file").arg(nf);
196 }
197 let wrapper_json =
198 serde_json::to_string(¶ms.shell_wrapper).context("serialize shell wrapper")?;
199 supervisor_cmd
200 .arg("--shell-wrapper-resolved")
201 .arg(&wrapper_json);
202
203 supervisor_cmd
204 .arg("--")
205 .args(¶ms.command)
206 .stdin(std::process::Stdio::null())
207 .stdout(std::process::Stdio::null())
208 .stderr(std::process::Stdio::null());
209
210 let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
211 let supervisor_pid = supervisor.id();
212 debug!(supervisor_pid, "supervisor spawned");
213
214 job_dir.init_state(supervisor_pid, &started_at)?;
216
217 #[cfg(windows)]
219 {
220 let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
221 loop {
222 std::thread::sleep(std::time::Duration::from_millis(10));
223 if let Ok(current_state) = job_dir.read_state() {
224 let supervisor_updated = current_state
225 .pid
226 .map(|p| p != supervisor_pid)
227 .unwrap_or(false)
228 || *current_state.status() == crate::schema::JobStatus::Failed;
229 if supervisor_updated {
230 if *current_state.status() == crate::schema::JobStatus::Failed {
231 anyhow::bail!(
232 "supervisor failed to assign child process to Job Object \
233 (Windows MUST requirement); see stderr for details"
234 );
235 }
236 debug!("supervisor confirmed Job Object assignment via state.json handshake");
237 break;
238 }
239 }
240 if std::time::Instant::now() >= handshake_deadline {
241 debug!("supervisor handshake timed out; proceeding with initial state");
242 break;
243 }
244 }
245 }
246
247 Ok((supervisor_pid, started_at))
248}
249
250pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
253 for log_path in [
254 job_dir.stdout_path(),
255 job_dir.stderr_path(),
256 job_dir.full_log_path(),
257 ] {
258 std::fs::OpenOptions::new()
259 .create(true)
260 .append(true)
261 .open(&log_path)
262 .with_context(|| format!("pre-create log file {}", log_path.display()))?;
263 }
264 Ok(())
265}
266
/// Tuning knobs for [`run_snapshot_wait`].
///
/// Derives `Debug` so it can be logged like the sibling option structs
/// `RunOpts` and `SuperviseOpts`.
#[derive(Debug)]
pub struct SnapshotWaitOpts {
    /// Milliseconds to poll before the initial snapshot (capped by
    /// `MAX_SNAPSHOT_AFTER_MS`, ignored when `wait` is set).
    pub snapshot_after: u64,
    /// Line limit for each snapshot tail.
    pub tail_lines: u64,
    /// Byte limit for each snapshot tail.
    pub max_bytes: u64,
    /// When `true`, poll until the job reaches a terminal state.
    pub wait: bool,
    /// Poll interval in milliseconds while waiting (at least 1 ms is used).
    pub wait_poll_ms: u64,
}
275
276pub fn run_snapshot_wait(
279 job_dir: &JobDir,
280 opts: &SnapshotWaitOpts,
281) -> (
282 String,
283 Option<i32>,
284 Option<String>,
285 Option<Snapshot>,
286 Option<Snapshot>,
287 u64,
288) {
289 use crate::schema::JobStatus;
290
291 let effective_snapshot_after = if opts.wait {
292 0
293 } else {
294 opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
295 };
296
297 let wait_start = std::time::Instant::now();
298
299 let snapshot = if effective_snapshot_after > 0 {
300 debug!(ms = effective_snapshot_after, "polling for snapshot");
301 let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
302 let poll_interval = std::time::Duration::from_millis(15);
303 loop {
304 std::thread::sleep(poll_interval);
305 if let Ok(st) = job_dir.read_state()
306 && !st.status().is_non_terminal()
307 {
308 debug!("snapshot poll: job no longer running/created, exiting early");
309 break;
310 }
311 if std::time::Instant::now() >= deadline {
312 debug!("snapshot poll: deadline reached");
313 break;
314 }
315 }
316 Some(build_snapshot(job_dir, opts.tail_lines, opts.max_bytes))
317 } else {
318 None
319 };
320
321 let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
322 debug!("--wait: polling for terminal state");
323 let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
324 loop {
325 std::thread::sleep(poll);
326 if let Ok(st) = job_dir.read_state()
327 && !st.status().is_non_terminal()
328 {
329 let snap = build_snapshot(job_dir, opts.tail_lines, opts.max_bytes);
330 let ec = st.exit_code();
331 let fa = st.finished_at.clone();
332 let state_str = st.status().as_str().to_string();
333 break (state_str, ec, fa, Some(snap));
334 }
335 }
336 } else {
337 (JobStatus::Running.as_str().to_string(), None, None, None)
338 };
339
340 let waited_ms = wait_start.elapsed().as_millis() as u64;
341 (
342 final_state,
343 exit_code_opt,
344 finished_at_opt,
345 snapshot,
346 final_snapshot_opt,
347 waited_ms,
348 )
349}
350
351pub fn execute(opts: RunOpts) -> Result<()> {
353 if opts.command.is_empty() {
354 anyhow::bail!("no command specified for run");
355 }
356
357 let elapsed_start = std::time::Instant::now();
358
359 let root = resolve_root(opts.root);
360 std::fs::create_dir_all(&root)
361 .with_context(|| format!("create jobs root {}", root.display()))?;
362
363 let job_id = Ulid::new().to_string();
364 let created_at = now_rfc3339();
365
366 let env_keys: Vec<String> = opts
368 .env_vars
369 .iter()
370 .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
371 .collect();
372
373 let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
375
376 let effective_cwd = resolve_effective_cwd(opts.cwd);
380
381 let on_output_match = crate::notify::build_output_match_config(
383 opts.output_pattern,
384 opts.output_match_type,
385 opts.output_stream,
386 opts.output_command,
387 opts.output_file,
388 None,
389 );
390
391 let notification =
392 if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
393 {
394 Some(crate::schema::NotificationConfig {
395 notify_command: opts.notify_command.clone(),
396 notify_file: opts.notify_file.clone(),
397 on_output_match,
398 })
399 } else {
400 None
401 };
402
403 let tags = dedup_tags(opts.tags)?;
405
406 let meta = JobMeta {
407 job: JobMetaJob { id: job_id.clone() },
408 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
409 command: opts.command.clone(),
410 created_at: created_at.clone(),
411 root: root.display().to_string(),
412 env_keys,
413 env_vars: masked_env_vars.clone(),
414 env_vars_runtime: vec![],
417 mask: opts.mask.clone(),
418 cwd: Some(effective_cwd),
419 notification,
420 inherit_env: opts.inherit_env,
422 env_files: opts.env_files.clone(),
423 timeout_ms: opts.timeout_ms,
424 kill_after_ms: opts.kill_after_ms,
425 progress_every_ms: opts.progress_every_ms,
426 shell_wrapper: Some(opts.shell_wrapper.clone()),
427 tags: tags.clone(),
428 };
429
430 let job_dir = JobDir::create(&root, &job_id, &meta)?;
431 info!(job_id = %job_id, "created job directory");
432
433 let full_log_path = if let Some(log) = opts.log {
435 log.to_string()
436 } else {
437 job_dir.full_log_path().display().to_string()
438 };
439
440 pre_create_log_files(&job_dir)?;
442
443 let (_supervisor_pid, _started_at) = spawn_supervisor_process(
447 &job_dir,
448 SpawnSupervisorParams {
449 job_id: job_id.clone(),
450 root: root.clone(),
451 full_log_path: full_log_path.clone(),
452 timeout_ms: opts.timeout_ms,
453 kill_after_ms: opts.kill_after_ms,
454 cwd: opts.cwd.map(|s| s.to_string()),
455 env_vars: opts.env_vars.clone(),
456 env_files: opts.env_files.clone(),
457 inherit_env: opts.inherit_env,
458 progress_every_ms: opts.progress_every_ms,
459 notify_command: opts.notify_command.clone(),
460 notify_file: opts.notify_file.clone(),
461 shell_wrapper: opts.shell_wrapper.clone(),
462 command: opts.command.clone(),
463 },
464 )?;
465
466 let stdout_log_path = job_dir.stdout_path().display().to_string();
468 let stderr_log_path = job_dir.stderr_path().display().to_string();
469
470 let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
471 run_snapshot_wait(
472 &job_dir,
473 &SnapshotWaitOpts {
474 snapshot_after: opts.snapshot_after,
475 tail_lines: opts.tail_lines,
476 max_bytes: opts.max_bytes,
477 wait: opts.wait,
478 wait_poll_ms: opts.wait_poll_ms,
479 },
480 );
481
482 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
483
484 let response = Response::new(
485 "run",
486 RunData {
487 job_id,
488 state: final_state,
489 tags,
490 env_vars: masked_env_vars,
493 snapshot,
494 stdout_log_path,
495 stderr_log_path,
496 waited_ms,
497 elapsed_ms,
498 exit_code: exit_code_opt,
499 finished_at: finished_at_opt,
500 final_snapshot: final_snapshot_opt,
501 },
502 );
503 response.print();
504 Ok(())
505}
506
507fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
508 let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
509 let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
510 Snapshot {
511 truncated: stdout.truncated || stderr.truncated,
512 encoding: "utf-8-lossy".to_string(),
513 stdout_observed_bytes: stdout.observed_bytes,
514 stderr_observed_bytes: stderr.observed_bytes,
515 stdout_included_bytes: stdout.included_bytes,
516 stderr_included_bytes: stderr.included_bytes,
517 stdout_tail: stdout.tail,
518 stderr_tail: stderr.tail,
519 }
520}
521
/// Options accepted by [`supervise`], the body of the `_supervise` sub-command.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job to supervise; used to open its directory under `root`.
    pub job_id: &'a str,
    /// Jobs root directory containing the job directory.
    pub root: &'a Path,
    /// Command and arguments to run; `supervise` rejects an empty slice.
    pub command: &'a [String],
    /// Optional full-log path; when `None` the job directory's default is used.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds after which the child's process group is
    /// signalled; 0 disables the timeout.
    pub timeout_ms: u64,
    /// Delay in milliseconds between SIGTERM and SIGKILL after a timeout; with 0
    /// the SIGKILL follows immediately.
    pub kill_after_ms: u64,
    /// Optional working directory for the child process.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries applied to the child after the env files.
    pub env_vars: Vec<String>,
    /// Env-file paths loaded in order and applied to the child.
    pub env_files: Vec<String>,
    /// When `false`, the child's environment is cleared before variables are applied.
    pub inherit_env: bool,
    /// Interval in milliseconds for refreshing `updated_at` in the state file;
    /// 0 disables it.
    pub progress_every_ms: u64,
    /// Completion-notification command. In `supervise` this is consulted only on
    /// the Windows Job Object failure path; the normal completion path re-reads
    /// the notification settings from the job metadata.
    pub notify_command: Option<String>,
    /// Completion-notification file; same usage note as `notify_command`.
    pub notify_file: Option<String>,
    /// Shell wrapper argv; element 0 is the program, the rest are its leading
    /// arguments. Must not be empty.
    pub shell_wrapper: Vec<String>,
}
556
/// Resolves the working directory a job will effectively use, as a string.
///
/// Starts from `cwd_override`, or from the process's current directory (falling
/// back to `"."` if that cannot be read). The canonical form is returned when
/// canonicalization succeeds. Otherwise an absolute path is returned as-is and
/// a relative path is joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    use std::path::PathBuf;

    /// Current directory of this process, or `"."` when it is unavailable.
    fn process_cwd() -> PathBuf {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
    }

    let base = cwd_override.map_or_else(process_cwd, PathBuf::from);

    if let Ok(canonical) = base.canonicalize() {
        return canonical.display().to_string();
    }

    // Canonicalization failed (e.g. the path does not exist): still make it absolute.
    let fallback = if base.is_absolute() {
        base
    } else {
        process_cwd().join(base)
    };
    fallback.display().to_string()
}
584
585pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
588 if mask_keys.is_empty() {
589 return env_vars.to_vec();
590 }
591 env_vars
592 .iter()
593 .map(|s| {
594 let (key, _val) = parse_env_var(s);
595 if mask_keys.iter().any(|k| k == &key) {
596 format!("{key}=***")
597 } else {
598 s.clone()
599 }
600 })
601 .collect()
602}
603
/// Splits a `KEY=VALUE` entry at the first `=` into owned `(key, value)`.
///
/// An entry without `=` yields the whole string as the key and an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    let (key, value) = s.split_once('=').unwrap_or((s, ""));
    (key.to_owned(), value.to_owned())
}
612
613fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
616 let contents =
617 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
618 let mut vars = Vec::new();
619 for line in contents.lines() {
620 let line = line.trim();
621 if line.is_empty() || line.starts_with('#') {
622 continue;
623 }
624 vars.push(parse_env_var(line));
625 }
626 Ok(vars)
627}
628
629struct OutputMatchChecker {
636 job_dir_path: std::path::PathBuf,
637 shell_wrapper: Vec<String>,
638 inner: std::sync::Mutex<OutputMatchInner>,
639}
640
641struct OutputMatchInner {
642 config: Option<crate::schema::NotificationConfig>,
643}
644
645impl OutputMatchChecker {
646 fn new(
647 job_dir_path: std::path::PathBuf,
648 shell_wrapper: Vec<String>,
649 initial_config: Option<crate::schema::NotificationConfig>,
650 ) -> Self {
651 Self {
652 job_dir_path,
653 shell_wrapper,
654 inner: std::sync::Mutex::new(OutputMatchInner {
655 config: initial_config,
656 }),
657 }
658 }
659
660 fn check_line(&self, line: &str, stream: &str) {
667 use crate::schema::{OutputMatchStream, OutputMatchType};
668
669 let match_info: Option<crate::schema::OutputMatchConfig> = {
671 let mut inner = self.inner.lock().unwrap();
672
673 {
675 let meta_path = self.job_dir_path.join("meta.json");
676 if let Ok(raw) = std::fs::read(&meta_path)
677 && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
678 {
679 inner.config = meta.notification;
680 }
681 }
682
683 let Some(ref notification) = inner.config else {
684 return;
685 };
686 let Some(ref match_cfg) = notification.on_output_match else {
687 return;
688 };
689
690 let stream_matches = match match_cfg.stream {
692 OutputMatchStream::Stdout => stream == "stdout",
693 OutputMatchStream::Stderr => stream == "stderr",
694 OutputMatchStream::Either => true,
695 };
696 if !stream_matches {
697 return;
698 }
699
700 let matched = match &match_cfg.match_type {
702 OutputMatchType::Contains => line.contains(&match_cfg.pattern),
703 OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
704 .map(|re| re.is_match(line))
705 .unwrap_or(false),
706 };
707
708 if matched {
709 Some(match_cfg.clone())
710 } else {
711 None
712 }
713 }; if let Some(match_cfg) = match_info {
716 self.dispatch_match(line, stream, &match_cfg);
717 }
718 }
719
720 fn dispatch_match(
723 &self,
724 line: &str,
725 stream: &str,
726 match_cfg: &crate::schema::OutputMatchConfig,
727 ) {
728 use std::io::Write;
729
730 let job_id = self
731 .job_dir_path
732 .file_name()
733 .and_then(|n| n.to_str())
734 .unwrap_or("unknown");
735
736 let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
737 let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
738 let events_path = self.job_dir_path.join("notification_events.ndjson");
739 let events_path_str = events_path.display().to_string();
740
741 let match_type_str = match &match_cfg.match_type {
742 crate::schema::OutputMatchType::Contains => "contains",
743 crate::schema::OutputMatchType::Regex => "regex",
744 };
745
746 let event = crate::schema::OutputMatchEvent {
747 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
748 event_type: "job.output.matched".to_string(),
749 job_id: job_id.to_string(),
750 pattern: match_cfg.pattern.clone(),
751 match_type: match_type_str.to_string(),
752 stream: stream.to_string(),
753 line: line.to_string(),
754 stdout_log_path,
755 stderr_log_path,
756 };
757
758 let event_json = serde_json::to_string(&event).unwrap_or_default();
759 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
760
761 if let Some(ref cmd) = match_cfg.command {
762 delivery_results.push(dispatch_command_sink(
763 cmd,
764 &event_json,
765 job_id,
766 &events_path_str,
767 &self.shell_wrapper,
768 "job.output.matched",
769 ));
770 }
771 if let Some(ref file_path) = match_cfg.file {
772 delivery_results.push(dispatch_file_sink(file_path, &event_json));
773 }
774
775 let record = crate::schema::OutputMatchEventRecord {
777 event,
778 delivery_results,
779 };
780 if let Ok(record_json) = serde_json::to_string(&record)
781 && let Ok(mut f) = std::fs::OpenOptions::new()
782 .create(true)
783 .append(true)
784 .open(&events_path)
785 {
786 let _ = writeln!(f, "{record_json}");
787 }
788 }
789}
790
791fn stream_to_logs<R, F>(
808 stream: R,
809 log_path: &std::path::Path,
810 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
811 label: &str,
812 on_line: Option<F>,
813) where
814 R: std::io::Read,
815 F: Fn(&str),
816{
817 use std::io::Write;
818 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
819 let mut stream = stream;
820 let mut buf = [0u8; 8192];
821 let mut line_buf: Vec<u8> = Vec::new();
823 loop {
824 match stream.read(&mut buf) {
825 Ok(0) => break, Ok(n) => {
827 let chunk = &buf[..n];
828 let _ = log_file.write_all(chunk);
830 for &b in chunk {
832 if b == b'\n' {
833 let line = String::from_utf8_lossy(&line_buf);
834 if let Ok(mut fl) = full_log.lock() {
835 let ts = now_rfc3339();
836 let _ = writeln!(fl, "{ts} [{label}] {line}");
837 }
838 if let Some(ref f) = on_line {
839 f(&line);
840 }
841 line_buf.clear();
842 } else {
843 line_buf.push(b);
844 }
845 }
846 }
847 Err(_) => break,
848 }
849 }
850 if !line_buf.is_empty() {
852 let line = String::from_utf8_lossy(&line_buf);
853 if let Ok(mut fl) = full_log.lock() {
854 let ts = now_rfc3339();
855 let _ = writeln!(fl, "{ts} [{label}] {line}");
856 }
857 if let Some(ref f) = on_line {
858 f(&line);
859 }
860 }
861}
862
/// Body of the `_supervise` sub-command: runs the job's command as a child
/// process, captures its output, tracks its state, and sends completion
/// notifications.
///
/// Sequence:
/// 1. Open the job directory, read its metadata, and create the full log.
/// 2. Build the child command from the shell wrapper, environment and cwd,
///    then spawn it with piped stdout/stderr.
/// 3. (Windows) assign the child to a Job Object; on failure kill the child,
///    record a `Failed` state, notify, and return an error.
/// 4. Write a `Running` state, start one log-reader thread per stream, and
///    start a watcher thread when a timeout or progress interval is set.
/// 5. Wait for the child, write the terminal state, give the reader threads
///    up to two seconds to drain, and join the watcher.
/// 6. Re-read the metadata and deliver the `job.finished` event to any
///    configured command/file sink, recording delivery results.
///
/// # Errors
/// Fails when the command or shell wrapper is empty, when the job directory,
/// metadata or full log cannot be opened, when an env file cannot be read,
/// when the child cannot be spawned or waited on, when a state write fails,
/// or (Windows) when Job Object assignment fails.
pub fn supervise(opts: SuperviseOpts) -> Result<()> {
    use std::sync::{Arc, Mutex};

    let job_id = opts.job_id;
    let root = opts.root;
    let command = opts.command;

    if command.is_empty() {
        anyhow::bail!("supervisor: no command");
    }

    let job_dir = JobDir::open(root, job_id)?;

    let meta = job_dir.read_meta()?;
    let started_at = now_rfc3339();

    // Explicit `full_log` wins over the job directory's default location.
    let full_log_path = if let Some(p) = opts.full_log {
        std::path::PathBuf::from(p)
    } else {
        job_dir.full_log_path()
    };

    // A custom full-log path may point into a directory that does not exist yet.
    if let Some(parent) = full_log_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
    }
    let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
    // Shared between the stdout and stderr reader threads.
    let full_log = Arc::new(Mutex::new(full_log_file));

    if opts.shell_wrapper.is_empty() {
        anyhow::bail!("supervisor: shell wrapper must not be empty");
    }
    // shell_wrapper[0] is the program; the remaining elements are its leading args.
    let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
    if command.len() == 1 {
        // A single element is handed to the wrapper as one argument.
        child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
    } else {
        #[cfg(unix)]
        {
            // Multiple elements: the wrapper receives the script `exec "$@"`, then
            // `--`, then the argv elements as separate arguments.
            // NOTE(review): presumably the wrapper ends in a `-c`-style flag so the
            // argv reaches the program unmodified — confirm against the configured wrapper.
            child_cmd
                .args(&opts.shell_wrapper[1..])
                .arg("exec \"$@\"")
                .arg("--")
                .args(command);
        }
        #[cfg(not(unix))]
        {
            // Non-Unix: join the argv into one string, double-quoting only the
            // elements that contain a space.
            let joined = command
                .iter()
                .map(|a| {
                    if a.contains(' ') {
                        format!("\"{}\"", a)
                    } else {
                        a.clone()
                    }
                })
                .collect::<Vec<_>>()
                .join(" ");
            child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
        }
    }

    // Inheriting the environment needs no action; otherwise start from empty.
    if opts.inherit_env {
    } else {
        child_cmd.env_clear();
    }

    // Env files are applied first, in order ...
    for env_file in &opts.env_files {
        let vars = load_env_file(env_file)?;
        for (k, v) in vars {
            child_cmd.env(&k, &v);
        }
    }

    // ... then explicit variables, which therefore override file values.
    for env_var in &opts.env_vars {
        let (k, v) = parse_env_var(env_var);
        child_cmd.env(&k, &v);
    }

    if let Some(cwd) = opts.cwd {
        child_cmd.current_dir(cwd);
    }

    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        // Put the child in its own session so the timeout watcher can signal the
        // whole group via `kill(-pid, ...)`. The return value of `setsid` is ignored.
        // NOTE(review): the closure runs between fork and exec and calls only
        // `setsid` — confirm this satisfies `pre_exec`'s safety requirements.
        unsafe {
            child_cmd.pre_exec(|| {
                libc::setsid();
                Ok(())
            });
        }
    }

    let mut child = child_cmd
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .context("supervisor: spawn child")?;

    let pid = child.id();
    info!(job_id, pid, "child process started");

    // Windows: the child must be assigned to a Job Object. If that fails, the
    // job is aborted: kill the child, persist a Failed state, notify, and bail.
    #[cfg(windows)]
    let windows_job_name = {
        match assign_to_job_object(job_id, pid) {
            Ok(name) => Some(name),
            Err(e) => {
                let kill_err = child.kill();
                // Reap the child regardless of whether the kill succeeded.
                let _ = child.wait();
                let failed_state = JobState {
                    job: JobStateJob {
                        id: job_id.to_string(),
                        status: JobStatus::Failed,
                        started_at: Some(started_at.clone()),
                    },
                    result: JobStateResult {
                        exit_code: None,
                        signal: None,
                        duration_ms: None,
                    },
                    pid: Some(pid),
                    finished_at: Some(now_rfc3339()),
                    updated_at: now_rfc3339(),
                    windows_job_name: None,
                };
                // Best effort: the launcher's handshake watches for this Failed status.
                let _ = job_dir.write_state(&failed_state);

                // This path uses the sinks passed on the command line rather than
                // re-reading them from the metadata.
                if opts.notify_command.is_some() || opts.notify_file.is_some() {
                    let finished_at_ts =
                        failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
                    let stdout_log = job_dir.stdout_path().display().to_string();
                    let stderr_log = job_dir.stderr_path().display().to_string();
                    let fail_event = crate::schema::CompletionEvent {
                        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
                        event_type: "job.finished".to_string(),
                        job_id: job_id.to_string(),
                        state: JobStatus::Failed.as_str().to_string(),
                        command: meta.command.clone(),
                        cwd: meta.cwd.clone(),
                        started_at: started_at.clone(),
                        finished_at: finished_at_ts,
                        duration_ms: None,
                        exit_code: None,
                        signal: None,
                        stdout_log_path: stdout_log,
                        stderr_log_path: stderr_log,
                    };
                    let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
                    let fail_event_path = job_dir.completion_event_path().display().to_string();
                    let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
                        Vec::new();
                    // Write the event once before delivery (empty results) so it
                    // exists on disk while the sinks run ...
                    if let Err(we) = job_dir.write_completion_event_atomic(
                        &crate::schema::CompletionEventRecord {
                            event: fail_event.clone(),
                            delivery_results: vec![],
                        },
                    ) {
                        warn!(
                            job_id,
                            error = %we,
                            "failed to write initial completion_event.json for failed job"
                        );
                    }
                    if let Some(ref shell_cmd) = opts.notify_command {
                        fail_delivery_results.push(dispatch_command_sink(
                            shell_cmd,
                            &fail_event_json,
                            job_id,
                            &fail_event_path,
                            &opts.shell_wrapper,
                            "job.finished",
                        ));
                    }
                    if let Some(ref file_path) = opts.notify_file {
                        fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
                    }
                    // ... and again afterwards with the delivery results attached.
                    if let Err(we) = job_dir.write_completion_event_atomic(
                        &crate::schema::CompletionEventRecord {
                            event: fail_event,
                            delivery_results: fail_delivery_results,
                        },
                    ) {
                        warn!(
                            job_id,
                            error = %we,
                            "failed to update completion_event.json with delivery results for failed job"
                        );
                    }
                }

                // Report the kill failure too, if there was one.
                if let Err(ke) = kill_err {
                    return Err(anyhow::anyhow!(
                        "supervisor: failed to assign pid {pid} to Job Object \
                         (Windows MUST requirement): {e}; also failed to kill child: {ke}"
                    ));
                }
                return Err(anyhow::anyhow!(
                    "supervisor: failed to assign pid {pid} to Job Object \
                     (Windows MUST requirement); child process was killed; \
                     consider running outside a nested Job Object environment: {e}"
                ));
            }
        }
    };
    #[cfg(not(windows))]
    let windows_job_name: Option<String> = None;

    // Record the Running state with the child's pid (replacing the supervisor
    // pid written by the launcher's `init_state`).
    let state = JobState {
        job: JobStateJob {
            id: job_id.to_string(),
            status: JobStatus::Running,
            started_at: Some(started_at.clone()),
        },
        result: JobStateResult {
            exit_code: None,
            signal: None,
            duration_ms: None,
        },
        pid: Some(pid),
        finished_at: None,
        updated_at: now_rfc3339(),
        windows_job_name,
    };
    job_dir.write_state(&state)?;

    let child_start_time = std::time::Instant::now();

    // Both pipes were requested as `piped()` above, so `take()` yields `Some`.
    let child_stdout = child.stdout.take().expect("child stdout piped");
    let child_stderr = child.stderr.take().expect("child stderr piped");

    // One checker shared by both reader threads.
    let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
        job_dir.path.clone(),
        opts.shell_wrapper.clone(),
        meta.notification.clone(),
    ));

    // Completion channels let the main thread wait for each reader with a timeout.
    let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
    let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();

    let stdout_log_path = job_dir.stdout_path();
    let full_log_stdout = Arc::clone(&full_log);
    let match_checker_stdout = std::sync::Arc::clone(&match_checker);
    let t_stdout = std::thread::spawn(move || {
        stream_to_logs(
            child_stdout,
            &stdout_log_path,
            full_log_stdout,
            "STDOUT",
            Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
        );
        let _ = tx_stdout_done.send(());
    });

    let stderr_log_path = job_dir.stderr_path();
    let full_log_stderr = Arc::clone(&full_log);
    let match_checker_stderr = std::sync::Arc::clone(&match_checker);
    let t_stderr = std::thread::spawn(move || {
        stream_to_logs(
            child_stderr,
            &stderr_log_path,
            full_log_stderr,
            "STDERR",
            Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
        );
        let _ = tx_stderr_done.send(());
    });

    // Copy the values the watcher thread needs so its closure can be `move`.
    let timeout_ms = opts.timeout_ms;
    let kill_after_ms = opts.kill_after_ms;
    let progress_every_ms = opts.progress_every_ms;
    let state_path = job_dir.state_path();
    let job_id_str = job_id.to_string();

    use std::sync::atomic::{AtomicBool, Ordering};
    // Set by the main thread once the child has exited; tells the watcher to stop.
    let child_done = Arc::new(AtomicBool::new(false));

    // The watcher only exists when there is a timeout to enforce or progress to record.
    let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
        let state_path_clone = state_path.clone();
        let child_done_clone = Arc::clone(&child_done);
        Some(std::thread::spawn(move || {
            let start = std::time::Instant::now();
            let timeout_dur = if timeout_ms > 0 {
                Some(std::time::Duration::from_millis(timeout_ms))
            } else {
                None
            };
            let progress_dur = if progress_every_ms > 0 {
                Some(std::time::Duration::from_millis(progress_every_ms))
            } else {
                None
            };

            let poll_interval = std::time::Duration::from_millis(100);

            loop {
                std::thread::sleep(poll_interval);

                if child_done_clone.load(Ordering::Relaxed) {
                    break;
                }

                let elapsed = start.elapsed();

                if let Some(td) = timeout_dur
                    && elapsed >= td
                {
                    info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
                    // Signals go to the negative pid, i.e. the child's process
                    // group. They are only sent on Unix builds.
                    #[cfg(unix)]
                    {
                        unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
                    }
                    if kill_after_ms > 0 {
                        // Grace period before escalating; this sleep is not
                        // interrupted if the child exits early.
                        std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
                        info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
                        #[cfg(unix)]
                        {
                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
                        }
                    } else {
                        // No grace period configured: SIGKILL follows immediately.
                        #[cfg(unix)]
                        {
                            unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
                        }
                    }
                    break;
                }

                if let Some(pd) = progress_dur {
                    let elapsed_ms = elapsed.as_millis() as u64;
                    let pd_ms = pd.as_millis() as u64;
                    let poll_ms = poll_interval.as_millis() as u64;
                    // Fires when the elapsed time falls within one poll interval
                    // after a multiple of the progress period.
                    if elapsed_ms % pd_ms < poll_ms {
                        // Refresh only `updated_at`; read/parse/write errors are ignored.
                        if let Ok(raw) = std::fs::read(&state_path_clone)
                            && let Ok(mut st) =
                                serde_json::from_slice::<crate::schema::JobState>(&raw)
                        {
                            st.updated_at = now_rfc3339();
                            if let Ok(s) = serde_json::to_string_pretty(&st) {
                                let _ = std::fs::write(&state_path_clone, s);
                            }
                        }
                    }
                }
            }
        }))
    } else {
        None
    };

    let exit_status = child.wait().context("wait for child")?;

    child_done.store(true, Ordering::Relaxed);

    let duration_ms = child_start_time.elapsed().as_millis() as u64;
    let exit_code = exit_status.code();
    let finished_at = now_rfc3339();

    // On Unix a signal-terminated child is reported as Killed with the signal
    // number (as a decimal string); anything else is Exited.
    #[cfg(unix)]
    let (terminal_status, signal_name) = {
        use std::os::unix::process::ExitStatusExt;
        if let Some(sig) = exit_status.signal() {
            (JobStatus::Killed, Some(sig.to_string()))
        } else {
            (JobStatus::Exited, None)
        }
    };
    #[cfg(not(unix))]
    let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);

    // Persist the terminal state before draining logs so pollers see completion promptly.
    let state = JobState {
        job: JobStateJob {
            id: job_id.to_string(),
            status: terminal_status.clone(),
            started_at: Some(started_at.clone()),
        },
        result: JobStateResult {
            exit_code,
            signal: signal_name.clone(),
            duration_ms: Some(duration_ms),
        },
        pid: Some(pid),
        finished_at: Some(finished_at.clone()),
        updated_at: now_rfc3339(),
        windows_job_name: None,
    };
    job_dir.write_state(&state)?;
    info!(job_id, ?exit_code, "child process finished");

    // Both readers share a single two-second budget to finish draining the pipes.
    const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
    let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;

    let remaining = drain_deadline
        .checked_duration_since(std::time::Instant::now())
        .unwrap_or(std::time::Duration::ZERO);
    if rx_stdout_done.recv_timeout(remaining).is_ok() {
        let _ = t_stdout.join();
    } else {
        // Reader did not signal in time: detach it rather than block on join.
        drop(t_stdout);
    }

    let remaining = drain_deadline
        .checked_duration_since(std::time::Instant::now())
        .unwrap_or(std::time::Duration::ZERO);
    if rx_stderr_done.recv_timeout(remaining).is_ok() {
        let _ = t_stderr.join();
    } else {
        // Same detach-on-timeout handling as for stdout.
        drop(t_stderr);
    }

    if let Some(w) = watcher {
        let _ = w.join();
    }

    // Re-read the metadata so the notification settings currently on disk are
    // used rather than the copy loaded at startup.
    let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
    let (current_notify_command, current_notify_file) = match &latest_notification {
        Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
        None => (None, None),
    };

    let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
    if has_notification {
        let stdout_log = job_dir.stdout_path().display().to_string();
        let stderr_log = job_dir.stderr_path().display().to_string();
        let event = crate::schema::CompletionEvent {
            schema_version: crate::schema::SCHEMA_VERSION.to_string(),
            event_type: "job.finished".to_string(),
            job_id: job_id.to_string(),
            state: terminal_status.as_str().to_string(),
            command: meta.command.clone(),
            cwd: meta.cwd.clone(),
            started_at,
            finished_at,
            duration_ms: Some(duration_ms),
            exit_code,
            signal: signal_name,
            stdout_log_path: stdout_log,
            stderr_log_path: stderr_log,
        };

        let event_json = serde_json::to_string(&event).unwrap_or_default();
        let event_path = job_dir.completion_event_path().display().to_string();
        let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();

        // First write: the event with no delivery results, before any sink runs.
        if let Err(e) =
            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
                event: event.clone(),
                delivery_results: vec![],
            })
        {
            warn!(job_id, error = %e, "failed to write initial completion_event.json");
        }

        if let Some(ref shell_cmd) = current_notify_command {
            delivery_results.push(dispatch_command_sink(
                shell_cmd,
                &event_json,
                job_id,
                &event_path,
                &opts.shell_wrapper,
                "job.finished",
            ));
        }
        if let Some(ref file_path) = current_notify_file {
            delivery_results.push(dispatch_file_sink(file_path, &event_json));
        }

        // Second write: the same event with the delivery results attached.
        if let Err(e) =
            job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
                event,
                delivery_results,
            })
        {
            warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
        }
    }

    Ok(())
}
1461
1462fn dispatch_command_sink(
1469 shell_cmd: &str,
1470 event_json: &str,
1471 job_id: &str,
1472 event_path: &str,
1473 shell_wrapper: &[String],
1474 event_type: &str,
1475) -> crate::schema::SinkDeliveryResult {
1476 use std::io::Write;
1477 let attempted_at = now_rfc3339();
1478 let target = shell_cmd.to_string();
1479
1480 if shell_cmd.trim().is_empty() {
1481 return crate::schema::SinkDeliveryResult {
1482 sink_type: "command".to_string(),
1483 target,
1484 success: false,
1485 error: Some("empty shell command".to_string()),
1486 attempted_at,
1487 };
1488 }
1489
1490 if shell_wrapper.is_empty() {
1491 return crate::schema::SinkDeliveryResult {
1492 sink_type: "command".to_string(),
1493 target,
1494 success: false,
1495 error: Some("shell wrapper must not be empty".to_string()),
1496 attempted_at,
1497 };
1498 }
1499
1500 let mut cmd = Command::new(&shell_wrapper[0]);
1501 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1502
1503 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1504 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1505 cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1506 cmd.stdin(std::process::Stdio::piped());
1507 cmd.stdout(std::process::Stdio::null());
1508 cmd.stderr(std::process::Stdio::null());
1509
1510 match cmd.spawn() {
1511 Ok(mut child) => {
1512 if let Some(mut stdin) = child.stdin.take() {
1513 let _ = stdin.write_all(event_json.as_bytes());
1514 }
1515 match child.wait() {
1516 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1517 sink_type: "command".to_string(),
1518 target,
1519 success: true,
1520 error: None,
1521 attempted_at,
1522 },
1523 Ok(status) => crate::schema::SinkDeliveryResult {
1524 sink_type: "command".to_string(),
1525 target,
1526 success: false,
1527 error: Some(format!("exited with status {status}")),
1528 attempted_at,
1529 },
1530 Err(e) => crate::schema::SinkDeliveryResult {
1531 sink_type: "command".to_string(),
1532 target,
1533 success: false,
1534 error: Some(format!("wait error: {e}")),
1535 attempted_at,
1536 },
1537 }
1538 }
1539 Err(e) => crate::schema::SinkDeliveryResult {
1540 sink_type: "command".to_string(),
1541 target,
1542 success: false,
1543 error: Some(format!("spawn error: {e}")),
1544 attempted_at,
1545 },
1546 }
1547}
1548
1549fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1552 use std::io::Write;
1553 let attempted_at = now_rfc3339();
1554 let path = std::path::Path::new(file_path);
1555
1556 if let Some(parent) = path.parent()
1557 && let Err(e) = std::fs::create_dir_all(parent)
1558 {
1559 return crate::schema::SinkDeliveryResult {
1560 sink_type: "file".to_string(),
1561 target: file_path.to_string(),
1562 success: false,
1563 error: Some(format!("create parent dir: {e}")),
1564 attempted_at,
1565 };
1566 }
1567
1568 match std::fs::OpenOptions::new()
1569 .create(true)
1570 .append(true)
1571 .open(path)
1572 {
1573 Ok(mut f) => match writeln!(f, "{event_json}") {
1574 Ok(_) => crate::schema::SinkDeliveryResult {
1575 sink_type: "file".to_string(),
1576 target: file_path.to_string(),
1577 success: true,
1578 error: None,
1579 attempted_at,
1580 },
1581 Err(e) => crate::schema::SinkDeliveryResult {
1582 sink_type: "file".to_string(),
1583 target: file_path.to_string(),
1584 success: false,
1585 error: Some(format!("write error: {e}")),
1586 attempted_at,
1587 },
1588 },
1589 Err(e) => crate::schema::SinkDeliveryResult {
1590 sink_type: "file".to_string(),
1591 target: file_path.to_string(),
1592 success: false,
1593 error: Some(format!("open error: {e}")),
1594 attempted_at,
1595 },
1596 }
1597}
1598
/// Public entry point for obtaining the current timestamp string.
///
/// Forwards to the private `now_rfc3339` and returns its result unchanged.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1603
1604fn now_rfc3339() -> String {
1605 let d = std::time::SystemTime::now()
1607 .duration_since(std::time::UNIX_EPOCH)
1608 .unwrap_or_default();
1609 format_rfc3339(d.as_secs())
1610}
1611
1612fn format_rfc3339(secs: u64) -> String {
1613 let mut s = secs;
1615 let seconds = s % 60;
1616 s /= 60;
1617 let minutes = s % 60;
1618 s /= 60;
1619 let hours = s % 24;
1620 s /= 24;
1621
1622 let mut days = s;
1624 let mut year = 1970u64;
1625 loop {
1626 let days_in_year = if is_leap(year) { 366 } else { 365 };
1627 if days < days_in_year {
1628 break;
1629 }
1630 days -= days_in_year;
1631 year += 1;
1632 }
1633
1634 let leap = is_leap(year);
1635 let month_days: [u64; 12] = [
1636 31,
1637 if leap { 29 } else { 28 },
1638 31,
1639 30,
1640 31,
1641 30,
1642 31,
1643 31,
1644 30,
1645 31,
1646 30,
1647 31,
1648 ];
1649 let mut month = 0usize;
1650 for (i, &d) in month_days.iter().enumerate() {
1651 if days < d {
1652 month = i;
1653 break;
1654 }
1655 days -= d;
1656 }
1657 let day = days + 1;
1658
1659 format!(
1660 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1661 year,
1662 month + 1,
1663 day,
1664 hours,
1665 minutes,
1666 seconds
1667 )
1668}
1669
/// Gregorian leap-year rule: every fourth year is a leap year, except
/// century years, unless the century is itself divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1673
/// Places the process `pid` into a named Windows Job Object and returns the
/// object's name (`AgentExec-{job_id}`).
///
/// Steps: open the process with `PROCESS_TERMINATE | PROCESS_SET_QUOTA`,
/// create (or open, if the name already exists — NOTE(review): confirm
/// against the `CreateJobObjectW` documentation) the named Job Object, then
/// assign the process to it.
///
/// # Errors
///
/// Returns an error if `OpenProcess`, `CreateJobObjectW` or
/// `AssignProcessToJobObject` fails. Every handle obtained up to the point of
/// failure is closed before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: the only handles passed to Win32 calls below are the ones
    // returned by the successful `OpenProcess` / `CreateJobObjectW` calls in
    // this block, and each handle is closed at most once on every path.
    unsafe {
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                // Do not leak the process handle when the job cannot be created.
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            // Assignment failed: release both handles before reporting.
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle was only needed for the assignment call.
        let _ = CloseHandle(proc_handle);
        // The job handle is deliberately never closed on the success path —
        // presumably so the named Job Object stays alive while this process
        // runs and can be reopened by name later; TODO confirm.
        // NOTE(review): if `HANDLE` is a plain `Copy` wrapper, `forget` is a
        // no-op and the handle is simply left open — confirm which applies.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1739
#[cfg(test)]
mod tests {
    use super::*;

    /// Second zero must render as the Unix epoch itself.
    #[test]
    fn rfc3339_epoch() {
        let rendered = format_rfc3339(0);
        assert_eq!(rendered, "1970-01-01T00:00:00Z");
    }

    /// A fixed instant in 2024 pins the year arithmetic across several leap years.
    #[test]
    fn rfc3339_known_date() {
        let rendered = format_rfc3339(1_704_067_200);
        assert_eq!(rendered, "2024-01-01T00:00:00Z");
    }
}