1use anyhow::{Context, Result};
10use std::path::Path;
11use std::process::Command;
12use tracing::{debug, info, warn};
13use ulid::Ulid;
14
15use crate::jobstore::{JobDir, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18 Snapshot,
19};
20use crate::tag::dedup_tags;
21
/// Options for the `run` subcommand, consumed by [`execute`].
///
/// Default values are provided by the `Default` impl below.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Program and arguments to run; `execute` rejects an empty command.
    pub command: Vec<String>,
    /// Jobs root directory override, resolved with `resolve_root`.
    pub root: Option<&'a str>,
    /// How long (ms) to poll before taking the initial output snapshot.
    /// Capped at `MAX_SNAPSHOT_AFTER_MS` and ignored when `wait` is set;
    /// polling stops early once the job is no longer non-terminal.
    pub snapshot_after: u64,
    /// Line limit passed to `read_tail_metrics` when building snapshots.
    pub tail_lines: u64,
    /// Byte limit passed to `read_tail_metrics` when building snapshots.
    pub max_bytes: u64,
    /// Timeout in ms forwarded to the supervisor as `--timeout`; 0 = none.
    pub timeout_ms: u64,
    /// Forwarded as `--kill-after` (0 = omitted). In `supervise`, after the
    /// timeout SIGTERM the process group receives SIGKILL after this many ms,
    /// or immediately when 0 (Unix only).
    pub kill_after_ms: u64,
    /// Working directory for the child; recorded canonicalized in meta.json.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries set in the child's environment; recorded in
    /// meta.json and in the response with `mask` applied.
    pub env_vars: Vec<String>,
    /// Env files (`KEY=VALUE` per line, `#` comments) loaded by the
    /// supervisor; their entries are applied before `env_vars`.
    pub env_files: Vec<String>,
    /// When false, the child starts from an empty environment
    /// (`--no-inherit-env`).
    pub inherit_env: bool,
    /// Env keys whose values are replaced by `***` when recorded/reported.
    pub mask: Vec<String>,
    /// Job tags; deduplicated with `dedup_tags`, whose error aborts the run.
    pub tags: Vec<String>,
    /// Override path for the combined log; defaults to the job directory's
    /// full log.
    pub log: Option<&'a str>,
    /// Interval (ms) at which the supervisor refreshes `updated_at` in
    /// state.json; 0 disables it.
    pub progress_every_ms: u64,
    /// Block until the job is terminal or `wait_until_ms` elapses. Disables
    /// the initial snapshot in favour of a final one.
    pub wait: bool,
    /// Poll interval (ms) used while waiting; values below 1 are treated as 1.
    pub wait_poll_ms: u64,
    /// Maximum time (ms) to wait, measured from just after the supervisor
    /// has been spawned.
    pub wait_until_ms: u64,
    /// Wait without a deadline, ignoring `wait_until_ms`.
    pub wait_forever: bool,
    /// Command sink for the completion notification.
    pub notify_command: Option<String>,
    /// File sink for the completion notification.
    pub notify_file: Option<String>,
    /// Output-match pattern, forwarded to
    /// `crate::notify::build_output_match_config`.
    pub output_pattern: Option<String>,
    /// Output-match type, forwarded to `build_output_match_config`.
    pub output_match_type: Option<String>,
    /// Output-match stream selector, forwarded to `build_output_match_config`.
    pub output_stream: Option<String>,
    /// Output-match command sink, forwarded to `build_output_match_config`.
    pub output_command: Option<String>,
    /// Output-match file sink, forwarded to `build_output_match_config`.
    pub output_file: Option<String>,
    /// Argv prefix the supervisor uses to run `command`
    /// (`shell_wrapper[0] shell_wrapper[1..] <script>`); must not be empty.
    pub shell_wrapper: Vec<String>,
}
91
92impl<'a> Default for RunOpts<'a> {
93 fn default() -> Self {
94 RunOpts {
95 command: vec![],
96 root: None,
97 snapshot_after: 10_000,
98 tail_lines: 50,
99 max_bytes: 65536,
100 timeout_ms: 0,
101 kill_after_ms: 0,
102 cwd: None,
103 env_vars: vec![],
104 env_files: vec![],
105 inherit_env: true,
106 mask: vec![],
107 tags: vec![],
108 log: None,
109 progress_every_ms: 0,
110 wait: false,
111 wait_poll_ms: 200,
112 wait_until_ms: 30_000,
113 wait_forever: false,
114 notify_command: None,
115 notify_file: None,
116 output_pattern: None,
117 output_match_type: None,
118 output_stream: None,
119 output_command: None,
120 output_file: None,
121 shell_wrapper: crate::config::default_shell_wrapper(),
122 }
123 }
124}
125
/// Upper bound, in milliseconds, on the initial-snapshot delay used by `run`;
/// larger `snapshot_after` values are clamped to this in `run_snapshot_wait`.
const MAX_SNAPSHOT_AFTER_MS: u64 = 10_000;
128
/// Arguments that `spawn_supervisor_process` forwards to the `_supervise`
/// subcommand as command-line flags.
pub struct SpawnSupervisorParams {
    /// Job id (`--job-id`).
    pub job_id: String,
    /// Jobs root (`--supervise-root`).
    pub root: std::path::PathBuf,
    /// Combined log path (`--full-log`).
    pub full_log_path: String,
    /// Timeout in ms (`--timeout`); 0 omits the flag.
    pub timeout_ms: u64,
    /// Kill-after delay in ms (`--kill-after`); 0 omits the flag.
    pub kill_after_ms: u64,
    /// Child working directory (`--cwd`), if any.
    pub cwd: Option<String>,
    /// `KEY=VALUE` entries, each passed as `--env`.
    pub env_vars: Vec<String>,
    /// Env file paths, each passed as `--env-file`.
    pub env_files: Vec<String>,
    /// `false` adds `--no-inherit-env`.
    pub inherit_env: bool,
    /// Progress interval in ms (`--progress-every`); 0 omits the flag.
    pub progress_every_ms: u64,
    /// Completion notification command (`--notify-command`), if any.
    pub notify_command: Option<String>,
    /// Completion notification file (`--notify-file`), if any.
    pub notify_file: Option<String>,
    /// Shell wrapper argv, JSON-encoded into `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// The job's own argv, passed after `--`.
    pub command: Vec<String>,
}
149
150pub fn spawn_supervisor_process(
155 job_dir: &JobDir,
156 params: SpawnSupervisorParams,
157) -> Result<(u32, String)> {
158 let started_at = now_rfc3339();
159
160 let exe = std::env::current_exe().context("resolve current exe")?;
161 let mut supervisor_cmd = Command::new(&exe);
162 supervisor_cmd
163 .arg("_supervise")
164 .arg("--job-id")
165 .arg(¶ms.job_id)
166 .arg("--supervise-root")
167 .arg(params.root.display().to_string())
168 .arg("--full-log")
169 .arg(¶ms.full_log_path);
170
171 if params.timeout_ms > 0 {
172 supervisor_cmd
173 .arg("--timeout")
174 .arg(params.timeout_ms.to_string());
175 }
176 if params.kill_after_ms > 0 {
177 supervisor_cmd
178 .arg("--kill-after")
179 .arg(params.kill_after_ms.to_string());
180 }
181 if let Some(ref cwd) = params.cwd {
182 supervisor_cmd.arg("--cwd").arg(cwd);
183 }
184 for env_file in ¶ms.env_files {
185 supervisor_cmd.arg("--env-file").arg(env_file);
186 }
187 for env_var in ¶ms.env_vars {
188 supervisor_cmd.arg("--env").arg(env_var);
189 }
190 if !params.inherit_env {
191 supervisor_cmd.arg("--no-inherit-env");
192 }
193 if params.progress_every_ms > 0 {
194 supervisor_cmd
195 .arg("--progress-every")
196 .arg(params.progress_every_ms.to_string());
197 }
198 if let Some(ref nc) = params.notify_command {
199 supervisor_cmd.arg("--notify-command").arg(nc);
200 }
201 if let Some(ref nf) = params.notify_file {
202 supervisor_cmd.arg("--notify-file").arg(nf);
203 }
204 let wrapper_json =
205 serde_json::to_string(¶ms.shell_wrapper).context("serialize shell wrapper")?;
206 supervisor_cmd
207 .arg("--shell-wrapper-resolved")
208 .arg(&wrapper_json);
209
210 supervisor_cmd
211 .arg("--")
212 .args(¶ms.command)
213 .stdin(std::process::Stdio::null())
214 .stdout(std::process::Stdio::null())
215 .stderr(std::process::Stdio::null());
216
217 let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
218 let supervisor_pid = supervisor.id();
219 debug!(supervisor_pid, "supervisor spawned");
220
221 job_dir.init_state(supervisor_pid, &started_at)?;
223
224 #[cfg(windows)]
226 {
227 let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
228 loop {
229 std::thread::sleep(std::time::Duration::from_millis(10));
230 if let Ok(current_state) = job_dir.read_state() {
231 let supervisor_updated = current_state
232 .pid
233 .map(|p| p != supervisor_pid)
234 .unwrap_or(false)
235 || *current_state.status() == crate::schema::JobStatus::Failed;
236 if supervisor_updated {
237 if *current_state.status() == crate::schema::JobStatus::Failed {
238 anyhow::bail!(
239 "supervisor failed to assign child process to Job Object \
240 (Windows MUST requirement); see stderr for details"
241 );
242 }
243 debug!("supervisor confirmed Job Object assignment via state.json handshake");
244 break;
245 }
246 }
247 if std::time::Instant::now() >= handshake_deadline {
248 debug!("supervisor handshake timed out; proceeding with initial state");
249 break;
250 }
251 }
252 }
253
254 Ok((supervisor_pid, started_at))
255}
256
257pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
260 for log_path in [
261 job_dir.stdout_path(),
262 job_dir.stderr_path(),
263 job_dir.full_log_path(),
264 ] {
265 std::fs::OpenOptions::new()
266 .create(true)
267 .append(true)
268 .open(&log_path)
269 .with_context(|| format!("pre-create log file {}", log_path.display()))?;
270 }
271 Ok(())
272}
273
/// Snapshot and wait settings consumed by `run_snapshot_wait`.
pub struct SnapshotWaitOpts {
    /// Delay (ms) before the initial snapshot; clamped to
    /// `MAX_SNAPSHOT_AFTER_MS` and ignored when `wait` is true.
    pub snapshot_after: u64,
    /// Line limit for snapshot tails (passed to `read_tail_metrics`).
    pub tail_lines: u64,
    /// Byte limit for snapshot tails (passed to `read_tail_metrics`).
    pub max_bytes: u64,
    /// Poll until the job is terminal or the deadline passes.
    pub wait: bool,
    /// Poll interval (ms) while waiting; values below 1 are treated as 1.
    pub wait_poll_ms: u64,
    /// Wait deadline (ms) measured from the start of `run_snapshot_wait`.
    pub wait_until_ms: u64,
    /// Ignore `wait_until_ms` and wait until the job is terminal.
    pub wait_forever: bool,
}
284
285pub fn run_snapshot_wait(
288 job_dir: &JobDir,
289 opts: &SnapshotWaitOpts,
290) -> (
291 String,
292 Option<i32>,
293 Option<String>,
294 Option<Snapshot>,
295 Option<Snapshot>,
296 u64,
297) {
298 use crate::schema::JobStatus;
299
300 let effective_snapshot_after = if opts.wait {
301 0
302 } else {
303 opts.snapshot_after.min(MAX_SNAPSHOT_AFTER_MS)
304 };
305
306 let wait_start = std::time::Instant::now();
307
308 let snapshot = if effective_snapshot_after > 0 {
309 debug!(ms = effective_snapshot_after, "polling for snapshot");
310 let deadline = wait_start + std::time::Duration::from_millis(effective_snapshot_after);
311 let poll_interval = std::time::Duration::from_millis(15);
312 loop {
313 std::thread::sleep(poll_interval);
314 if let Ok(st) = job_dir.read_state()
315 && !st.status().is_non_terminal()
316 {
317 debug!("snapshot poll: job no longer running/created, exiting early");
318 break;
319 }
320 if std::time::Instant::now() >= deadline {
321 debug!("snapshot poll: deadline reached");
322 break;
323 }
324 }
325 Some(build_snapshot(job_dir, opts.tail_lines, opts.max_bytes))
326 } else {
327 None
328 };
329
330 let (final_state, exit_code_opt, finished_at_opt, final_snapshot_opt) = if opts.wait {
331 debug!(
332 wait_until_ms = opts.wait_until_ms,
333 wait_forever = opts.wait_forever,
334 "--wait: polling for terminal or deadline"
335 );
336 let poll = std::time::Duration::from_millis(opts.wait_poll_ms.max(1));
337 let wait_deadline = if opts.wait_forever {
338 None
339 } else {
340 Some(wait_start + std::time::Duration::from_millis(opts.wait_until_ms))
341 };
342
343 loop {
344 std::thread::sleep(poll);
345 if let Ok(st) = job_dir.read_state() {
346 if !st.status().is_non_terminal() {
347 let snap = build_snapshot(job_dir, opts.tail_lines, opts.max_bytes);
348 let ec = st.exit_code();
349 let fa = st.finished_at.clone();
350 let state_str = st.status().as_str().to_string();
351 break (state_str, ec, fa, Some(snap));
352 }
353
354 if let Some(deadline) = wait_deadline
355 && std::time::Instant::now() >= deadline
356 {
357 break (st.status().as_str().to_string(), None, None, None);
358 }
359 }
360 }
361 } else {
362 (JobStatus::Running.as_str().to_string(), None, None, None)
363 };
364
365 let waited_ms = wait_start.elapsed().as_millis() as u64;
366 (
367 final_state,
368 exit_code_opt,
369 finished_at_opt,
370 snapshot,
371 final_snapshot_opt,
372 waited_ms,
373 )
374}
375
376pub fn execute(opts: RunOpts) -> Result<()> {
378 if opts.command.is_empty() {
379 anyhow::bail!("no command specified for run");
380 }
381
382 let elapsed_start = std::time::Instant::now();
383
384 let root = resolve_root(opts.root);
385 std::fs::create_dir_all(&root)
386 .with_context(|| format!("create jobs root {}", root.display()))?;
387
388 let job_id = Ulid::new().to_string();
389 let created_at = now_rfc3339();
390
391 let env_keys: Vec<String> = opts
393 .env_vars
394 .iter()
395 .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
396 .collect();
397
398 let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
400
401 let effective_cwd = resolve_effective_cwd(opts.cwd);
405
406 let on_output_match = crate::notify::build_output_match_config(
408 opts.output_pattern,
409 opts.output_match_type,
410 opts.output_stream,
411 opts.output_command,
412 opts.output_file,
413 None,
414 );
415
416 let notification =
417 if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
418 {
419 Some(crate::schema::NotificationConfig {
420 notify_command: opts.notify_command.clone(),
421 notify_file: opts.notify_file.clone(),
422 on_output_match,
423 })
424 } else {
425 None
426 };
427
428 let tags = dedup_tags(opts.tags)?;
430
431 let meta = JobMeta {
432 job: JobMetaJob { id: job_id.clone() },
433 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
434 command: opts.command.clone(),
435 created_at: created_at.clone(),
436 root: root.display().to_string(),
437 env_keys,
438 env_vars: masked_env_vars.clone(),
439 env_vars_runtime: vec![],
442 mask: opts.mask.clone(),
443 cwd: Some(effective_cwd),
444 notification,
445 inherit_env: opts.inherit_env,
447 env_files: opts.env_files.clone(),
448 timeout_ms: opts.timeout_ms,
449 kill_after_ms: opts.kill_after_ms,
450 progress_every_ms: opts.progress_every_ms,
451 shell_wrapper: Some(opts.shell_wrapper.clone()),
452 tags: tags.clone(),
453 };
454
455 let job_dir = JobDir::create(&root, &job_id, &meta)?;
456 info!(job_id = %job_id, "created job directory");
457
458 let full_log_path = if let Some(log) = opts.log {
460 log.to_string()
461 } else {
462 job_dir.full_log_path().display().to_string()
463 };
464
465 pre_create_log_files(&job_dir)?;
467
468 let (_supervisor_pid, _started_at) = spawn_supervisor_process(
472 &job_dir,
473 SpawnSupervisorParams {
474 job_id: job_id.clone(),
475 root: root.clone(),
476 full_log_path: full_log_path.clone(),
477 timeout_ms: opts.timeout_ms,
478 kill_after_ms: opts.kill_after_ms,
479 cwd: opts.cwd.map(|s| s.to_string()),
480 env_vars: opts.env_vars.clone(),
481 env_files: opts.env_files.clone(),
482 inherit_env: opts.inherit_env,
483 progress_every_ms: opts.progress_every_ms,
484 notify_command: opts.notify_command.clone(),
485 notify_file: opts.notify_file.clone(),
486 shell_wrapper: opts.shell_wrapper.clone(),
487 command: opts.command.clone(),
488 },
489 )?;
490
491 let stdout_log_path = job_dir.stdout_path().display().to_string();
493 let stderr_log_path = job_dir.stderr_path().display().to_string();
494
495 let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
496 run_snapshot_wait(
497 &job_dir,
498 &SnapshotWaitOpts {
499 snapshot_after: opts.snapshot_after,
500 tail_lines: opts.tail_lines,
501 max_bytes: opts.max_bytes,
502 wait: opts.wait,
503 wait_poll_ms: opts.wait_poll_ms,
504 wait_until_ms: opts.wait_until_ms,
505 wait_forever: opts.wait_forever,
506 },
507 );
508
509 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
510
511 let response = Response::new(
512 "run",
513 RunData {
514 job_id,
515 state: final_state,
516 tags,
517 env_vars: masked_env_vars,
520 snapshot,
521 stdout_log_path,
522 stderr_log_path,
523 waited_ms,
524 elapsed_ms,
525 exit_code: exit_code_opt,
526 finished_at: finished_at_opt,
527 final_snapshot: final_snapshot_opt,
528 },
529 );
530 response.print();
531 Ok(())
532}
533
534fn build_snapshot(job_dir: &JobDir, tail_lines: u64, max_bytes: u64) -> Snapshot {
535 let stdout = job_dir.read_tail_metrics("stdout.log", tail_lines, max_bytes);
536 let stderr = job_dir.read_tail_metrics("stderr.log", tail_lines, max_bytes);
537 Snapshot {
538 truncated: stdout.truncated || stderr.truncated,
539 encoding: "utf-8-lossy".to_string(),
540 stdout_observed_bytes: stdout.observed_bytes,
541 stderr_observed_bytes: stderr.observed_bytes,
542 stdout_included_bytes: stdout.included_bytes,
543 stderr_included_bytes: stderr.included_bytes,
544 stdout_tail: stdout.tail,
545 stderr_tail: stderr.tail,
546 }
547}
548
/// Options for the hidden `_supervise` subcommand, which runs the job's
/// command as a child process and records its lifecycle in the job directory.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Id of an existing job directory under `root`.
    pub job_id: &'a str,
    /// Jobs root containing the job directory.
    pub root: &'a Path,
    /// The job's argv; `supervise` rejects an empty command.
    pub command: &'a [String],
    /// Combined log path; `None` uses the job directory's default full log.
    pub full_log: Option<&'a str>,
    /// Timeout in ms after which the child's process group is sent SIGTERM
    /// (Unix); 0 disables it.
    pub timeout_ms: u64,
    /// Delay in ms between the timeout SIGTERM and SIGKILL; with 0, SIGKILL
    /// follows immediately (Unix).
    pub kill_after_ms: u64,
    /// Working directory for the child; `None` inherits the supervisor's.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries applied to the child's environment after `env_files`.
    pub env_vars: Vec<String>,
    /// Env files loaded, in order, into the child's environment.
    pub env_files: Vec<String>,
    /// When false, the child's environment is cleared before env files/vars
    /// are applied.
    pub inherit_env: bool,
    /// Interval in ms for refreshing `updated_at` in `state.json`; 0 disables it.
    pub progress_every_ms: u64,
    /// Completion command sink given on the command line. The normal
    /// completion path in `supervise` re-reads the sinks from meta.json.
    pub notify_command: Option<String>,
    /// Completion file sink given on the command line (see `notify_command`).
    pub notify_file: Option<String>,
    /// Argv prefix used to run `command`; must not be empty.
    pub shell_wrapper: Vec<String>,
}
583
/// Resolves the directory a job will run in, as a path string.
///
/// Uses `cwd_override` when given, otherwise the process's current directory
/// (falling back to `.` if that cannot be determined). The result is
/// canonicalized when possible. If canonicalization fails (e.g. the path does
/// not exist), an absolute path is returned as-is and a relative one is
/// joined onto the current directory without further normalization.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    /// Current directory of this process, or `.` when it is unavailable.
    fn process_cwd() -> std::path::PathBuf {
        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
    }

    let requested = cwd_override.map_or_else(process_cwd, std::path::PathBuf::from);

    let resolved = match requested.canonicalize() {
        Ok(canonical) => canonical,
        Err(_) if requested.is_absolute() => requested,
        Err(_) => process_cwd().join(requested),
    };
    resolved.display().to_string()
}
611
/// Returns a copy of `env_vars` (`KEY=VALUE` strings) in which every entry
/// whose key appears in `mask_keys` is rewritten to `KEY=***`.
///
/// The key is the text before the first `=` (the whole entry when there is
/// none), so a bare `KEY` listed in `mask_keys` also becomes `KEY=***`.
/// Other entries are returned unchanged, in their original order.
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    // Nothing to mask: plain copy.
    if mask_keys.is_empty() {
        return env_vars.to_vec();
    }
    let is_masked = |key: &str| mask_keys.iter().any(|m| m.as_str() == key);

    let mut masked = Vec::with_capacity(env_vars.len());
    for entry in env_vars {
        let key = entry.split_once('=').map_or(entry.as_str(), |(k, _)| k);
        masked.push(if is_masked(key) {
            format!("{key}=***")
        } else {
            entry.clone()
        });
    }
    masked
}
630
/// Splits a `KEY=VALUE` string at its first `=` into owned `(key, value)`.
///
/// Everything after the first `=` (including further `=`) is the value; a
/// string without `=` is treated as a key with an empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
639
640fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
643 let contents =
644 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
645 let mut vars = Vec::new();
646 for line in contents.lines() {
647 let line = line.trim();
648 if line.is_empty() || line.starts_with('#') {
649 continue;
650 }
651 vars.push(parse_env_var(line));
652 }
653 Ok(vars)
654}
655
656struct OutputMatchChecker {
663 job_dir_path: std::path::PathBuf,
664 shell_wrapper: Vec<String>,
665 inner: std::sync::Mutex<OutputMatchInner>,
666}
667
668struct OutputMatchInner {
669 config: Option<crate::schema::NotificationConfig>,
670}
671
672impl OutputMatchChecker {
673 fn new(
674 job_dir_path: std::path::PathBuf,
675 shell_wrapper: Vec<String>,
676 initial_config: Option<crate::schema::NotificationConfig>,
677 ) -> Self {
678 Self {
679 job_dir_path,
680 shell_wrapper,
681 inner: std::sync::Mutex::new(OutputMatchInner {
682 config: initial_config,
683 }),
684 }
685 }
686
687 fn check_line(&self, line: &str, stream: &str) {
694 use crate::schema::{OutputMatchStream, OutputMatchType};
695
696 let match_info: Option<crate::schema::OutputMatchConfig> = {
698 let mut inner = self.inner.lock().unwrap();
699
700 {
702 let meta_path = self.job_dir_path.join("meta.json");
703 if let Ok(raw) = std::fs::read(&meta_path)
704 && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
705 {
706 inner.config = meta.notification;
707 }
708 }
709
710 let Some(ref notification) = inner.config else {
711 return;
712 };
713 let Some(ref match_cfg) = notification.on_output_match else {
714 return;
715 };
716
717 let stream_matches = match match_cfg.stream {
719 OutputMatchStream::Stdout => stream == "stdout",
720 OutputMatchStream::Stderr => stream == "stderr",
721 OutputMatchStream::Either => true,
722 };
723 if !stream_matches {
724 return;
725 }
726
727 let matched = match &match_cfg.match_type {
729 OutputMatchType::Contains => line.contains(&match_cfg.pattern),
730 OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
731 .map(|re| re.is_match(line))
732 .unwrap_or(false),
733 };
734
735 if matched {
736 Some(match_cfg.clone())
737 } else {
738 None
739 }
740 }; if let Some(match_cfg) = match_info {
743 self.dispatch_match(line, stream, &match_cfg);
744 }
745 }
746
747 fn dispatch_match(
750 &self,
751 line: &str,
752 stream: &str,
753 match_cfg: &crate::schema::OutputMatchConfig,
754 ) {
755 use std::io::Write;
756
757 let job_id = self
758 .job_dir_path
759 .file_name()
760 .and_then(|n| n.to_str())
761 .unwrap_or("unknown");
762
763 let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
764 let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
765 let events_path = self.job_dir_path.join("notification_events.ndjson");
766 let events_path_str = events_path.display().to_string();
767
768 let match_type_str = match &match_cfg.match_type {
769 crate::schema::OutputMatchType::Contains => "contains",
770 crate::schema::OutputMatchType::Regex => "regex",
771 };
772
773 let event = crate::schema::OutputMatchEvent {
774 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
775 event_type: "job.output.matched".to_string(),
776 job_id: job_id.to_string(),
777 pattern: match_cfg.pattern.clone(),
778 match_type: match_type_str.to_string(),
779 stream: stream.to_string(),
780 line: line.to_string(),
781 stdout_log_path,
782 stderr_log_path,
783 };
784
785 let event_json = serde_json::to_string(&event).unwrap_or_default();
786 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
787
788 if let Some(ref cmd) = match_cfg.command {
789 delivery_results.push(dispatch_command_sink(
790 cmd,
791 &event_json,
792 job_id,
793 &events_path_str,
794 &self.shell_wrapper,
795 "job.output.matched",
796 ));
797 }
798 if let Some(ref file_path) = match_cfg.file {
799 delivery_results.push(dispatch_file_sink(file_path, &event_json));
800 }
801
802 let record = crate::schema::OutputMatchEventRecord {
804 event,
805 delivery_results,
806 };
807 if let Ok(record_json) = serde_json::to_string(&record)
808 && let Ok(mut f) = std::fs::OpenOptions::new()
809 .create(true)
810 .append(true)
811 .open(&events_path)
812 {
813 let _ = writeln!(f, "{record_json}");
814 }
815 }
816}
817
818fn stream_to_logs<R, F>(
835 stream: R,
836 log_path: &std::path::Path,
837 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
838 label: &str,
839 on_line: Option<F>,
840) where
841 R: std::io::Read,
842 F: Fn(&str),
843{
844 use std::io::Write;
845 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
846 let mut stream = stream;
847 let mut buf = [0u8; 8192];
848 let mut line_buf: Vec<u8> = Vec::new();
850 loop {
851 match stream.read(&mut buf) {
852 Ok(0) => break, Ok(n) => {
854 let chunk = &buf[..n];
855 let _ = log_file.write_all(chunk);
857 for &b in chunk {
859 if b == b'\n' {
860 let line = String::from_utf8_lossy(&line_buf);
861 if let Ok(mut fl) = full_log.lock() {
862 let ts = now_rfc3339();
863 let _ = writeln!(fl, "{ts} [{label}] {line}");
864 }
865 if let Some(ref f) = on_line {
866 f(&line);
867 }
868 line_buf.clear();
869 } else {
870 line_buf.push(b);
871 }
872 }
873 }
874 Err(_) => break,
875 }
876 }
877 if !line_buf.is_empty() {
879 let line = String::from_utf8_lossy(&line_buf);
880 if let Ok(mut fl) = full_log.lock() {
881 let ts = now_rfc3339();
882 let _ = writeln!(fl, "{ts} [{label}] {line}");
883 }
884 if let Some(ref f) = on_line {
885 f(&line);
886 }
887 }
888}
889
890pub fn supervise(opts: SuperviseOpts) -> Result<()> {
900 use std::sync::{Arc, Mutex};
901
902 let job_id = opts.job_id;
903 let root = opts.root;
904 let command = opts.command;
905
906 if command.is_empty() {
907 anyhow::bail!("supervisor: no command");
908 }
909
910 let job_dir = JobDir::open(root, job_id)?;
911
912 let meta = job_dir.read_meta()?;
914 let started_at = now_rfc3339();
918
919 let full_log_path = if let Some(p) = opts.full_log {
921 std::path::PathBuf::from(p)
922 } else {
923 job_dir.full_log_path()
924 };
925
926 if let Some(parent) = full_log_path.parent() {
929 std::fs::create_dir_all(parent)
930 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
931 }
932 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
933 let full_log = Arc::new(Mutex::new(full_log_file));
934
935 if opts.shell_wrapper.is_empty() {
951 anyhow::bail!("supervisor: shell wrapper must not be empty");
952 }
953 let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
954 if command.len() == 1 {
955 child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
957 } else {
958 #[cfg(unix)]
968 {
969 child_cmd
972 .args(&opts.shell_wrapper[1..])
973 .arg("exec \"$@\"")
974 .arg("--")
975 .args(command);
976 }
977 #[cfg(not(unix))]
978 {
979 let joined = command
983 .iter()
984 .map(|a| {
985 if a.contains(' ') {
986 format!("\"{}\"", a)
987 } else {
988 a.clone()
989 }
990 })
991 .collect::<Vec<_>>()
992 .join(" ");
993 child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
994 }
995 }
996
997 if opts.inherit_env {
998 } else {
1000 child_cmd.env_clear();
1001 }
1002
1003 for env_file in &opts.env_files {
1005 let vars = load_env_file(env_file)?;
1006 for (k, v) in vars {
1007 child_cmd.env(&k, &v);
1008 }
1009 }
1010
1011 for env_var in &opts.env_vars {
1013 let (k, v) = parse_env_var(env_var);
1014 child_cmd.env(&k, &v);
1015 }
1016
1017 if let Some(cwd) = opts.cwd {
1019 child_cmd.current_dir(cwd);
1020 }
1021
1022 #[cfg(unix)]
1027 {
1028 use std::os::unix::process::CommandExt;
1029 unsafe {
1031 child_cmd.pre_exec(|| {
1032 libc::setsid();
1033 Ok(())
1034 });
1035 }
1036 }
1037
1038 let mut child = child_cmd
1040 .stdin(std::process::Stdio::null())
1041 .stdout(std::process::Stdio::piped())
1042 .stderr(std::process::Stdio::piped())
1043 .spawn()
1044 .context("supervisor: spawn child")?;
1045
1046 let pid = child.id();
1047 info!(job_id, pid, "child process started");
1048
1049 #[cfg(windows)]
1056 let windows_job_name = {
1057 match assign_to_job_object(job_id, pid) {
1058 Ok(name) => Some(name),
1059 Err(e) => {
1060 let kill_err = child.kill();
1064 let _ = child.wait(); let failed_state = JobState {
1067 job: JobStateJob {
1068 id: job_id.to_string(),
1069 status: JobStatus::Failed,
1070 started_at: Some(started_at.clone()),
1071 },
1072 result: JobStateResult {
1073 exit_code: None,
1074 signal: None,
1075 duration_ms: None,
1076 },
1077 pid: Some(pid),
1078 finished_at: Some(now_rfc3339()),
1079 updated_at: now_rfc3339(),
1080 windows_job_name: None,
1081 };
1082 let _ = job_dir.write_state(&failed_state);
1085
1086 if opts.notify_command.is_some() || opts.notify_file.is_some() {
1090 let finished_at_ts =
1091 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1092 let stdout_log = job_dir.stdout_path().display().to_string();
1093 let stderr_log = job_dir.stderr_path().display().to_string();
1094 let fail_event = crate::schema::CompletionEvent {
1095 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1096 event_type: "job.finished".to_string(),
1097 job_id: job_id.to_string(),
1098 state: JobStatus::Failed.as_str().to_string(),
1099 command: meta.command.clone(),
1100 cwd: meta.cwd.clone(),
1101 started_at: started_at.clone(),
1102 finished_at: finished_at_ts,
1103 duration_ms: None,
1104 exit_code: None,
1105 signal: None,
1106 stdout_log_path: stdout_log,
1107 stderr_log_path: stderr_log,
1108 };
1109 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1110 let fail_event_path = job_dir.completion_event_path().display().to_string();
1111 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1112 Vec::new();
1113 if let Err(we) = job_dir.write_completion_event_atomic(
1114 &crate::schema::CompletionEventRecord {
1115 event: fail_event.clone(),
1116 delivery_results: vec![],
1117 },
1118 ) {
1119 warn!(
1120 job_id,
1121 error = %we,
1122 "failed to write initial completion_event.json for failed job"
1123 );
1124 }
1125 if let Some(ref shell_cmd) = opts.notify_command {
1126 fail_delivery_results.push(dispatch_command_sink(
1127 shell_cmd,
1128 &fail_event_json,
1129 job_id,
1130 &fail_event_path,
1131 &opts.shell_wrapper,
1132 "job.finished",
1133 ));
1134 }
1135 if let Some(ref file_path) = opts.notify_file {
1136 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1137 }
1138 if let Err(we) = job_dir.write_completion_event_atomic(
1139 &crate::schema::CompletionEventRecord {
1140 event: fail_event,
1141 delivery_results: fail_delivery_results,
1142 },
1143 ) {
1144 warn!(
1145 job_id,
1146 error = %we,
1147 "failed to update completion_event.json with delivery results for failed job"
1148 );
1149 }
1150 }
1151
1152 if let Err(ke) = kill_err {
1153 return Err(anyhow::anyhow!(
1154 "supervisor: failed to assign pid {pid} to Job Object \
1155 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1156 ));
1157 }
1158 return Err(anyhow::anyhow!(
1159 "supervisor: failed to assign pid {pid} to Job Object \
1160 (Windows MUST requirement); child process was killed; \
1161 consider running outside a nested Job Object environment: {e}"
1162 ));
1163 }
1164 }
1165 };
1166 #[cfg(not(windows))]
1167 let windows_job_name: Option<String> = None;
1168
1169 let state = JobState {
1174 job: JobStateJob {
1175 id: job_id.to_string(),
1176 status: JobStatus::Running,
1177 started_at: Some(started_at.clone()),
1178 },
1179 result: JobStateResult {
1180 exit_code: None,
1181 signal: None,
1182 duration_ms: None,
1183 },
1184 pid: Some(pid),
1185 finished_at: None,
1186 updated_at: now_rfc3339(),
1187 windows_job_name,
1188 };
1189 job_dir.write_state(&state)?;
1190
1191 let child_start_time = std::time::Instant::now();
1192
1193 let child_stdout = child.stdout.take().expect("child stdout piped");
1195 let child_stderr = child.stderr.take().expect("child stderr piped");
1196
1197 let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1199 job_dir.path.clone(),
1200 opts.shell_wrapper.clone(),
1201 meta.notification.clone(),
1202 ));
1203
1204 let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1208 let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1209
1210 let stdout_log_path = job_dir.stdout_path();
1212 let full_log_stdout = Arc::clone(&full_log);
1213 let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1214 let t_stdout = std::thread::spawn(move || {
1215 stream_to_logs(
1216 child_stdout,
1217 &stdout_log_path,
1218 full_log_stdout,
1219 "STDOUT",
1220 Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1221 );
1222 let _ = tx_stdout_done.send(());
1223 });
1224
1225 let stderr_log_path = job_dir.stderr_path();
1227 let full_log_stderr = Arc::clone(&full_log);
1228 let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1229 let t_stderr = std::thread::spawn(move || {
1230 stream_to_logs(
1231 child_stderr,
1232 &stderr_log_path,
1233 full_log_stderr,
1234 "STDERR",
1235 Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1236 );
1237 let _ = tx_stderr_done.send(());
1238 });
1239
1240 let timeout_ms = opts.timeout_ms;
1243 let kill_after_ms = opts.kill_after_ms;
1244 let progress_every_ms = opts.progress_every_ms;
1245 let state_path = job_dir.state_path();
1246 let job_id_str = job_id.to_string();
1247
1248 use std::sync::atomic::{AtomicBool, Ordering};
1250 let child_done = Arc::new(AtomicBool::new(false));
1251
1252 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1253 let state_path_clone = state_path.clone();
1254 let child_done_clone = Arc::clone(&child_done);
1255 Some(std::thread::spawn(move || {
1256 let start = std::time::Instant::now();
1257 let timeout_dur = if timeout_ms > 0 {
1258 Some(std::time::Duration::from_millis(timeout_ms))
1259 } else {
1260 None
1261 };
1262 let progress_dur = if progress_every_ms > 0 {
1263 Some(std::time::Duration::from_millis(progress_every_ms))
1264 } else {
1265 None
1266 };
1267
1268 let poll_interval = std::time::Duration::from_millis(100);
1269
1270 loop {
1271 std::thread::sleep(poll_interval);
1272
1273 if child_done_clone.load(Ordering::Relaxed) {
1275 break;
1276 }
1277
1278 let elapsed = start.elapsed();
1279
1280 if let Some(td) = timeout_dur
1282 && elapsed >= td
1283 {
1284 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1285 #[cfg(unix)]
1288 {
1289 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1290 }
1291 if kill_after_ms > 0 {
1293 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1294 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1295 #[cfg(unix)]
1296 {
1297 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1298 }
1299 } else {
1300 #[cfg(unix)]
1302 {
1303 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1304 }
1305 }
1306 break;
1307 }
1308
1309 if let Some(pd) = progress_dur {
1311 let elapsed_ms = elapsed.as_millis() as u64;
1312 let pd_ms = pd.as_millis() as u64;
1313 let poll_ms = poll_interval.as_millis() as u64;
1314 if elapsed_ms % pd_ms < poll_ms {
1315 if let Ok(raw) = std::fs::read(&state_path_clone)
1317 && let Ok(mut st) =
1318 serde_json::from_slice::<crate::schema::JobState>(&raw)
1319 {
1320 st.updated_at = now_rfc3339();
1321 if let Ok(s) = serde_json::to_string_pretty(&st) {
1322 let _ = std::fs::write(&state_path_clone, s);
1323 }
1324 }
1325 }
1326 }
1327 }
1328 }))
1329 } else {
1330 None
1331 };
1332
1333 let exit_status = child.wait().context("wait for child")?;
1335
1336 child_done.store(true, Ordering::Relaxed);
1338
1339 let duration_ms = child_start_time.elapsed().as_millis() as u64;
1346 let exit_code = exit_status.code();
1347 let finished_at = now_rfc3339();
1348
1349 #[cfg(unix)]
1351 let (terminal_status, signal_name) = {
1352 use std::os::unix::process::ExitStatusExt;
1353 if let Some(sig) = exit_status.signal() {
1354 (JobStatus::Killed, Some(sig.to_string()))
1355 } else {
1356 (JobStatus::Exited, None)
1357 }
1358 };
1359 #[cfg(not(unix))]
1360 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1361
1362 let state = JobState {
1363 job: JobStateJob {
1364 id: job_id.to_string(),
1365 status: terminal_status.clone(),
1366 started_at: Some(started_at.clone()),
1367 },
1368 result: JobStateResult {
1369 exit_code,
1370 signal: signal_name.clone(),
1371 duration_ms: Some(duration_ms),
1372 },
1373 pid: Some(pid),
1374 finished_at: Some(finished_at.clone()),
1375 updated_at: now_rfc3339(),
1376 windows_job_name: None, };
1378 job_dir.write_state(&state)?;
1379 info!(job_id, ?exit_code, "child process finished");
1380
1381 const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1392 let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1393
1394 let remaining = drain_deadline
1395 .checked_duration_since(std::time::Instant::now())
1396 .unwrap_or(std::time::Duration::ZERO);
1397 if rx_stdout_done.recv_timeout(remaining).is_ok() {
1398 let _ = t_stdout.join();
1399 } else {
1400 drop(t_stdout); }
1402
1403 let remaining = drain_deadline
1404 .checked_duration_since(std::time::Instant::now())
1405 .unwrap_or(std::time::Duration::ZERO);
1406 if rx_stderr_done.recv_timeout(remaining).is_ok() {
1407 let _ = t_stderr.join();
1408 } else {
1409 drop(t_stderr); }
1411
1412 if let Some(w) = watcher {
1414 let _ = w.join();
1415 }
1416
1417 let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1420 let (current_notify_command, current_notify_file) = match &latest_notification {
1421 Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1422 None => (None, None),
1423 };
1424
1425 let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1428 if has_notification {
1429 let stdout_log = job_dir.stdout_path().display().to_string();
1430 let stderr_log = job_dir.stderr_path().display().to_string();
1431 let event = crate::schema::CompletionEvent {
1432 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1433 event_type: "job.finished".to_string(),
1434 job_id: job_id.to_string(),
1435 state: terminal_status.as_str().to_string(),
1436 command: meta.command.clone(),
1437 cwd: meta.cwd.clone(),
1438 started_at,
1439 finished_at,
1440 duration_ms: Some(duration_ms),
1441 exit_code,
1442 signal: signal_name,
1443 stdout_log_path: stdout_log,
1444 stderr_log_path: stderr_log,
1445 };
1446
1447 let event_json = serde_json::to_string(&event).unwrap_or_default();
1448 let event_path = job_dir.completion_event_path().display().to_string();
1449 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1450
1451 if let Err(e) =
1453 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1454 event: event.clone(),
1455 delivery_results: vec![],
1456 })
1457 {
1458 warn!(job_id, error = %e, "failed to write initial completion_event.json");
1459 }
1460
1461 if let Some(ref shell_cmd) = current_notify_command {
1462 delivery_results.push(dispatch_command_sink(
1463 shell_cmd,
1464 &event_json,
1465 job_id,
1466 &event_path,
1467 &opts.shell_wrapper,
1468 "job.finished",
1469 ));
1470 }
1471 if let Some(ref file_path) = current_notify_file {
1472 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1473 }
1474
1475 if let Err(e) =
1477 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1478 event,
1479 delivery_results,
1480 })
1481 {
1482 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1483 }
1484 }
1485
1486 Ok(())
1487}
1488
1489fn dispatch_command_sink(
1496 shell_cmd: &str,
1497 event_json: &str,
1498 job_id: &str,
1499 event_path: &str,
1500 shell_wrapper: &[String],
1501 event_type: &str,
1502) -> crate::schema::SinkDeliveryResult {
1503 use std::io::Write;
1504 let attempted_at = now_rfc3339();
1505 let target = shell_cmd.to_string();
1506
1507 if shell_cmd.trim().is_empty() {
1508 return crate::schema::SinkDeliveryResult {
1509 sink_type: "command".to_string(),
1510 target,
1511 success: false,
1512 error: Some("empty shell command".to_string()),
1513 attempted_at,
1514 };
1515 }
1516
1517 if shell_wrapper.is_empty() {
1518 return crate::schema::SinkDeliveryResult {
1519 sink_type: "command".to_string(),
1520 target,
1521 success: false,
1522 error: Some("shell wrapper must not be empty".to_string()),
1523 attempted_at,
1524 };
1525 }
1526
1527 let mut cmd = Command::new(&shell_wrapper[0]);
1528 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1529
1530 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1531 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1532 cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1533 cmd.stdin(std::process::Stdio::piped());
1534 cmd.stdout(std::process::Stdio::null());
1535 cmd.stderr(std::process::Stdio::null());
1536
1537 match cmd.spawn() {
1538 Ok(mut child) => {
1539 if let Some(mut stdin) = child.stdin.take() {
1540 let _ = stdin.write_all(event_json.as_bytes());
1541 }
1542 match child.wait() {
1543 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1544 sink_type: "command".to_string(),
1545 target,
1546 success: true,
1547 error: None,
1548 attempted_at,
1549 },
1550 Ok(status) => crate::schema::SinkDeliveryResult {
1551 sink_type: "command".to_string(),
1552 target,
1553 success: false,
1554 error: Some(format!("exited with status {status}")),
1555 attempted_at,
1556 },
1557 Err(e) => crate::schema::SinkDeliveryResult {
1558 sink_type: "command".to_string(),
1559 target,
1560 success: false,
1561 error: Some(format!("wait error: {e}")),
1562 attempted_at,
1563 },
1564 }
1565 }
1566 Err(e) => crate::schema::SinkDeliveryResult {
1567 sink_type: "command".to_string(),
1568 target,
1569 success: false,
1570 error: Some(format!("spawn error: {e}")),
1571 attempted_at,
1572 },
1573 }
1574}
1575
1576fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1579 use std::io::Write;
1580 let attempted_at = now_rfc3339();
1581 let path = std::path::Path::new(file_path);
1582
1583 if let Some(parent) = path.parent()
1584 && let Err(e) = std::fs::create_dir_all(parent)
1585 {
1586 return crate::schema::SinkDeliveryResult {
1587 sink_type: "file".to_string(),
1588 target: file_path.to_string(),
1589 success: false,
1590 error: Some(format!("create parent dir: {e}")),
1591 attempted_at,
1592 };
1593 }
1594
1595 match std::fs::OpenOptions::new()
1596 .create(true)
1597 .append(true)
1598 .open(path)
1599 {
1600 Ok(mut f) => match writeln!(f, "{event_json}") {
1601 Ok(_) => crate::schema::SinkDeliveryResult {
1602 sink_type: "file".to_string(),
1603 target: file_path.to_string(),
1604 success: true,
1605 error: None,
1606 attempted_at,
1607 },
1608 Err(e) => crate::schema::SinkDeliveryResult {
1609 sink_type: "file".to_string(),
1610 target: file_path.to_string(),
1611 success: false,
1612 error: Some(format!("write error: {e}")),
1613 attempted_at,
1614 },
1615 },
1616 Err(e) => crate::schema::SinkDeliveryResult {
1617 sink_type: "file".to_string(),
1618 target: file_path.to_string(),
1619 success: false,
1620 error: Some(format!("open error: {e}")),
1621 attempted_at,
1622 },
1623 }
1624}
1625
1626pub fn now_rfc3339_pub() -> String {
1628 now_rfc3339()
1629}
1630
1631fn now_rfc3339() -> String {
1632 let d = std::time::SystemTime::now()
1634 .duration_since(std::time::UNIX_EPOCH)
1635 .unwrap_or_default();
1636 format_rfc3339(d.as_secs())
1637}
1638
1639fn format_rfc3339(secs: u64) -> String {
1640 let mut s = secs;
1642 let seconds = s % 60;
1643 s /= 60;
1644 let minutes = s % 60;
1645 s /= 60;
1646 let hours = s % 24;
1647 s /= 24;
1648
1649 let mut days = s;
1651 let mut year = 1970u64;
1652 loop {
1653 let days_in_year = if is_leap(year) { 366 } else { 365 };
1654 if days < days_in_year {
1655 break;
1656 }
1657 days -= days_in_year;
1658 year += 1;
1659 }
1660
1661 let leap = is_leap(year);
1662 let month_days: [u64; 12] = [
1663 31,
1664 if leap { 29 } else { 28 },
1665 31,
1666 30,
1667 31,
1668 30,
1669 31,
1670 31,
1671 30,
1672 31,
1673 30,
1674 31,
1675 ];
1676 let mut month = 0usize;
1677 for (i, &d) in month_days.iter().enumerate() {
1678 if days < d {
1679 month = i;
1680 break;
1681 }
1682 days -= d;
1683 }
1684 let day = days + 1;
1685
1686 format!(
1687 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1688 year,
1689 month + 1,
1690 day,
1691 hours,
1692 minutes,
1693 seconds
1694 )
1695}
1696
/// Gregorian leap-year rule: every fourth year is a leap year, except
/// century years, which are leap years only when divisible by 400
/// (2000 is a leap year; 1900 and 2100 are not).
fn is_leap(year: u64) -> bool {
    match (
        year.is_multiple_of(4),
        year.is_multiple_of(100),
        year.is_multiple_of(400),
    ) {
        (_, _, true) => true,
        (_, true, false) => false,
        (by_four, false, false) => by_four,
    }
}
1700
/// Put the child process `pid` into a named Windows Job Object.
///
/// The Job Object is named `AgentExec-{job_id}`. On success, that name is
/// returned to the caller (presumably so it can be recorded alongside the
/// job's state, cf. `windows_job_name`; confirm at the call site).
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the child cannot be opened with `PROCESS_TERMINATE | PROCESS_SET_QUOTA`
///   access;
/// - `CreateJobObjectW` fails;
/// - `AssignProcessToJobObject` fails. Per the error text, this can happen when
///   the process already belongs to another Job Object, e.g. under CI.
///
/// On every error path, the handles opened so far are closed before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY: plain Win32 calls. Every handle used below was obtained in this
    // block and is checked (`?` / `match` / `if let Err`) before use. Each
    // handle is closed exactly once on error paths. On success, only
    // `proc_handle` is closed; `job` is intentionally kept open (see below).
    unsafe {
        // Open the child with exactly the rights AssignProcessToJobObject
        // requires (terminate + set quota).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create the named Job Object (default security attributes).
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                // Don't leak the process handle on this error path.
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            // Assignment failed: release both handles before reporting.
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle is no longer needed once the child is assigned.
        let _ = CloseHandle(proc_handle);
        // The Job Object handle is deliberately never closed, so the named Job
        // Object stays open for the rest of this process's lifetime.
        // NOTE(review): `mem::forget` documents that intent. If `HANDLE` has
        // no `Drop` impl in the `windows` crate version in use, it is a runtime
        // no-op; confirm.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1766
/// Unit tests for the hand-rolled RFC 3339 formatter and its leap-year rule.
///
/// Besides the epoch and a known date, these tests pin the calendar edges the
/// year/month subtraction loops are most likely to get wrong: time of day, the
/// last second of a year, Feb 29 in a leap year, and the Gregorian century
/// rules (2000 is a leap year, 2100 is not).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day() {
        assert_eq!(format_rfc3339(45296), "1970-01-01T12:34:56Z");
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    #[test]
    fn rfc3339_leap_year_edges() {
        // Feb 29 and the last second of a leap year.
        assert_eq!(format_rfc3339(1709168461), "2024-02-29T01:01:01Z");
        assert_eq!(format_rfc3339(1735689599), "2024-12-31T23:59:59Z");
        // 2000 is divisible by 400, so it is a leap year.
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(951868800), "2000-03-01T00:00:00Z");
        // 2100 is a century not divisible by 400: Feb 28 is followed by Mar 1.
        assert_eq!(format_rfc3339(4107456000), "2100-02-28T00:00:00Z");
        assert_eq!(format_rfc3339(4107542400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn leap_year_rule() {
        assert!(is_leap(1972));
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1970));
        assert!(!is_leap(1900));
        assert!(!is_leap(2100));
    }
}