1use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Snapshot of a job's status and the head of its captured output, as
/// assembled by [`observe_inline_output`].
#[derive(Clone, Debug)]
pub struct InlineObservation {
    /// Milliseconds spent observing; `0` when waiting was disabled.
    pub waited_ms: u64,
    /// Head of `stdout.log`, bounded by the caller's byte budget.
    pub stdout: String,
    /// Head of `stderr.log`, bounded by the caller's byte budget.
    pub stderr: String,
    /// Range reported by the log reader for the returned `stdout` head.
    pub stdout_range: [u64; 2],
    /// Range reported by the log reader for the returned `stderr` head.
    pub stderr_range: [u64; 2],
    /// Bytes observed in `stdout.log` at read time.
    pub stdout_total_bytes: u64,
    /// Bytes observed in `stderr.log` at read time.
    pub stderr_total_bytes: u64,
    /// Text encoding label for `stdout` / `stderr` (e.g. `"utf-8-lossy"`).
    pub encoding: String,
    /// Job status rendered as a string.
    pub state: String,
    /// Exit code recorded in the job state, if any.
    pub exit_code: Option<i32>,
    /// Completion timestamp recorded in the job state, if any.
    pub finished_at: Option<String>,
    /// Terminating signal recorded in the job state, if any.
    pub signal: Option<String>,
    /// Run duration recorded in the job state, if any.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
38#[derive(Debug)]
47pub struct RunOpts<'a> {
48 pub command: Vec<String>,
50 pub root: Option<&'a str>,
52 pub no_auto_gc: bool,
54 pub auto_gc_older_than: Option<String>,
56 pub auto_gc_max_jobs: Option<u64>,
58 pub auto_gc_max_bytes: Option<u64>,
60 pub auto_gc_config: crate::gc::AutoGcConfig,
62 pub wait: bool,
64 pub until_seconds: u64,
66 pub forever: bool,
68 pub max_bytes: u64,
70 pub compression_mode: crate::compress::CompressionMode,
71 pub timeout_ms: u64,
73 pub kill_after_ms: u64,
75 pub cwd: Option<&'a str>,
77 pub env_vars: Vec<String>,
79 pub env_files: Vec<String>,
81 pub inherit_env: bool,
83 pub mask: Vec<String>,
85 pub stdin: Option<StdinSource>,
87 pub stdin_max_bytes: u64,
89 pub tags: Vec<String>,
91 pub log: Option<&'a str>,
93 pub progress_every_ms: u64,
95 pub notify_command: Option<String>,
98 pub notify_file: Option<String>,
100 pub output_pattern: Option<String>,
102 pub output_match_type: Option<String>,
104 pub output_stream: Option<String>,
106 pub output_command: Option<String>,
108 pub output_file: Option<String>,
110 pub shell_wrapper: Vec<String>,
113}
114
115impl<'a> Default for RunOpts<'a> {
116 fn default() -> Self {
117 RunOpts {
118 command: vec![],
119 root: None,
120 no_auto_gc: false,
121 auto_gc_older_than: None,
122 auto_gc_max_jobs: None,
123 auto_gc_max_bytes: None,
124 auto_gc_config: crate::gc::AutoGcConfig::default(),
125 wait: true,
126 until_seconds: 10,
127 forever: false,
128 max_bytes: 65536,
129 compression_mode: crate::compress::CompressionMode::default(),
130 timeout_ms: 0,
131 kill_after_ms: 0,
132 cwd: None,
133 env_vars: vec![],
134 env_files: vec![],
135 inherit_env: true,
136 mask: vec![],
137 stdin: None,
138 stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
139 tags: vec![],
140 log: None,
141 progress_every_ms: 0,
142 notify_command: None,
143 notify_file: None,
144 output_pattern: None,
145 output_match_type: None,
146 output_stream: None,
147 output_command: None,
148 output_file: None,
149 shell_wrapper: crate::config::default_shell_wrapper(),
150 }
151 }
152}
153
/// Origin of the data fed to a job's standard input.
#[derive(Clone, Debug)]
pub enum StdinSource {
    /// Copy whatever arrives on the caller's own stdin.
    CallerStdin,
    /// Use the given string verbatim.
    Inline(String),
    /// Copy the contents of the file at this path.
    File(String),
}
163
/// Owned inputs for [`spawn_supervisor_process`]; each field is translated
/// into a command-line argument of the `_supervise` subprocess.
pub struct SpawnSupervisorParams {
    /// Identifier of the job being supervised (`--job-id`).
    pub job_id: String,
    /// Jobs root directory (`--supervise-root`).
    pub root: std::path::PathBuf,
    /// Path of the combined log (`--full-log`).
    pub full_log_path: String,
    /// Timeout in milliseconds; passed as `--timeout` only when non-zero.
    pub timeout_ms: u64,
    /// Kill-after delay in milliseconds; passed as `--kill-after` only when non-zero.
    pub kill_after_ms: u64,
    /// Working directory for the job (`--cwd`).
    pub cwd: Option<String>,
    /// Environment entries, one `--env` per element.
    pub env_vars: Vec<String>,
    /// Env-file paths, one `--env-file` per element.
    pub env_files: Vec<String>,
    /// When `false`, `--no-inherit-env` is passed.
    pub inherit_env: bool,
    /// Materialized stdin file (`--stdin-file`).
    pub stdin_file: Option<String>,
    /// Progress interval in milliseconds; passed as `--progress-every` only when non-zero.
    pub progress_every_ms: u64,
    /// Completion notification command (`--notify-command`).
    pub notify_command: Option<String>,
    /// Completion notification file (`--notify-file`).
    pub notify_file: Option<String>,
    /// Shell wrapper argv, serialized as JSON for `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Command and arguments placed after `--`.
    pub command: Vec<String>,
}
182
183pub fn resolve_stdin_source(
184 stdin: Option<String>,
185 stdin_file: Option<String>,
186) -> Option<StdinSource> {
187 if let Some(value) = stdin {
188 if value == "-" {
189 Some(StdinSource::CallerStdin)
190 } else {
191 Some(StdinSource::Inline(value))
192 }
193 } else {
194 stdin_file.map(StdinSource::File)
195 }
196}
197
198pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
199 if matches!(stdin, Some(StdinSource::CallerStdin)) {
200 let stdin = std::io::stdin();
201 if stdin.is_terminal() {
202 return Err(anyhow::anyhow!(StdinRequired(
203 "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
204 .to_string(),
205 )));
206 }
207 }
208 Ok(())
209}
210
211fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
212 #[cfg(unix)]
213 {
214 use std::os::unix::fs::OpenOptionsExt;
215 std::fs::OpenOptions::new()
216 .write(true)
217 .create(true)
218 .truncate(true)
219 .mode(0o600)
220 .open(path)
221 .with_context(|| format!("create materialized stdin {}", path.display()))
222 }
223 #[cfg(not(unix))]
224 {
225 std::fs::File::create(path)
226 .with_context(|| format!("create materialized stdin {}", path.display()))
227 }
228}
229
/// Writer adapter that refuses to let more than `limit` bytes through.
struct LimitedWriter<W> {
    /// Destination writer.
    inner: W,
    /// Bytes accepted by `inner` so far.
    written: u64,
    /// Maximum total number of bytes that may be written.
    limit: u64,
}

impl<W: std::io::Write> std::io::Write for LimitedWriter<W> {
    /// Forwards `buf` to the inner writer unless doing so would push the
    /// running total past `limit`.
    ///
    /// # Errors
    ///
    /// Fails with an `io::Error` whose message is `stdin_too_large` when the
    /// whole of `buf` does not fit; nothing is written in that case.
    /// Errors from the inner writer are passed through.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Checked arithmetic: an overflowing total is by definition over the
        // limit, and must not wrap around (release) or panic (debug).
        let fits = u64::try_from(buf.len())
            .ok()
            .and_then(|len| self.written.checked_add(len))
            .is_some_and(|total| total <= self.limit);
        if !fits {
            return Err(std::io::Error::other("stdin_too_large"));
        }
        let n = self.inner.write(buf)?;
        // `n <= buf.len()`, and `written + buf.len()` was just shown to fit.
        self.written += n as u64;
        Ok(n)
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
250
251fn materialize_stdin(
252 job_dir: &JobDir,
253 stdin: Option<&StdinSource>,
254 max_bytes: u64,
255) -> Result<Option<String>> {
256 let Some(source) = stdin else {
257 return Ok(None);
258 };
259
260 let target_name = "stdin.bin".to_string();
261 let target_path = job_dir.path.join(&target_name);
262 let file = create_stdin_file(&target_path)?;
263 let mut target = LimitedWriter {
264 inner: file,
265 written: 0,
266 limit: max_bytes,
267 };
268
269 let copy_result = match source {
270 StdinSource::CallerStdin => {
271 let mut stdin = std::io::stdin();
272 std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
273 }
274 StdinSource::Inline(value) => {
275 use std::io::Write;
276 target
277 .write_all(value.as_bytes())
278 .map(|_| 0u64)
279 .context("write inline stdin to stdin.bin")
280 }
281 StdinSource::File(path) => {
282 let mut input = std::fs::File::open(path)
283 .with_context(|| format!("open --stdin-file source {}", path))?;
284 std::io::copy(&mut input, &mut target)
285 .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
286 }
287 };
288
289 if let Err(e) = copy_result {
290 let _ = std::fs::remove_file(&target_path);
291 let is_too_large = e
292 .chain()
293 .any(|cause| cause.to_string().contains("stdin_too_large"));
294 if is_too_large {
295 return Err(anyhow::anyhow!(StdinTooLarge(format!(
296 "stdin_too_large: input exceeds {} byte limit",
297 max_bytes
298 ))));
299 }
300 return Err(e);
301 }
302
303 Ok(Some(target_name))
304}
305
306fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
307 stdin_file.map(|p| {
308 let path = std::path::Path::new(p);
309 if path.is_absolute() {
310 path.to_path_buf()
311 } else {
312 job_dir.path.join(path)
313 }
314 })
315}
316
/// Error raised when `--stdin -` is requested but stdin is interactive.
/// The wrapped string is the complete user-facing message.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::fmt::Display for StdinRequired {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinRequired {}
327
/// Error raised when materialized stdin exceeds the configured byte limit.
/// The wrapped string is the complete user-facing message.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::fmt::Display for StdinTooLarge {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinTooLarge {}
338
339pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
340 if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
341 let file = std::fs::File::open(&path)
342 .with_context(|| format!("open materialized stdin {}", path.display()))?;
343 Ok(std::process::Stdio::from(file))
344 } else {
345 Ok(std::process::Stdio::null())
346 }
347}
348
349pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 * 1024 * 1024; pub fn materialize_stdin_for_job(
352 job_dir: &JobDir,
353 stdin: Option<&StdinSource>,
354 max_bytes: u64,
355) -> Result<Option<String>> {
356 materialize_stdin(job_dir, stdin, max_bytes)
357}
358
359pub fn spawn_supervisor_process(
364 job_dir: &JobDir,
365 params: SpawnSupervisorParams,
366) -> Result<(u32, String)> {
367 let started_at = now_rfc3339();
368
369 let exe = std::env::current_exe().context("resolve current exe")?;
370 let mut supervisor_cmd = Command::new(&exe);
371 supervisor_cmd
372 .arg("_supervise")
373 .arg("--job-id")
374 .arg(¶ms.job_id)
375 .arg("--supervise-root")
376 .arg(params.root.display().to_string())
377 .arg("--full-log")
378 .arg(¶ms.full_log_path);
379
380 if params.timeout_ms > 0 {
381 let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
382 supervisor_cmd
383 .arg("--timeout")
384 .arg(timeout_seconds.to_string());
385 }
386 if params.kill_after_ms > 0 {
387 let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
388 supervisor_cmd
389 .arg("--kill-after")
390 .arg(kill_after_seconds.to_string());
391 }
392 if let Some(ref cwd) = params.cwd {
393 supervisor_cmd.arg("--cwd").arg(cwd);
394 }
395 for env_file in ¶ms.env_files {
396 supervisor_cmd.arg("--env-file").arg(env_file);
397 }
398 for env_var in ¶ms.env_vars {
399 supervisor_cmd.arg("--env").arg(env_var);
400 }
401 if !params.inherit_env {
402 supervisor_cmd.arg("--no-inherit-env");
403 }
404 if let Some(ref stdin_file) = params.stdin_file {
405 supervisor_cmd.arg("--stdin-file").arg(stdin_file);
406 }
407 if params.progress_every_ms > 0 {
408 let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
409 supervisor_cmd
410 .arg("--progress-every")
411 .arg(progress_every_seconds.to_string());
412 }
413 if let Some(ref nc) = params.notify_command {
414 supervisor_cmd.arg("--notify-command").arg(nc);
415 }
416 if let Some(ref nf) = params.notify_file {
417 supervisor_cmd.arg("--notify-file").arg(nf);
418 }
419 let wrapper_json =
420 serde_json::to_string(¶ms.shell_wrapper).context("serialize shell wrapper")?;
421 supervisor_cmd
422 .arg("--shell-wrapper-resolved")
423 .arg(&wrapper_json);
424
425 supervisor_cmd
426 .arg("--")
427 .args(¶ms.command)
428 .stdin(std::process::Stdio::null())
429 .stdout(std::process::Stdio::null())
430 .stderr(std::process::Stdio::null());
431
432 let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
433 let supervisor_pid = supervisor.id();
434 debug!(supervisor_pid, "supervisor spawned");
435
436 job_dir.init_state(supervisor_pid, &started_at)?;
438
439 #[cfg(windows)]
441 {
442 let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
443 loop {
444 std::thread::sleep(std::time::Duration::from_millis(10));
445 if let Ok(current_state) = job_dir.read_state() {
446 let supervisor_updated = current_state
447 .pid
448 .map(|p| p != supervisor_pid)
449 .unwrap_or(false)
450 || *current_state.status() == crate::schema::JobStatus::Failed;
451 if supervisor_updated {
452 if *current_state.status() == crate::schema::JobStatus::Failed {
453 anyhow::bail!(
454 "supervisor failed to assign child process to Job Object \
455 (Windows MUST requirement); see stderr for details"
456 );
457 }
458 debug!("supervisor confirmed Job Object assignment via state.json handshake");
459 break;
460 }
461 }
462 if std::time::Instant::now() >= handshake_deadline {
463 debug!("supervisor handshake timed out; proceeding with initial state");
464 break;
465 }
466 }
467 }
468
469 Ok((supervisor_pid, started_at))
470}
471
472pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
475 for log_path in [
476 job_dir.stdout_path(),
477 job_dir.stderr_path(),
478 job_dir.full_log_path(),
479 ] {
480 std::fs::OpenOptions::new()
481 .create(true)
482 .append(true)
483 .open(&log_path)
484 .with_context(|| format!("pre-create log file {}", log_path.display()))?;
485 }
486 Ok(())
487}
488
489pub fn execute(opts: RunOpts) -> Result<()> {
491 if opts.command.is_empty() {
492 anyhow::bail!("no command specified for run");
493 }
494
495 let elapsed_start = std::time::Instant::now();
496
497 let root = resolve_root(opts.root);
498 std::fs::create_dir_all(&root)
499 .with_context(|| format!("create jobs root {}", root.display()))?;
500
501 let job_id = generate_job_id(&root)?;
502 let created_at = now_rfc3339();
503
504 let env_keys: Vec<String> = opts
506 .env_vars
507 .iter()
508 .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
509 .collect();
510
511 let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
513
514 let effective_cwd = resolve_effective_cwd(opts.cwd);
518
519 let on_output_match = crate::notify::build_output_match_config(
521 opts.output_pattern,
522 opts.output_match_type,
523 opts.output_stream,
524 opts.output_command,
525 opts.output_file,
526 None,
527 );
528
529 let notification =
530 if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
531 {
532 Some(crate::schema::NotificationConfig {
533 notify_command: opts.notify_command.clone(),
534 notify_file: opts.notify_file.clone(),
535 on_output_match,
536 })
537 } else {
538 None
539 };
540
541 let tags = dedup_tags(opts.tags)?;
543
544 let meta = JobMeta {
545 job: JobMetaJob { id: job_id.clone() },
546 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
547 command: opts.command.clone(),
548 created_at: created_at.clone(),
549 root: root.display().to_string(),
550 env_keys,
551 env_vars: masked_env_vars.clone(),
552 env_vars_runtime: vec![],
555 mask: opts.mask.clone(),
556 cwd: Some(effective_cwd),
557 notification,
558 inherit_env: opts.inherit_env,
560 env_files: opts.env_files.clone(),
561 timeout_ms: opts.timeout_ms,
562 kill_after_ms: opts.kill_after_ms,
563 progress_every_ms: opts.progress_every_ms,
564 shell_wrapper: Some(opts.shell_wrapper.clone()),
565 stdin_file: None,
566 tags: tags.clone(),
567 };
568
569 validate_stdin_source(opts.stdin.as_ref())?;
570
571 let job_dir = JobDir::create(&root, &job_id, &meta)?;
572 let stdin_file =
573 materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
574 if stdin_file.is_some() {
575 let mut meta_with_stdin = meta.clone();
576 meta_with_stdin.stdin_file = stdin_file.clone();
577 job_dir.write_meta_atomic(&meta_with_stdin)?;
578 }
579 info!(job_id = %job_id, "created job directory");
580
581 let full_log_path = if let Some(log) = opts.log {
583 log.to_string()
584 } else {
585 job_dir.full_log_path().display().to_string()
586 };
587
588 pre_create_log_files(&job_dir)?;
590
591 let (_supervisor_pid, _started_at) = spawn_supervisor_process(
595 &job_dir,
596 SpawnSupervisorParams {
597 job_id: job_id.clone(),
598 root: root.clone(),
599 full_log_path: full_log_path.clone(),
600 timeout_ms: opts.timeout_ms,
601 kill_after_ms: opts.kill_after_ms,
602 cwd: opts.cwd.map(|s| s.to_string()),
603 env_vars: opts.env_vars.clone(),
604 env_files: opts.env_files.clone(),
605 inherit_env: opts.inherit_env,
606 stdin_file: stdin_file.clone(),
607 progress_every_ms: opts.progress_every_ms,
608 notify_command: opts.notify_command.clone(),
609 notify_file: opts.notify_file.clone(),
610 shell_wrapper: opts.shell_wrapper.clone(),
611 command: opts.command.clone(),
612 },
613 )?;
614
615 let stdout_log_path = job_dir.stdout_path().display().to_string();
617 let stderr_log_path = job_dir.stderr_path().display().to_string();
618
619 let observation = observe_inline_output(
620 &job_dir,
621 opts.wait,
622 opts.until_seconds,
623 opts.forever,
624 opts.max_bytes,
625 )?;
626 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
627 let compression = crate::compress::compress(crate::compress::CompressionInput {
628 command: &opts.command,
629 stdout: &observation.stdout,
630 stderr: &observation.stderr,
631 stdout_original_bytes: observation.stdout_total_bytes,
632 stderr_original_bytes: observation.stderr_total_bytes,
633 mode: opts.compression_mode,
634 });
635
636 if !opts.no_auto_gc {
637 let mut auto_cfg = opts.auto_gc_config.clone();
638 if let Some(v) = opts.auto_gc_older_than {
639 auto_cfg.older_than = v;
640 }
641 if let Some(v) = opts.auto_gc_max_jobs {
642 auto_cfg.max_jobs = usize::try_from(v).ok();
643 }
644 if let Some(v) = opts.auto_gc_max_bytes {
645 auto_cfg.max_bytes = Some(v);
646 }
647 crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
648 }
649
650 let response = Response::new(
651 "run",
652 RunData {
653 job_id,
654 state: observation.state,
655 tags,
656 env_vars: masked_env_vars,
659 stdout_log_path,
660 stderr_log_path,
661 elapsed_ms,
662 waited_ms: observation.waited_ms,
663 stdout: observation.stdout,
664 stderr: observation.stderr,
665 stdout_range: observation.stdout_range,
666 stderr_range: observation.stderr_range,
667 stdout_total_bytes: observation.stdout_total_bytes,
668 stderr_total_bytes: observation.stderr_total_bytes,
669 encoding: observation.encoding,
670 exit_code: observation.exit_code,
671 finished_at: observation.finished_at,
672 signal: observation.signal,
673 duration_ms: observation.duration_ms,
674 compression,
675 },
676 );
677 response.print();
678 Ok(())
679}
680
/// Options for [`supervise`], the body of the `_supervise` subprocess.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job to supervise.
    pub job_id: &'a str,
    /// Jobs root directory containing the job.
    pub root: &'a Path,
    /// Command and arguments to run; must not be empty.
    pub command: &'a [String],
    /// Overrides the path of the combined log when set.
    pub full_log: Option<&'a str>,
    /// Timeout for the child, in milliseconds.
    pub timeout_ms: u64,
    /// Kill-after delay, in milliseconds.
    pub kill_after_ms: u64,
    /// Working directory for the child process.
    pub cwd: Option<&'a str>,
    /// Environment entries in `KEY=VALUE` form, applied after env-files.
    pub env_vars: Vec<String>,
    /// Env-file paths whose entries are loaded into the child's environment.
    pub env_files: Vec<String>,
    /// When `false`, the child's environment is cleared before entries are applied.
    pub inherit_env: bool,
    /// Materialized stdin file to connect to the child, if any.
    pub stdin_file: Option<String>,
    /// Progress interval, in milliseconds.
    pub progress_every_ms: u64,
    /// Shell command notified of job events.
    pub notify_command: Option<String>,
    /// File notified of job events.
    pub notify_file: Option<String>,
    /// Shell wrapper argv used to launch the command; must not be empty.
    pub shell_wrapper: Vec<String>,
}
717
718pub fn observe_inline_output(
725 job_dir: &JobDir,
726 wait: bool,
727 until_seconds: u64,
728 forever: bool,
729 max_bytes: u64,
730) -> Result<InlineObservation> {
731 let started = std::time::Instant::now();
732 let deadline = if wait {
733 if forever {
734 None
735 } else {
736 Some(started + std::time::Duration::from_secs(until_seconds))
737 }
738 } else {
739 Some(started)
740 };
741
742 if wait {
743 loop {
744 let state = job_dir.read_state()?;
745 let has_output = std::fs::metadata(job_dir.stdout_path())
746 .map(|m| m.len() > 0)
747 .unwrap_or(false)
748 || std::fs::metadata(job_dir.stderr_path())
749 .map(|m| m.len() > 0)
750 .unwrap_or(false);
751
752 if !state.status().is_non_terminal() || has_output {
753 break;
754 }
755
756 if let Some(dl) = deadline
757 && std::time::Instant::now() >= dl
758 {
759 break;
760 }
761
762 std::thread::sleep(std::time::Duration::from_millis(100));
763 }
764 }
765
766 let state = job_dir.read_state()?;
767 let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
768 let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
769
770 Ok(InlineObservation {
771 waited_ms: if wait {
772 started.elapsed().as_millis() as u64
773 } else {
774 0
775 },
776 stdout: stdout.head,
777 stderr: stderr.head,
778 stdout_range: stdout.range,
779 stderr_range: stderr.range,
780 stdout_total_bytes: stdout.observed_bytes,
781 stderr_total_bytes: stderr.observed_bytes,
782 encoding: "utf-8-lossy".to_string(),
783 state: state.status().as_str().to_string(),
784 exit_code: state.exit_code(),
785 signal: state.signal().map(|s| s.to_string()),
786 duration_ms: state.duration_ms(),
787 finished_at: state.finished_at,
788 })
789}
790
/// Computes the working directory to record for a job, as a display string.
///
/// The base is `cwd_override` when given, otherwise the process's current
/// directory (falling back to `"."` if that cannot be determined). The
/// canonical form is preferred. If canonicalization fails, an absolute base
/// is returned as-is and a relative one is joined onto the current
/// directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    let process_cwd =
        || std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));

    let base = cwd_override.map_or_else(process_cwd, std::path::PathBuf::from);

    if let Ok(canonical) = base.canonicalize() {
        return canonical.display().to_string();
    }

    // Canonicalization failed: best-effort absolute path.
    let resolved = if base.is_absolute() {
        base
    } else {
        process_cwd().join(base)
    };
    resolved.display().to_string()
}
812
/// Returns a copy of `env_vars` in which every entry whose key appears in
/// `mask_keys` has its value replaced by `***`.
///
/// Entries are `KEY=VALUE` strings. Order and non-masked entries are
/// preserved exactly. A masked entry without `=` becomes `KEY=***`.
pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
    if mask_keys.is_empty() {
        return env_vars.to_vec();
    }

    let mut masked = Vec::with_capacity(env_vars.len());
    for entry in env_vars {
        let (key, _) = parse_env_var(entry);
        if mask_keys.contains(&key) {
            masked.push(format!("{key}=***"));
        } else {
            masked.push(entry.clone());
        }
    }
    masked
}

/// Splits a `KEY=VALUE` string at the first `=`.
///
/// Without an `=`, the whole input is the key and the value is empty.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_owned(), value.to_owned()),
        None => (s.to_owned(), String::new()),
    }
}
840
841fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
844 let contents =
845 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
846 let mut vars = Vec::new();
847 for line in contents.lines() {
848 let line = line.trim();
849 if line.is_empty() || line.starts_with('#') {
850 continue;
851 }
852 vars.push(parse_env_var(line));
853 }
854 Ok(vars)
855}
856
857struct OutputMatchChecker {
864 job_dir_path: std::path::PathBuf,
865 shell_wrapper: Vec<String>,
866 inner: std::sync::Mutex<OutputMatchInner>,
867}
868
869struct OutputMatchInner {
870 config: Option<crate::schema::NotificationConfig>,
871}
872
873impl OutputMatchChecker {
874 fn new(
875 job_dir_path: std::path::PathBuf,
876 shell_wrapper: Vec<String>,
877 initial_config: Option<crate::schema::NotificationConfig>,
878 ) -> Self {
879 Self {
880 job_dir_path,
881 shell_wrapper,
882 inner: std::sync::Mutex::new(OutputMatchInner {
883 config: initial_config,
884 }),
885 }
886 }
887
888 fn check_line(&self, line: &str, stream: &str) {
895 use crate::schema::{OutputMatchStream, OutputMatchType};
896
897 let match_info: Option<crate::schema::OutputMatchConfig> = {
899 let mut inner = self.inner.lock().unwrap();
900
901 {
903 let meta_path = self.job_dir_path.join("meta.json");
904 if let Ok(raw) = std::fs::read(&meta_path)
905 && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
906 {
907 inner.config = meta.notification;
908 }
909 }
910
911 let Some(ref notification) = inner.config else {
912 return;
913 };
914 let Some(ref match_cfg) = notification.on_output_match else {
915 return;
916 };
917
918 let stream_matches = match match_cfg.stream {
920 OutputMatchStream::Stdout => stream == "stdout",
921 OutputMatchStream::Stderr => stream == "stderr",
922 OutputMatchStream::Either => true,
923 };
924 if !stream_matches {
925 return;
926 }
927
928 let matched = match &match_cfg.match_type {
930 OutputMatchType::Contains => line.contains(&match_cfg.pattern),
931 OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
932 .map(|re| re.is_match(line))
933 .unwrap_or(false),
934 };
935
936 if matched {
937 Some(match_cfg.clone())
938 } else {
939 None
940 }
941 }; if let Some(match_cfg) = match_info {
944 self.dispatch_match(line, stream, &match_cfg);
945 }
946 }
947
948 fn dispatch_match(
951 &self,
952 line: &str,
953 stream: &str,
954 match_cfg: &crate::schema::OutputMatchConfig,
955 ) {
956 use std::io::Write;
957
958 let job_id = self
959 .job_dir_path
960 .file_name()
961 .and_then(|n| n.to_str())
962 .unwrap_or("unknown");
963
964 let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
965 let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
966 let events_path = self.job_dir_path.join("notification_events.ndjson");
967 let events_path_str = events_path.display().to_string();
968
969 let match_type_str = match &match_cfg.match_type {
970 crate::schema::OutputMatchType::Contains => "contains",
971 crate::schema::OutputMatchType::Regex => "regex",
972 };
973
974 let event = crate::schema::OutputMatchEvent {
975 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
976 event_type: "job.output.matched".to_string(),
977 job_id: job_id.to_string(),
978 pattern: match_cfg.pattern.clone(),
979 match_type: match_type_str.to_string(),
980 stream: stream.to_string(),
981 line: line.to_string(),
982 stdout_log_path,
983 stderr_log_path,
984 };
985
986 let event_json = serde_json::to_string(&event).unwrap_or_default();
987 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
988
989 if let Some(ref cmd) = match_cfg.command {
990 delivery_results.push(dispatch_command_sink(
991 cmd,
992 &event_json,
993 job_id,
994 &events_path_str,
995 &self.shell_wrapper,
996 "job.output.matched",
997 ));
998 }
999 if let Some(ref file_path) = match_cfg.file {
1000 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1001 }
1002
1003 let record = crate::schema::OutputMatchEventRecord {
1005 event,
1006 delivery_results,
1007 };
1008 if let Ok(record_json) = serde_json::to_string(&record)
1009 && let Ok(mut f) = std::fs::OpenOptions::new()
1010 .create(true)
1011 .append(true)
1012 .open(&events_path)
1013 {
1014 let _ = writeln!(f, "{record_json}");
1015 }
1016 }
1017}
1018
1019fn stream_to_logs<R, F>(
1036 stream: R,
1037 log_path: &std::path::Path,
1038 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
1039 label: &str,
1040 on_line: Option<F>,
1041) where
1042 R: std::io::Read,
1043 F: Fn(&str),
1044{
1045 use std::io::Write;
1046 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1047 let mut stream = stream;
1048 let mut buf = [0u8; 8192];
1049 let mut line_buf: Vec<u8> = Vec::new();
1051 loop {
1052 match stream.read(&mut buf) {
1053 Ok(0) => break, Ok(n) => {
1055 let chunk = &buf[..n];
1056 let _ = log_file.write_all(chunk);
1058 for &b in chunk {
1060 if b == b'\n' {
1061 let line = String::from_utf8_lossy(&line_buf);
1062 if let Ok(mut fl) = full_log.lock() {
1063 let ts = now_rfc3339();
1064 let _ = writeln!(fl, "{ts} [{label}] {line}");
1065 }
1066 if let Some(ref f) = on_line {
1067 f(&line);
1068 }
1069 line_buf.clear();
1070 } else {
1071 line_buf.push(b);
1072 }
1073 }
1074 }
1075 Err(_) => break,
1076 }
1077 }
1078 if !line_buf.is_empty() {
1080 let line = String::from_utf8_lossy(&line_buf);
1081 if let Ok(mut fl) = full_log.lock() {
1082 let ts = now_rfc3339();
1083 let _ = writeln!(fl, "{ts} [{label}] {line}");
1084 }
1085 if let Some(ref f) = on_line {
1086 f(&line);
1087 }
1088 }
1089}
1090
1091pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1101 use std::sync::{Arc, Mutex};
1102
1103 let job_id = opts.job_id;
1104 let root = opts.root;
1105 let command = opts.command;
1106
1107 if command.is_empty() {
1108 anyhow::bail!("supervisor: no command");
1109 }
1110
1111 let job_dir = JobDir::open(root, job_id)?;
1112
1113 let meta = job_dir.read_meta()?;
1115 let started_at = now_rfc3339();
1119
1120 let full_log_path = if let Some(p) = opts.full_log {
1122 std::path::PathBuf::from(p)
1123 } else {
1124 job_dir.full_log_path()
1125 };
1126
1127 if let Some(parent) = full_log_path.parent() {
1130 std::fs::create_dir_all(parent)
1131 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1132 }
1133 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1134 let full_log = Arc::new(Mutex::new(full_log_file));
1135
1136 if opts.shell_wrapper.is_empty() {
1152 anyhow::bail!("supervisor: shell wrapper must not be empty");
1153 }
1154 let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1155 if command.len() == 1 {
1156 child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1158 } else {
1159 #[cfg(unix)]
1169 {
1170 child_cmd
1173 .args(&opts.shell_wrapper[1..])
1174 .arg("exec \"$@\"")
1175 .arg("--")
1176 .args(command);
1177 }
1178 #[cfg(not(unix))]
1179 {
1180 let joined = command
1184 .iter()
1185 .map(|a| {
1186 if a.contains(' ') {
1187 format!("\"{}\"", a)
1188 } else {
1189 a.clone()
1190 }
1191 })
1192 .collect::<Vec<_>>()
1193 .join(" ");
1194 child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1195 }
1196 }
1197
1198 if opts.inherit_env {
1199 } else {
1201 child_cmd.env_clear();
1202 }
1203
1204 for env_file in &opts.env_files {
1206 let vars = load_env_file(env_file)?;
1207 for (k, v) in vars {
1208 child_cmd.env(&k, &v);
1209 }
1210 }
1211
1212 for env_var in &opts.env_vars {
1214 let (k, v) = parse_env_var(env_var);
1215 child_cmd.env(&k, &v);
1216 }
1217
1218 if let Some(cwd) = opts.cwd {
1220 child_cmd.current_dir(cwd);
1221 }
1222
1223 #[cfg(unix)]
1228 {
1229 use std::os::unix::process::CommandExt;
1230 unsafe {
1232 child_cmd.pre_exec(|| {
1233 libc::setsid();
1234 Ok(())
1235 });
1236 }
1237 }
1238
1239 let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1241 let mut child = child_cmd
1242 .stdin(child_stdin)
1243 .stdout(std::process::Stdio::piped())
1244 .stderr(std::process::Stdio::piped())
1245 .spawn()
1246 .context("supervisor: spawn child")?;
1247
1248 let pid = child.id();
1249 info!(job_id, pid, "child process started");
1250
1251 #[cfg(windows)]
1258 let windows_job_name = {
1259 match assign_to_job_object(job_id, pid) {
1260 Ok(name) => Some(name),
1261 Err(e) => {
1262 let kill_err = child.kill();
1266 let _ = child.wait(); let failed_state = JobState {
1269 job: JobStateJob {
1270 id: job_id.to_string(),
1271 status: JobStatus::Failed,
1272 started_at: Some(started_at.clone()),
1273 },
1274 result: JobStateResult {
1275 exit_code: None,
1276 signal: None,
1277 duration_ms: None,
1278 },
1279 pid: Some(pid),
1280 finished_at: Some(now_rfc3339()),
1281 updated_at: now_rfc3339(),
1282 windows_job_name: None,
1283 };
1284 let _ = job_dir.write_state(&failed_state);
1287
1288 if opts.notify_command.is_some() || opts.notify_file.is_some() {
1292 let finished_at_ts =
1293 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1294 let stdout_log = job_dir.stdout_path().display().to_string();
1295 let stderr_log = job_dir.stderr_path().display().to_string();
1296 let fail_event = crate::schema::CompletionEvent {
1297 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1298 event_type: "job.finished".to_string(),
1299 job_id: job_id.to_string(),
1300 state: JobStatus::Failed.as_str().to_string(),
1301 command: meta.command.clone(),
1302 cwd: meta.cwd.clone(),
1303 started_at: started_at.clone(),
1304 finished_at: finished_at_ts,
1305 duration_ms: None,
1306 exit_code: None,
1307 signal: None,
1308 stdout_log_path: stdout_log,
1309 stderr_log_path: stderr_log,
1310 };
1311 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1312 let fail_event_path = job_dir.completion_event_path().display().to_string();
1313 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1314 Vec::new();
1315 if let Err(we) = job_dir.write_completion_event_atomic(
1316 &crate::schema::CompletionEventRecord {
1317 event: fail_event.clone(),
1318 delivery_results: vec![],
1319 },
1320 ) {
1321 warn!(
1322 job_id,
1323 error = %we,
1324 "failed to write initial completion_event.json for failed job"
1325 );
1326 }
1327 if let Some(ref shell_cmd) = opts.notify_command {
1328 fail_delivery_results.push(dispatch_command_sink(
1329 shell_cmd,
1330 &fail_event_json,
1331 job_id,
1332 &fail_event_path,
1333 &opts.shell_wrapper,
1334 "job.finished",
1335 ));
1336 }
1337 if let Some(ref file_path) = opts.notify_file {
1338 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1339 }
1340 if let Err(we) = job_dir.write_completion_event_atomic(
1341 &crate::schema::CompletionEventRecord {
1342 event: fail_event,
1343 delivery_results: fail_delivery_results,
1344 },
1345 ) {
1346 warn!(
1347 job_id,
1348 error = %we,
1349 "failed to update completion_event.json with delivery results for failed job"
1350 );
1351 }
1352 }
1353
1354 if let Err(ke) = kill_err {
1355 return Err(anyhow::anyhow!(
1356 "supervisor: failed to assign pid {pid} to Job Object \
1357 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1358 ));
1359 }
1360 return Err(anyhow::anyhow!(
1361 "supervisor: failed to assign pid {pid} to Job Object \
1362 (Windows MUST requirement); child process was killed; \
1363 consider running outside a nested Job Object environment: {e}"
1364 ));
1365 }
1366 }
1367 };
1368 #[cfg(not(windows))]
1369 let windows_job_name: Option<String> = None;
1370
1371 let state = JobState {
1376 job: JobStateJob {
1377 id: job_id.to_string(),
1378 status: JobStatus::Running,
1379 started_at: Some(started_at.clone()),
1380 },
1381 result: JobStateResult {
1382 exit_code: None,
1383 signal: None,
1384 duration_ms: None,
1385 },
1386 pid: Some(pid),
1387 finished_at: None,
1388 updated_at: now_rfc3339(),
1389 windows_job_name,
1390 };
1391 job_dir.write_state(&state)?;
1392
1393 let child_start_time = std::time::Instant::now();
1394
1395 let child_stdout = child.stdout.take().expect("child stdout piped");
1397 let child_stderr = child.stderr.take().expect("child stderr piped");
1398
1399 let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1401 job_dir.path.clone(),
1402 opts.shell_wrapper.clone(),
1403 meta.notification.clone(),
1404 ));
1405
1406 let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1410 let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1411
1412 let stdout_log_path = job_dir.stdout_path();
1414 let full_log_stdout = Arc::clone(&full_log);
1415 let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1416 let t_stdout = std::thread::spawn(move || {
1417 stream_to_logs(
1418 child_stdout,
1419 &stdout_log_path,
1420 full_log_stdout,
1421 "STDOUT",
1422 Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1423 );
1424 let _ = tx_stdout_done.send(());
1425 });
1426
1427 let stderr_log_path = job_dir.stderr_path();
1429 let full_log_stderr = Arc::clone(&full_log);
1430 let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1431 let t_stderr = std::thread::spawn(move || {
1432 stream_to_logs(
1433 child_stderr,
1434 &stderr_log_path,
1435 full_log_stderr,
1436 "STDERR",
1437 Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1438 );
1439 let _ = tx_stderr_done.send(());
1440 });
1441
1442 let timeout_ms = opts.timeout_ms;
1445 let kill_after_ms = opts.kill_after_ms;
1446 let progress_every_ms = opts.progress_every_ms;
1447 let watcher_job_dir = JobDir {
1448 path: job_dir.path.clone(),
1449 job_id: job_id.to_string(),
1450 };
1451 let job_id_str = job_id.to_string();
1452
1453 use std::sync::atomic::{AtomicBool, Ordering};
1455 let child_done = Arc::new(AtomicBool::new(false));
1456
1457 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1458 let child_done_clone = Arc::clone(&child_done);
1459 Some(std::thread::spawn(move || {
1460 let start = std::time::Instant::now();
1461 let timeout_dur = if timeout_ms > 0 {
1462 Some(std::time::Duration::from_millis(timeout_ms))
1463 } else {
1464 None
1465 };
1466 let progress_dur = if progress_every_ms > 0 {
1467 Some(std::time::Duration::from_millis(progress_every_ms))
1468 } else {
1469 None
1470 };
1471
1472 let poll_interval = std::time::Duration::from_millis(100);
1473
1474 loop {
1475 std::thread::sleep(poll_interval);
1476
1477 if child_done_clone.load(Ordering::Relaxed) {
1479 break;
1480 }
1481
1482 let elapsed = start.elapsed();
1483
1484 if let Some(td) = timeout_dur
1486 && elapsed >= td
1487 {
1488 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1489 #[cfg(unix)]
1492 {
1493 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1494 }
1495 if kill_after_ms > 0 {
1497 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1498 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1499 #[cfg(unix)]
1500 {
1501 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1502 }
1503 } else {
1504 #[cfg(unix)]
1506 {
1507 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1508 }
1509 }
1510 break;
1511 }
1512
1513 if let Some(pd) = progress_dur {
1515 let elapsed_ms = elapsed.as_millis() as u64;
1516 let pd_ms = pd.as_millis() as u64;
1517 let poll_ms = poll_interval.as_millis() as u64;
1518 if elapsed_ms % pd_ms < poll_ms {
1519 if let Ok(mut st) = watcher_job_dir.read_state() {
1522 st.updated_at = now_rfc3339();
1523 let _ = watcher_job_dir.write_state(&st);
1524 }
1525 }
1526 }
1527 }
1528 }))
1529 } else {
1530 None
1531 };
1532
1533 let exit_status = child.wait().context("wait for child")?;
1535
1536 child_done.store(true, Ordering::Relaxed);
1538
1539 let duration_ms = child_start_time.elapsed().as_millis() as u64;
1546 let exit_code = exit_status.code();
1547 let finished_at = now_rfc3339();
1548
1549 #[cfg(unix)]
1551 let (terminal_status, signal_name) = {
1552 use std::os::unix::process::ExitStatusExt;
1553 if let Some(sig) = exit_status.signal() {
1554 (JobStatus::Killed, Some(sig.to_string()))
1555 } else {
1556 (JobStatus::Exited, None)
1557 }
1558 };
1559 #[cfg(not(unix))]
1560 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1561
1562 let state = JobState {
1563 job: JobStateJob {
1564 id: job_id.to_string(),
1565 status: terminal_status.clone(),
1566 started_at: Some(started_at.clone()),
1567 },
1568 result: JobStateResult {
1569 exit_code,
1570 signal: signal_name.clone(),
1571 duration_ms: Some(duration_ms),
1572 },
1573 pid: Some(pid),
1574 finished_at: Some(finished_at.clone()),
1575 updated_at: now_rfc3339(),
1576 windows_job_name: None, };
1578 job_dir.write_state(&state)?;
1579 info!(job_id, ?exit_code, "child process finished");
1580
1581 const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1592 let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1593
1594 let remaining = drain_deadline
1595 .checked_duration_since(std::time::Instant::now())
1596 .unwrap_or(std::time::Duration::ZERO);
1597 if rx_stdout_done.recv_timeout(remaining).is_ok() {
1598 let _ = t_stdout.join();
1599 } else {
1600 drop(t_stdout); }
1602
1603 let remaining = drain_deadline
1604 .checked_duration_since(std::time::Instant::now())
1605 .unwrap_or(std::time::Duration::ZERO);
1606 if rx_stderr_done.recv_timeout(remaining).is_ok() {
1607 let _ = t_stderr.join();
1608 } else {
1609 drop(t_stderr); }
1611
1612 if let Some(w) = watcher {
1614 let _ = w.join();
1615 }
1616
1617 let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1620 let (current_notify_command, current_notify_file) = match &latest_notification {
1621 Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1622 None => (None, None),
1623 };
1624
1625 let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1628 if has_notification {
1629 let stdout_log = job_dir.stdout_path().display().to_string();
1630 let stderr_log = job_dir.stderr_path().display().to_string();
1631 let event = crate::schema::CompletionEvent {
1632 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1633 event_type: "job.finished".to_string(),
1634 job_id: job_id.to_string(),
1635 state: terminal_status.as_str().to_string(),
1636 command: meta.command.clone(),
1637 cwd: meta.cwd.clone(),
1638 started_at,
1639 finished_at,
1640 duration_ms: Some(duration_ms),
1641 exit_code,
1642 signal: signal_name,
1643 stdout_log_path: stdout_log,
1644 stderr_log_path: stderr_log,
1645 };
1646
1647 let event_json = serde_json::to_string(&event).unwrap_or_default();
1648 let event_path = job_dir.completion_event_path().display().to_string();
1649 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1650
1651 if let Err(e) =
1653 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1654 event: event.clone(),
1655 delivery_results: vec![],
1656 })
1657 {
1658 warn!(job_id, error = %e, "failed to write initial completion_event.json");
1659 }
1660
1661 if let Some(ref shell_cmd) = current_notify_command {
1662 delivery_results.push(dispatch_command_sink(
1663 shell_cmd,
1664 &event_json,
1665 job_id,
1666 &event_path,
1667 &opts.shell_wrapper,
1668 "job.finished",
1669 ));
1670 }
1671 if let Some(ref file_path) = current_notify_file {
1672 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1673 }
1674
1675 if let Err(e) =
1677 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1678 event,
1679 delivery_results,
1680 })
1681 {
1682 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1683 }
1684 }
1685
1686 Ok(())
1687}
1688
1689fn dispatch_command_sink(
1696 shell_cmd: &str,
1697 event_json: &str,
1698 job_id: &str,
1699 event_path: &str,
1700 shell_wrapper: &[String],
1701 event_type: &str,
1702) -> crate::schema::SinkDeliveryResult {
1703 use std::io::Write;
1704 let attempted_at = now_rfc3339();
1705 let target = shell_cmd.to_string();
1706
1707 if shell_cmd.trim().is_empty() {
1708 return crate::schema::SinkDeliveryResult {
1709 sink_type: "command".to_string(),
1710 target,
1711 success: false,
1712 error: Some("empty shell command".to_string()),
1713 attempted_at,
1714 };
1715 }
1716
1717 if shell_wrapper.is_empty() {
1718 return crate::schema::SinkDeliveryResult {
1719 sink_type: "command".to_string(),
1720 target,
1721 success: false,
1722 error: Some("shell wrapper must not be empty".to_string()),
1723 attempted_at,
1724 };
1725 }
1726
1727 let mut cmd = Command::new(&shell_wrapper[0]);
1728 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1729
1730 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1731 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1732 cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1733 cmd.stdin(std::process::Stdio::piped());
1734 cmd.stdout(std::process::Stdio::null());
1735 cmd.stderr(std::process::Stdio::null());
1736
1737 match cmd.spawn() {
1738 Ok(mut child) => {
1739 if let Some(mut stdin) = child.stdin.take() {
1740 let _ = stdin.write_all(event_json.as_bytes());
1741 }
1742 match child.wait() {
1743 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1744 sink_type: "command".to_string(),
1745 target,
1746 success: true,
1747 error: None,
1748 attempted_at,
1749 },
1750 Ok(status) => crate::schema::SinkDeliveryResult {
1751 sink_type: "command".to_string(),
1752 target,
1753 success: false,
1754 error: Some(format!("exited with status {status}")),
1755 attempted_at,
1756 },
1757 Err(e) => crate::schema::SinkDeliveryResult {
1758 sink_type: "command".to_string(),
1759 target,
1760 success: false,
1761 error: Some(format!("wait error: {e}")),
1762 attempted_at,
1763 },
1764 }
1765 }
1766 Err(e) => crate::schema::SinkDeliveryResult {
1767 sink_type: "command".to_string(),
1768 target,
1769 success: false,
1770 error: Some(format!("spawn error: {e}")),
1771 attempted_at,
1772 },
1773 }
1774}
1775
1776fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1779 use std::io::Write;
1780 let attempted_at = now_rfc3339();
1781 let path = std::path::Path::new(file_path);
1782
1783 if let Some(parent) = path.parent()
1784 && let Err(e) = std::fs::create_dir_all(parent)
1785 {
1786 return crate::schema::SinkDeliveryResult {
1787 sink_type: "file".to_string(),
1788 target: file_path.to_string(),
1789 success: false,
1790 error: Some(format!("create parent dir: {e}")),
1791 attempted_at,
1792 };
1793 }
1794
1795 match std::fs::OpenOptions::new()
1796 .create(true)
1797 .append(true)
1798 .open(path)
1799 {
1800 Ok(mut f) => match writeln!(f, "{event_json}") {
1801 Ok(_) => crate::schema::SinkDeliveryResult {
1802 sink_type: "file".to_string(),
1803 target: file_path.to_string(),
1804 success: true,
1805 error: None,
1806 attempted_at,
1807 },
1808 Err(e) => crate::schema::SinkDeliveryResult {
1809 sink_type: "file".to_string(),
1810 target: file_path.to_string(),
1811 success: false,
1812 error: Some(format!("write error: {e}")),
1813 attempted_at,
1814 },
1815 },
1816 Err(e) => crate::schema::SinkDeliveryResult {
1817 sink_type: "file".to_string(),
1818 target: file_path.to_string(),
1819 success: false,
1820 error: Some(format!("open error: {e}")),
1821 attempted_at,
1822 },
1823 }
1824}
1825
/// Public accessor for the current timestamp string.
///
/// Forwards directly to the module-private `now_rfc3339` so that code outside
/// this module can obtain the same timestamp format used here.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1830
1831fn now_rfc3339() -> String {
1832 let d = std::time::SystemTime::now()
1834 .duration_since(std::time::UNIX_EPOCH)
1835 .unwrap_or_default();
1836 format_rfc3339(d.as_secs())
1837}
1838
1839fn format_rfc3339(secs: u64) -> String {
1840 let mut s = secs;
1842 let seconds = s % 60;
1843 s /= 60;
1844 let minutes = s % 60;
1845 s /= 60;
1846 let hours = s % 24;
1847 s /= 24;
1848
1849 let mut days = s;
1851 let mut year = 1970u64;
1852 loop {
1853 let days_in_year = if is_leap(year) { 366 } else { 365 };
1854 if days < days_in_year {
1855 break;
1856 }
1857 days -= days_in_year;
1858 year += 1;
1859 }
1860
1861 let leap = is_leap(year);
1862 let month_days: [u64; 12] = [
1863 31,
1864 if leap { 29 } else { 28 },
1865 31,
1866 30,
1867 31,
1868 30,
1869 31,
1870 31,
1871 30,
1872 31,
1873 30,
1874 31,
1875 ];
1876 let mut month = 0usize;
1877 for (i, &d) in month_days.iter().enumerate() {
1878 if days < d {
1879 month = i;
1880 break;
1881 }
1882 days -= d;
1883 }
1884 let day = days + 1;
1885
1886 format!(
1887 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1888 year,
1889 month + 1,
1890 day,
1891 hours,
1892 minutes,
1893 seconds
1894 )
1895}
1896
/// Gregorian leap-year rule: years divisible by 400 are leap years, other
/// century years are not, and any remaining year divisible by 4 is.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        true
    } else if year.is_multiple_of(100) {
        false
    } else {
        year.is_multiple_of(4)
    }
}
1900
/// Create a named Windows Job Object for this job and assign the child
/// process `pid` to it.
///
/// The Job Object is named `AgentExec-{job_id}`; that name is returned on
/// success.
///
/// # Errors
///
/// Returns an error if `OpenProcess`, `CreateJobObjectW` or
/// `AssignProcessToJobObject` fails. Any handle opened before the failing call
/// is closed before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // NOTE(review): soundness of this block relies on `pid` naming the child
    // this supervisor just spawned, and on each handle being used only while
    // still open — confirm against the caller.
    unsafe {
        // Open the child with only the access rights requested here
        // (terminate + set-quota).
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                // Do not leak the process handle on the error path.
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            // Assignment failed: release both handles before reporting.
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle is no longer needed once the assignment succeeded.
        let _ = CloseHandle(proc_handle);
        // The job handle is deliberately NOT closed on the success path.
        // NOTE(review): presumably this keeps the Job Object alive for the
        // rest of the supervisor's lifetime so it can be found again by name —
        // confirm.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1966
/// Unit tests for the hand-rolled timestamp formatter.
///
/// Besides the two midnight January 1 cases, these cover the branches that a
/// calendar bug would most likely hit: a leap day, the century rule (2100 is
/// not a leap year), December roll-over, and non-zero time-of-day fields.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_known_date() {
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_time_of_day_fields() {
        // One second before the first day rolls over.
        assert_eq!(format_rfc3339(86_399), "1970-01-01T23:59:59Z");
    }

    #[test]
    fn rfc3339_leap_day() {
        // 2000 is divisible by 400, so February has 29 days.
        assert_eq!(format_rfc3339(951_782_400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(1_709_251_199), "2024-02-29T23:59:59Z");
    }

    #[test]
    fn rfc3339_century_is_not_leap() {
        // 2100 is divisible by 100 but not 400: day 59 of the year is March 1.
        assert_eq!(format_rfc3339(4_107_542_400), "2100-03-01T00:00:00Z");
    }

    #[test]
    fn rfc3339_last_second_of_leap_year() {
        assert_eq!(format_rfc3339(1_735_689_599), "2024-12-31T23:59:59Z");
    }
}