1use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Snapshot of a job's state and the head of its output, captured by
/// `observe_inline_output` for inclusion in the `run` response.
#[derive(Debug, Clone)]
pub struct InlineObservation {
    /// Milliseconds spent in the observation step (0 when waiting was disabled).
    pub waited_ms: u64,
    /// Head of `stdout.log` as returned by the job directory's head reader.
    pub stdout: String,
    /// Head of `stderr.log` as returned by the job directory's head reader.
    pub stderr: String,
    /// Range reported by the head reader for the stdout excerpt.
    pub stdout_range: [u64; 2],
    /// Range reported by the head reader for the stderr excerpt.
    pub stderr_range: [u64; 2],
    /// Observed byte count reported by the head reader for stdout.
    pub stdout_total_bytes: u64,
    /// Observed byte count reported by the head reader for stderr.
    pub stderr_total_bytes: u64,
    /// Name of the text encoding applied to the excerpts.
    pub encoding: String,
    /// Job status rendered as a string.
    pub state: String,
    /// Exit code of the job, when one is recorded in its state.
    pub exit_code: Option<i32>,
    /// Finish timestamp of the job, when one is recorded in its state.
    pub finished_at: Option<String>,
    /// Terminating signal of the job, when one is recorded in its state.
    pub signal: Option<String>,
    /// Job duration in milliseconds, when one is recorded in its state.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
38#[derive(Debug)]
47pub struct RunOpts<'a> {
48 pub command: Vec<String>,
50 pub root: Option<&'a str>,
52 pub no_auto_gc: bool,
54 pub auto_gc_older_than: Option<String>,
56 pub auto_gc_max_jobs: Option<u64>,
58 pub auto_gc_max_bytes: Option<u64>,
60 pub auto_gc_config: crate::gc::AutoGcConfig,
62 pub wait: bool,
64 pub until_seconds: u64,
66 pub forever: bool,
68 pub max_bytes: u64,
70 pub timeout_ms: u64,
72 pub kill_after_ms: u64,
74 pub cwd: Option<&'a str>,
76 pub env_vars: Vec<String>,
78 pub env_files: Vec<String>,
80 pub inherit_env: bool,
82 pub mask: Vec<String>,
84 pub stdin: Option<StdinSource>,
86 pub stdin_max_bytes: u64,
88 pub tags: Vec<String>,
90 pub log: Option<&'a str>,
92 pub progress_every_ms: u64,
94 pub notify_command: Option<String>,
97 pub notify_file: Option<String>,
99 pub output_pattern: Option<String>,
101 pub output_match_type: Option<String>,
103 pub output_stream: Option<String>,
105 pub output_command: Option<String>,
107 pub output_file: Option<String>,
109 pub shell_wrapper: Vec<String>,
112}
113
114impl<'a> Default for RunOpts<'a> {
115 fn default() -> Self {
116 RunOpts {
117 command: vec![],
118 root: None,
119 no_auto_gc: false,
120 auto_gc_older_than: None,
121 auto_gc_max_jobs: None,
122 auto_gc_max_bytes: None,
123 auto_gc_config: crate::gc::AutoGcConfig::default(),
124 wait: true,
125 until_seconds: 10,
126 forever: false,
127 max_bytes: 65536,
128 timeout_ms: 0,
129 kill_after_ms: 0,
130 cwd: None,
131 env_vars: vec![],
132 env_files: vec![],
133 inherit_env: true,
134 mask: vec![],
135 stdin: None,
136 stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
137 tags: vec![],
138 log: None,
139 progress_every_ms: 0,
140 notify_command: None,
141 notify_file: None,
142 output_pattern: None,
143 output_match_type: None,
144 output_stream: None,
145 output_command: None,
146 output_file: None,
147 shell_wrapper: crate::config::default_shell_wrapper(),
148 }
149 }
150}
151
/// Origin of the data fed to a job's standard input.
#[derive(Debug, Clone)]
pub enum StdinSource {
    /// Read from the calling process's own stdin (selected by `--stdin -`).
    CallerStdin,
    /// Use the given text verbatim.
    Inline(String),
    /// Copy the contents of the file at the given path.
    File(String),
}
161
/// Owned inputs for [`spawn_supervisor_process`]; each field is translated
/// into an argument of the `_supervise` subcommand.
pub struct SpawnSupervisorParams {
    /// Identifier of the job to supervise.
    pub job_id: String,
    /// Jobs root directory.
    pub root: std::path::PathBuf,
    /// Destination of the combined log.
    pub full_log_path: String,
    /// Timeout in milliseconds; 0 omits the `--timeout` argument.
    pub timeout_ms: u64,
    /// Kill-after delay in milliseconds; 0 omits the `--kill-after` argument.
    pub kill_after_ms: u64,
    /// Optional working directory for the child.
    pub cwd: Option<String>,
    /// Environment entries in `KEY=VALUE` form.
    pub env_vars: Vec<String>,
    /// Paths of env-files to pass along.
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is passed.
    pub inherit_env: bool,
    /// Name or path of the materialized stdin file, if any.
    pub stdin_file: Option<String>,
    /// Progress interval in milliseconds; 0 omits the `--progress-every` argument.
    pub progress_every_ms: u64,
    /// Notification command sink, if any.
    pub notify_command: Option<String>,
    /// Notification file sink, if any.
    pub notify_file: Option<String>,
    /// Shell wrapper argv, serialized to JSON for the supervisor.
    pub shell_wrapper: Vec<String>,
    /// Command line placed after `--`.
    pub command: Vec<String>,
}
180
181pub fn resolve_stdin_source(
182 stdin: Option<String>,
183 stdin_file: Option<String>,
184) -> Option<StdinSource> {
185 if let Some(value) = stdin {
186 if value == "-" {
187 Some(StdinSource::CallerStdin)
188 } else {
189 Some(StdinSource::Inline(value))
190 }
191 } else {
192 stdin_file.map(StdinSource::File)
193 }
194}
195
196pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
197 if matches!(stdin, Some(StdinSource::CallerStdin)) {
198 let stdin = std::io::stdin();
199 if stdin.is_terminal() {
200 return Err(anyhow::anyhow!(StdinRequired(
201 "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
202 .to_string(),
203 )));
204 }
205 }
206 Ok(())
207}
208
209fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
210 #[cfg(unix)]
211 {
212 use std::os::unix::fs::OpenOptionsExt;
213 std::fs::OpenOptions::new()
214 .write(true)
215 .create(true)
216 .truncate(true)
217 .mode(0o600)
218 .open(path)
219 .with_context(|| format!("create materialized stdin {}", path.display()))
220 }
221 #[cfg(not(unix))]
222 {
223 std::fs::File::create(path)
224 .with_context(|| format!("create materialized stdin {}", path.display()))
225 }
226}
227
/// Writer adapter that refuses to let more than `limit` bytes through.
struct LimitedWriter<W> {
    /// Destination writer.
    inner: W,
    /// Bytes accepted by `inner` so far.
    written: u64,
    /// Maximum total number of bytes allowed.
    limit: u64,
}

impl<W: std::io::Write> std::io::Write for LimitedWriter<W> {
    /// Forwards `buf` to the inner writer unless doing so would exceed the
    /// limit, in which case nothing is written and a `stdin_too_large` error
    /// is returned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let projected_total = self.written + buf.len() as u64;
        if projected_total > self.limit {
            return Err(std::io::Error::other("stdin_too_large"));
        }
        let accepted = self.inner.write(buf)?;
        // Count only what the inner writer actually took (short writes are possible).
        self.written += accepted as u64;
        Ok(accepted)
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
248
249fn materialize_stdin(
250 job_dir: &JobDir,
251 stdin: Option<&StdinSource>,
252 max_bytes: u64,
253) -> Result<Option<String>> {
254 let Some(source) = stdin else {
255 return Ok(None);
256 };
257
258 let target_name = "stdin.bin".to_string();
259 let target_path = job_dir.path.join(&target_name);
260 let file = create_stdin_file(&target_path)?;
261 let mut target = LimitedWriter {
262 inner: file,
263 written: 0,
264 limit: max_bytes,
265 };
266
267 let copy_result = match source {
268 StdinSource::CallerStdin => {
269 let mut stdin = std::io::stdin();
270 std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
271 }
272 StdinSource::Inline(value) => {
273 use std::io::Write;
274 target
275 .write_all(value.as_bytes())
276 .map(|_| 0u64)
277 .context("write inline stdin to stdin.bin")
278 }
279 StdinSource::File(path) => {
280 let mut input = std::fs::File::open(path)
281 .with_context(|| format!("open --stdin-file source {}", path))?;
282 std::io::copy(&mut input, &mut target)
283 .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
284 }
285 };
286
287 if let Err(e) = copy_result {
288 let _ = std::fs::remove_file(&target_path);
289 let is_too_large = e
290 .chain()
291 .any(|cause| cause.to_string().contains("stdin_too_large"));
292 if is_too_large {
293 return Err(anyhow::anyhow!(StdinTooLarge(format!(
294 "stdin_too_large: input exceeds {} byte limit",
295 max_bytes
296 ))));
297 }
298 return Err(e);
299 }
300
301 Ok(Some(target_name))
302}
303
304fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
305 stdin_file.map(|p| {
306 let path = std::path::Path::new(p);
307 if path.is_absolute() {
308 path.to_path_buf()
309 } else {
310 job_dir.path.join(path)
311 }
312 })
313}
314
/// Error raised when `--stdin -` is used but stdin is an interactive terminal.
/// The wrapped string is the full user-facing message.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::fmt::Display for StdinRequired {
    /// Prints the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinRequired {}
325
/// Error raised when materialized stdin exceeds the configured byte limit.
/// The wrapped string is the full user-facing message.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::fmt::Display for StdinTooLarge {
    /// Prints the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinTooLarge {}
336
337pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
338 if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
339 let file = std::fs::File::open(&path)
340 .with_context(|| format!("open materialized stdin {}", path.display()))?;
341 Ok(std::process::Stdio::from(file))
342 } else {
343 Ok(std::process::Stdio::null())
344 }
345}
346
347pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 * 1024 * 1024; pub fn materialize_stdin_for_job(
350 job_dir: &JobDir,
351 stdin: Option<&StdinSource>,
352 max_bytes: u64,
353) -> Result<Option<String>> {
354 materialize_stdin(job_dir, stdin, max_bytes)
355}
356
357pub fn spawn_supervisor_process(
362 job_dir: &JobDir,
363 params: SpawnSupervisorParams,
364) -> Result<(u32, String)> {
365 let started_at = now_rfc3339();
366
367 let exe = std::env::current_exe().context("resolve current exe")?;
368 let mut supervisor_cmd = Command::new(&exe);
369 supervisor_cmd
370 .arg("_supervise")
371 .arg("--job-id")
372 .arg(¶ms.job_id)
373 .arg("--supervise-root")
374 .arg(params.root.display().to_string())
375 .arg("--full-log")
376 .arg(¶ms.full_log_path);
377
378 if params.timeout_ms > 0 {
379 let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
380 supervisor_cmd
381 .arg("--timeout")
382 .arg(timeout_seconds.to_string());
383 }
384 if params.kill_after_ms > 0 {
385 let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
386 supervisor_cmd
387 .arg("--kill-after")
388 .arg(kill_after_seconds.to_string());
389 }
390 if let Some(ref cwd) = params.cwd {
391 supervisor_cmd.arg("--cwd").arg(cwd);
392 }
393 for env_file in ¶ms.env_files {
394 supervisor_cmd.arg("--env-file").arg(env_file);
395 }
396 for env_var in ¶ms.env_vars {
397 supervisor_cmd.arg("--env").arg(env_var);
398 }
399 if !params.inherit_env {
400 supervisor_cmd.arg("--no-inherit-env");
401 }
402 if let Some(ref stdin_file) = params.stdin_file {
403 supervisor_cmd.arg("--stdin-file").arg(stdin_file);
404 }
405 if params.progress_every_ms > 0 {
406 let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
407 supervisor_cmd
408 .arg("--progress-every")
409 .arg(progress_every_seconds.to_string());
410 }
411 if let Some(ref nc) = params.notify_command {
412 supervisor_cmd.arg("--notify-command").arg(nc);
413 }
414 if let Some(ref nf) = params.notify_file {
415 supervisor_cmd.arg("--notify-file").arg(nf);
416 }
417 let wrapper_json =
418 serde_json::to_string(¶ms.shell_wrapper).context("serialize shell wrapper")?;
419 supervisor_cmd
420 .arg("--shell-wrapper-resolved")
421 .arg(&wrapper_json);
422
423 supervisor_cmd
424 .arg("--")
425 .args(¶ms.command)
426 .stdin(std::process::Stdio::null())
427 .stdout(std::process::Stdio::null())
428 .stderr(std::process::Stdio::null());
429
430 let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
431 let supervisor_pid = supervisor.id();
432 debug!(supervisor_pid, "supervisor spawned");
433
434 job_dir.init_state(supervisor_pid, &started_at)?;
436
437 #[cfg(windows)]
439 {
440 let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
441 loop {
442 std::thread::sleep(std::time::Duration::from_millis(10));
443 if let Ok(current_state) = job_dir.read_state() {
444 let supervisor_updated = current_state
445 .pid
446 .map(|p| p != supervisor_pid)
447 .unwrap_or(false)
448 || *current_state.status() == crate::schema::JobStatus::Failed;
449 if supervisor_updated {
450 if *current_state.status() == crate::schema::JobStatus::Failed {
451 anyhow::bail!(
452 "supervisor failed to assign child process to Job Object \
453 (Windows MUST requirement); see stderr for details"
454 );
455 }
456 debug!("supervisor confirmed Job Object assignment via state.json handshake");
457 break;
458 }
459 }
460 if std::time::Instant::now() >= handshake_deadline {
461 debug!("supervisor handshake timed out; proceeding with initial state");
462 break;
463 }
464 }
465 }
466
467 Ok((supervisor_pid, started_at))
468}
469
470pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
473 for log_path in [
474 job_dir.stdout_path(),
475 job_dir.stderr_path(),
476 job_dir.full_log_path(),
477 ] {
478 std::fs::OpenOptions::new()
479 .create(true)
480 .append(true)
481 .open(&log_path)
482 .with_context(|| format!("pre-create log file {}", log_path.display()))?;
483 }
484 Ok(())
485}
486
487pub fn execute(opts: RunOpts) -> Result<()> {
489 if opts.command.is_empty() {
490 anyhow::bail!("no command specified for run");
491 }
492
493 let elapsed_start = std::time::Instant::now();
494
495 let root = resolve_root(opts.root);
496 std::fs::create_dir_all(&root)
497 .with_context(|| format!("create jobs root {}", root.display()))?;
498
499 let job_id = generate_job_id(&root)?;
500 let created_at = now_rfc3339();
501
502 let env_keys: Vec<String> = opts
504 .env_vars
505 .iter()
506 .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
507 .collect();
508
509 let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
511
512 let effective_cwd = resolve_effective_cwd(opts.cwd);
516
517 let on_output_match = crate::notify::build_output_match_config(
519 opts.output_pattern,
520 opts.output_match_type,
521 opts.output_stream,
522 opts.output_command,
523 opts.output_file,
524 None,
525 );
526
527 let notification =
528 if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
529 {
530 Some(crate::schema::NotificationConfig {
531 notify_command: opts.notify_command.clone(),
532 notify_file: opts.notify_file.clone(),
533 on_output_match,
534 })
535 } else {
536 None
537 };
538
539 let tags = dedup_tags(opts.tags)?;
541
542 let meta = JobMeta {
543 job: JobMetaJob { id: job_id.clone() },
544 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
545 command: opts.command.clone(),
546 created_at: created_at.clone(),
547 root: root.display().to_string(),
548 env_keys,
549 env_vars: masked_env_vars.clone(),
550 env_vars_runtime: vec![],
553 mask: opts.mask.clone(),
554 cwd: Some(effective_cwd),
555 notification,
556 inherit_env: opts.inherit_env,
558 env_files: opts.env_files.clone(),
559 timeout_ms: opts.timeout_ms,
560 kill_after_ms: opts.kill_after_ms,
561 progress_every_ms: opts.progress_every_ms,
562 shell_wrapper: Some(opts.shell_wrapper.clone()),
563 stdin_file: None,
564 tags: tags.clone(),
565 };
566
567 validate_stdin_source(opts.stdin.as_ref())?;
568
569 let job_dir = JobDir::create(&root, &job_id, &meta)?;
570 let stdin_file =
571 materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
572 if stdin_file.is_some() {
573 let mut meta_with_stdin = meta.clone();
574 meta_with_stdin.stdin_file = stdin_file.clone();
575 job_dir.write_meta_atomic(&meta_with_stdin)?;
576 }
577 info!(job_id = %job_id, "created job directory");
578
579 let full_log_path = if let Some(log) = opts.log {
581 log.to_string()
582 } else {
583 job_dir.full_log_path().display().to_string()
584 };
585
586 pre_create_log_files(&job_dir)?;
588
589 let (_supervisor_pid, _started_at) = spawn_supervisor_process(
593 &job_dir,
594 SpawnSupervisorParams {
595 job_id: job_id.clone(),
596 root: root.clone(),
597 full_log_path: full_log_path.clone(),
598 timeout_ms: opts.timeout_ms,
599 kill_after_ms: opts.kill_after_ms,
600 cwd: opts.cwd.map(|s| s.to_string()),
601 env_vars: opts.env_vars.clone(),
602 env_files: opts.env_files.clone(),
603 inherit_env: opts.inherit_env,
604 stdin_file: stdin_file.clone(),
605 progress_every_ms: opts.progress_every_ms,
606 notify_command: opts.notify_command.clone(),
607 notify_file: opts.notify_file.clone(),
608 shell_wrapper: opts.shell_wrapper.clone(),
609 command: opts.command.clone(),
610 },
611 )?;
612
613 let stdout_log_path = job_dir.stdout_path().display().to_string();
615 let stderr_log_path = job_dir.stderr_path().display().to_string();
616
617 let observation = observe_inline_output(
618 &job_dir,
619 opts.wait,
620 opts.until_seconds,
621 opts.forever,
622 opts.max_bytes,
623 )?;
624 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
625
626 if !opts.no_auto_gc {
627 let mut auto_cfg = opts.auto_gc_config.clone();
628 if let Some(v) = opts.auto_gc_older_than {
629 auto_cfg.older_than = v;
630 }
631 if let Some(v) = opts.auto_gc_max_jobs {
632 auto_cfg.max_jobs = usize::try_from(v).ok();
633 }
634 if let Some(v) = opts.auto_gc_max_bytes {
635 auto_cfg.max_bytes = Some(v);
636 }
637 crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
638 }
639
640 let response = Response::new(
641 "run",
642 RunData {
643 job_id,
644 state: observation.state,
645 tags,
646 env_vars: masked_env_vars,
649 stdout_log_path,
650 stderr_log_path,
651 elapsed_ms,
652 waited_ms: observation.waited_ms,
653 stdout: observation.stdout,
654 stderr: observation.stderr,
655 stdout_range: observation.stdout_range,
656 stderr_range: observation.stderr_range,
657 stdout_total_bytes: observation.stdout_total_bytes,
658 stderr_total_bytes: observation.stderr_total_bytes,
659 encoding: observation.encoding,
660 exit_code: observation.exit_code,
661 finished_at: observation.finished_at,
662 signal: observation.signal,
663 duration_ms: observation.duration_ms,
664 },
665 );
666 response.print();
667 Ok(())
668}
669
/// Options consumed by [`supervise`], the entry point of the supervisor process.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job being supervised.
    pub job_id: &'a str,
    /// Jobs root directory that contains the job.
    pub root: &'a Path,
    /// Command line of the child; must not be empty.
    pub command: &'a [String],
    /// Optional override for the full-log path.
    pub full_log: Option<&'a str>,
    /// Timeout setting in milliseconds.
    pub timeout_ms: u64,
    /// Kill-after setting in milliseconds.
    pub kill_after_ms: u64,
    /// Optional working directory for the child.
    pub cwd: Option<&'a str>,
    /// Environment entries in `KEY=VALUE` form, applied after env-files.
    pub env_vars: Vec<String>,
    /// Env-file paths whose entries are applied to the child's environment.
    pub env_files: Vec<String>,
    /// When false, the child's environment is cleared before entries are applied.
    pub inherit_env: bool,
    /// Name or path of the materialized stdin file, if any.
    pub stdin_file: Option<String>,
    /// Progress interval setting in milliseconds.
    pub progress_every_ms: u64,
    /// Notification command sink, if any.
    pub notify_command: Option<String>,
    /// Notification file sink, if any.
    pub notify_file: Option<String>,
    /// Shell wrapper argv; must not be empty.
    pub shell_wrapper: Vec<String>,
}
706
707pub fn observe_inline_output(
714 job_dir: &JobDir,
715 wait: bool,
716 until_seconds: u64,
717 forever: bool,
718 max_bytes: u64,
719) -> Result<InlineObservation> {
720 let started = std::time::Instant::now();
721 let deadline = if wait {
722 if forever {
723 None
724 } else {
725 Some(started + std::time::Duration::from_secs(until_seconds))
726 }
727 } else {
728 Some(started)
729 };
730
731 if wait {
732 loop {
733 let state = job_dir.read_state()?;
734 let has_output = std::fs::metadata(job_dir.stdout_path())
735 .map(|m| m.len() > 0)
736 .unwrap_or(false)
737 || std::fs::metadata(job_dir.stderr_path())
738 .map(|m| m.len() > 0)
739 .unwrap_or(false);
740
741 if !state.status().is_non_terminal() || has_output {
742 break;
743 }
744
745 if let Some(dl) = deadline
746 && std::time::Instant::now() >= dl
747 {
748 break;
749 }
750
751 std::thread::sleep(std::time::Duration::from_millis(100));
752 }
753 }
754
755 let state = job_dir.read_state()?;
756 let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
757 let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
758
759 Ok(InlineObservation {
760 waited_ms: if wait {
761 started.elapsed().as_millis() as u64
762 } else {
763 0
764 },
765 stdout: stdout.head,
766 stderr: stderr.head,
767 stdout_range: stdout.range,
768 stderr_range: stderr.range,
769 stdout_total_bytes: stdout.observed_bytes,
770 stderr_total_bytes: stderr.observed_bytes,
771 encoding: "utf-8-lossy".to_string(),
772 state: state.status().as_str().to_string(),
773 exit_code: state.exit_code(),
774 signal: state.signal().map(|s| s.to_string()),
775 duration_ms: state.duration_ms(),
776 finished_at: state.finished_at,
777 })
778}
779
/// Computes the working directory string recorded for a job.
///
/// Starts from `cwd_override`, or the process's current directory when no
/// override is given (falling back to `.` if that cannot be determined).
/// The canonical form is returned when canonicalization succeeds; otherwise
/// an absolute path is returned unchanged and a relative one is joined onto
/// the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    let process_cwd =
        || std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));

    let requested = match cwd_override {
        Some(dir) => std::path::PathBuf::from(dir),
        None => process_cwd(),
    };

    if let Ok(resolved) = requested.canonicalize() {
        return resolved.display().to_string();
    }

    // Canonicalization failed (for example, the directory does not exist).
    let best_effort = if requested.is_absolute() {
        requested
    } else {
        process_cwd().join(requested)
    };
    best_effort.display().to_string()
}
801
802pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
805 if mask_keys.is_empty() {
806 return env_vars.to_vec();
807 }
808 env_vars
809 .iter()
810 .map(|s| {
811 let (key, _val) = parse_env_var(s);
812 if mask_keys.iter().any(|k| k == &key) {
813 format!("{key}=***")
814 } else {
815 s.clone()
816 }
817 })
818 .collect()
819}
820
/// Splits a `KEY=VALUE` entry at the first `=`.
///
/// Everything after the first `=` (including further `=` characters) is the
/// value. An entry without `=` yields the whole string as key and an empty
/// value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_string(), value.to_string()),
        None => (s.to_string(), String::new()),
    }
}
829
830fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
833 let contents =
834 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
835 let mut vars = Vec::new();
836 for line in contents.lines() {
837 let line = line.trim();
838 if line.is_empty() || line.starts_with('#') {
839 continue;
840 }
841 vars.push(parse_env_var(line));
842 }
843 Ok(vars)
844}
845
846struct OutputMatchChecker {
853 job_dir_path: std::path::PathBuf,
854 shell_wrapper: Vec<String>,
855 inner: std::sync::Mutex<OutputMatchInner>,
856}
857
858struct OutputMatchInner {
859 config: Option<crate::schema::NotificationConfig>,
860}
861
862impl OutputMatchChecker {
863 fn new(
864 job_dir_path: std::path::PathBuf,
865 shell_wrapper: Vec<String>,
866 initial_config: Option<crate::schema::NotificationConfig>,
867 ) -> Self {
868 Self {
869 job_dir_path,
870 shell_wrapper,
871 inner: std::sync::Mutex::new(OutputMatchInner {
872 config: initial_config,
873 }),
874 }
875 }
876
877 fn check_line(&self, line: &str, stream: &str) {
884 use crate::schema::{OutputMatchStream, OutputMatchType};
885
886 let match_info: Option<crate::schema::OutputMatchConfig> = {
888 let mut inner = self.inner.lock().unwrap();
889
890 {
892 let meta_path = self.job_dir_path.join("meta.json");
893 if let Ok(raw) = std::fs::read(&meta_path)
894 && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
895 {
896 inner.config = meta.notification;
897 }
898 }
899
900 let Some(ref notification) = inner.config else {
901 return;
902 };
903 let Some(ref match_cfg) = notification.on_output_match else {
904 return;
905 };
906
907 let stream_matches = match match_cfg.stream {
909 OutputMatchStream::Stdout => stream == "stdout",
910 OutputMatchStream::Stderr => stream == "stderr",
911 OutputMatchStream::Either => true,
912 };
913 if !stream_matches {
914 return;
915 }
916
917 let matched = match &match_cfg.match_type {
919 OutputMatchType::Contains => line.contains(&match_cfg.pattern),
920 OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
921 .map(|re| re.is_match(line))
922 .unwrap_or(false),
923 };
924
925 if matched {
926 Some(match_cfg.clone())
927 } else {
928 None
929 }
930 }; if let Some(match_cfg) = match_info {
933 self.dispatch_match(line, stream, &match_cfg);
934 }
935 }
936
937 fn dispatch_match(
940 &self,
941 line: &str,
942 stream: &str,
943 match_cfg: &crate::schema::OutputMatchConfig,
944 ) {
945 use std::io::Write;
946
947 let job_id = self
948 .job_dir_path
949 .file_name()
950 .and_then(|n| n.to_str())
951 .unwrap_or("unknown");
952
953 let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
954 let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
955 let events_path = self.job_dir_path.join("notification_events.ndjson");
956 let events_path_str = events_path.display().to_string();
957
958 let match_type_str = match &match_cfg.match_type {
959 crate::schema::OutputMatchType::Contains => "contains",
960 crate::schema::OutputMatchType::Regex => "regex",
961 };
962
963 let event = crate::schema::OutputMatchEvent {
964 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
965 event_type: "job.output.matched".to_string(),
966 job_id: job_id.to_string(),
967 pattern: match_cfg.pattern.clone(),
968 match_type: match_type_str.to_string(),
969 stream: stream.to_string(),
970 line: line.to_string(),
971 stdout_log_path,
972 stderr_log_path,
973 };
974
975 let event_json = serde_json::to_string(&event).unwrap_or_default();
976 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
977
978 if let Some(ref cmd) = match_cfg.command {
979 delivery_results.push(dispatch_command_sink(
980 cmd,
981 &event_json,
982 job_id,
983 &events_path_str,
984 &self.shell_wrapper,
985 "job.output.matched",
986 ));
987 }
988 if let Some(ref file_path) = match_cfg.file {
989 delivery_results.push(dispatch_file_sink(file_path, &event_json));
990 }
991
992 let record = crate::schema::OutputMatchEventRecord {
994 event,
995 delivery_results,
996 };
997 if let Ok(record_json) = serde_json::to_string(&record)
998 && let Ok(mut f) = std::fs::OpenOptions::new()
999 .create(true)
1000 .append(true)
1001 .open(&events_path)
1002 {
1003 let _ = writeln!(f, "{record_json}");
1004 }
1005 }
1006}
1007
1008fn stream_to_logs<R, F>(
1025 stream: R,
1026 log_path: &std::path::Path,
1027 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
1028 label: &str,
1029 on_line: Option<F>,
1030) where
1031 R: std::io::Read,
1032 F: Fn(&str),
1033{
1034 use std::io::Write;
1035 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1036 let mut stream = stream;
1037 let mut buf = [0u8; 8192];
1038 let mut line_buf: Vec<u8> = Vec::new();
1040 loop {
1041 match stream.read(&mut buf) {
1042 Ok(0) => break, Ok(n) => {
1044 let chunk = &buf[..n];
1045 let _ = log_file.write_all(chunk);
1047 for &b in chunk {
1049 if b == b'\n' {
1050 let line = String::from_utf8_lossy(&line_buf);
1051 if let Ok(mut fl) = full_log.lock() {
1052 let ts = now_rfc3339();
1053 let _ = writeln!(fl, "{ts} [{label}] {line}");
1054 }
1055 if let Some(ref f) = on_line {
1056 f(&line);
1057 }
1058 line_buf.clear();
1059 } else {
1060 line_buf.push(b);
1061 }
1062 }
1063 }
1064 Err(_) => break,
1065 }
1066 }
1067 if !line_buf.is_empty() {
1069 let line = String::from_utf8_lossy(&line_buf);
1070 if let Ok(mut fl) = full_log.lock() {
1071 let ts = now_rfc3339();
1072 let _ = writeln!(fl, "{ts} [{label}] {line}");
1073 }
1074 if let Some(ref f) = on_line {
1075 f(&line);
1076 }
1077 }
1078}
1079
1080pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1090 use std::sync::{Arc, Mutex};
1091
1092 let job_id = opts.job_id;
1093 let root = opts.root;
1094 let command = opts.command;
1095
1096 if command.is_empty() {
1097 anyhow::bail!("supervisor: no command");
1098 }
1099
1100 let job_dir = JobDir::open(root, job_id)?;
1101
1102 let meta = job_dir.read_meta()?;
1104 let started_at = now_rfc3339();
1108
1109 let full_log_path = if let Some(p) = opts.full_log {
1111 std::path::PathBuf::from(p)
1112 } else {
1113 job_dir.full_log_path()
1114 };
1115
1116 if let Some(parent) = full_log_path.parent() {
1119 std::fs::create_dir_all(parent)
1120 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1121 }
1122 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1123 let full_log = Arc::new(Mutex::new(full_log_file));
1124
1125 if opts.shell_wrapper.is_empty() {
1141 anyhow::bail!("supervisor: shell wrapper must not be empty");
1142 }
1143 let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1144 if command.len() == 1 {
1145 child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1147 } else {
1148 #[cfg(unix)]
1158 {
1159 child_cmd
1162 .args(&opts.shell_wrapper[1..])
1163 .arg("exec \"$@\"")
1164 .arg("--")
1165 .args(command);
1166 }
1167 #[cfg(not(unix))]
1168 {
1169 let joined = command
1173 .iter()
1174 .map(|a| {
1175 if a.contains(' ') {
1176 format!("\"{}\"", a)
1177 } else {
1178 a.clone()
1179 }
1180 })
1181 .collect::<Vec<_>>()
1182 .join(" ");
1183 child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1184 }
1185 }
1186
1187 if opts.inherit_env {
1188 } else {
1190 child_cmd.env_clear();
1191 }
1192
1193 for env_file in &opts.env_files {
1195 let vars = load_env_file(env_file)?;
1196 for (k, v) in vars {
1197 child_cmd.env(&k, &v);
1198 }
1199 }
1200
1201 for env_var in &opts.env_vars {
1203 let (k, v) = parse_env_var(env_var);
1204 child_cmd.env(&k, &v);
1205 }
1206
1207 if let Some(cwd) = opts.cwd {
1209 child_cmd.current_dir(cwd);
1210 }
1211
1212 #[cfg(unix)]
1217 {
1218 use std::os::unix::process::CommandExt;
1219 unsafe {
1221 child_cmd.pre_exec(|| {
1222 libc::setsid();
1223 Ok(())
1224 });
1225 }
1226 }
1227
1228 let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1230 let mut child = child_cmd
1231 .stdin(child_stdin)
1232 .stdout(std::process::Stdio::piped())
1233 .stderr(std::process::Stdio::piped())
1234 .spawn()
1235 .context("supervisor: spawn child")?;
1236
1237 let pid = child.id();
1238 info!(job_id, pid, "child process started");
1239
1240 #[cfg(windows)]
1247 let windows_job_name = {
1248 match assign_to_job_object(job_id, pid) {
1249 Ok(name) => Some(name),
1250 Err(e) => {
1251 let kill_err = child.kill();
1255 let _ = child.wait(); let failed_state = JobState {
1258 job: JobStateJob {
1259 id: job_id.to_string(),
1260 status: JobStatus::Failed,
1261 started_at: Some(started_at.clone()),
1262 },
1263 result: JobStateResult {
1264 exit_code: None,
1265 signal: None,
1266 duration_ms: None,
1267 },
1268 pid: Some(pid),
1269 finished_at: Some(now_rfc3339()),
1270 updated_at: now_rfc3339(),
1271 windows_job_name: None,
1272 };
1273 let _ = job_dir.write_state(&failed_state);
1276
1277 if opts.notify_command.is_some() || opts.notify_file.is_some() {
1281 let finished_at_ts =
1282 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1283 let stdout_log = job_dir.stdout_path().display().to_string();
1284 let stderr_log = job_dir.stderr_path().display().to_string();
1285 let fail_event = crate::schema::CompletionEvent {
1286 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1287 event_type: "job.finished".to_string(),
1288 job_id: job_id.to_string(),
1289 state: JobStatus::Failed.as_str().to_string(),
1290 command: meta.command.clone(),
1291 cwd: meta.cwd.clone(),
1292 started_at: started_at.clone(),
1293 finished_at: finished_at_ts,
1294 duration_ms: None,
1295 exit_code: None,
1296 signal: None,
1297 stdout_log_path: stdout_log,
1298 stderr_log_path: stderr_log,
1299 };
1300 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1301 let fail_event_path = job_dir.completion_event_path().display().to_string();
1302 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1303 Vec::new();
1304 if let Err(we) = job_dir.write_completion_event_atomic(
1305 &crate::schema::CompletionEventRecord {
1306 event: fail_event.clone(),
1307 delivery_results: vec![],
1308 },
1309 ) {
1310 warn!(
1311 job_id,
1312 error = %we,
1313 "failed to write initial completion_event.json for failed job"
1314 );
1315 }
1316 if let Some(ref shell_cmd) = opts.notify_command {
1317 fail_delivery_results.push(dispatch_command_sink(
1318 shell_cmd,
1319 &fail_event_json,
1320 job_id,
1321 &fail_event_path,
1322 &opts.shell_wrapper,
1323 "job.finished",
1324 ));
1325 }
1326 if let Some(ref file_path) = opts.notify_file {
1327 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1328 }
1329 if let Err(we) = job_dir.write_completion_event_atomic(
1330 &crate::schema::CompletionEventRecord {
1331 event: fail_event,
1332 delivery_results: fail_delivery_results,
1333 },
1334 ) {
1335 warn!(
1336 job_id,
1337 error = %we,
1338 "failed to update completion_event.json with delivery results for failed job"
1339 );
1340 }
1341 }
1342
1343 if let Err(ke) = kill_err {
1344 return Err(anyhow::anyhow!(
1345 "supervisor: failed to assign pid {pid} to Job Object \
1346 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1347 ));
1348 }
1349 return Err(anyhow::anyhow!(
1350 "supervisor: failed to assign pid {pid} to Job Object \
1351 (Windows MUST requirement); child process was killed; \
1352 consider running outside a nested Job Object environment: {e}"
1353 ));
1354 }
1355 }
1356 };
1357 #[cfg(not(windows))]
1358 let windows_job_name: Option<String> = None;
1359
1360 let state = JobState {
1365 job: JobStateJob {
1366 id: job_id.to_string(),
1367 status: JobStatus::Running,
1368 started_at: Some(started_at.clone()),
1369 },
1370 result: JobStateResult {
1371 exit_code: None,
1372 signal: None,
1373 duration_ms: None,
1374 },
1375 pid: Some(pid),
1376 finished_at: None,
1377 updated_at: now_rfc3339(),
1378 windows_job_name,
1379 };
1380 job_dir.write_state(&state)?;
1381
1382 let child_start_time = std::time::Instant::now();
1383
1384 let child_stdout = child.stdout.take().expect("child stdout piped");
1386 let child_stderr = child.stderr.take().expect("child stderr piped");
1387
1388 let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1390 job_dir.path.clone(),
1391 opts.shell_wrapper.clone(),
1392 meta.notification.clone(),
1393 ));
1394
1395 let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1399 let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1400
1401 let stdout_log_path = job_dir.stdout_path();
1403 let full_log_stdout = Arc::clone(&full_log);
1404 let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1405 let t_stdout = std::thread::spawn(move || {
1406 stream_to_logs(
1407 child_stdout,
1408 &stdout_log_path,
1409 full_log_stdout,
1410 "STDOUT",
1411 Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1412 );
1413 let _ = tx_stdout_done.send(());
1414 });
1415
1416 let stderr_log_path = job_dir.stderr_path();
1418 let full_log_stderr = Arc::clone(&full_log);
1419 let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1420 let t_stderr = std::thread::spawn(move || {
1421 stream_to_logs(
1422 child_stderr,
1423 &stderr_log_path,
1424 full_log_stderr,
1425 "STDERR",
1426 Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1427 );
1428 let _ = tx_stderr_done.send(());
1429 });
1430
1431 let timeout_ms = opts.timeout_ms;
1434 let kill_after_ms = opts.kill_after_ms;
1435 let progress_every_ms = opts.progress_every_ms;
1436 let state_path = job_dir.state_path();
1437 let job_id_str = job_id.to_string();
1438
1439 use std::sync::atomic::{AtomicBool, Ordering};
1441 let child_done = Arc::new(AtomicBool::new(false));
1442
1443 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1444 let state_path_clone = state_path.clone();
1445 let child_done_clone = Arc::clone(&child_done);
1446 Some(std::thread::spawn(move || {
1447 let start = std::time::Instant::now();
1448 let timeout_dur = if timeout_ms > 0 {
1449 Some(std::time::Duration::from_millis(timeout_ms))
1450 } else {
1451 None
1452 };
1453 let progress_dur = if progress_every_ms > 0 {
1454 Some(std::time::Duration::from_millis(progress_every_ms))
1455 } else {
1456 None
1457 };
1458
1459 let poll_interval = std::time::Duration::from_millis(100);
1460
1461 loop {
1462 std::thread::sleep(poll_interval);
1463
1464 if child_done_clone.load(Ordering::Relaxed) {
1466 break;
1467 }
1468
1469 let elapsed = start.elapsed();
1470
1471 if let Some(td) = timeout_dur
1473 && elapsed >= td
1474 {
1475 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1476 #[cfg(unix)]
1479 {
1480 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1481 }
1482 if kill_after_ms > 0 {
1484 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1485 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1486 #[cfg(unix)]
1487 {
1488 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1489 }
1490 } else {
1491 #[cfg(unix)]
1493 {
1494 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1495 }
1496 }
1497 break;
1498 }
1499
1500 if let Some(pd) = progress_dur {
1502 let elapsed_ms = elapsed.as_millis() as u64;
1503 let pd_ms = pd.as_millis() as u64;
1504 let poll_ms = poll_interval.as_millis() as u64;
1505 if elapsed_ms % pd_ms < poll_ms {
1506 if let Ok(raw) = std::fs::read(&state_path_clone)
1508 && let Ok(mut st) =
1509 serde_json::from_slice::<crate::schema::JobState>(&raw)
1510 {
1511 st.updated_at = now_rfc3339();
1512 if let Ok(s) = serde_json::to_string_pretty(&st) {
1513 let _ = std::fs::write(&state_path_clone, s);
1514 }
1515 }
1516 }
1517 }
1518 }
1519 }))
1520 } else {
1521 None
1522 };
1523
1524 let exit_status = child.wait().context("wait for child")?;
1526
1527 child_done.store(true, Ordering::Relaxed);
1529
1530 let duration_ms = child_start_time.elapsed().as_millis() as u64;
1537 let exit_code = exit_status.code();
1538 let finished_at = now_rfc3339();
1539
1540 #[cfg(unix)]
1542 let (terminal_status, signal_name) = {
1543 use std::os::unix::process::ExitStatusExt;
1544 if let Some(sig) = exit_status.signal() {
1545 (JobStatus::Killed, Some(sig.to_string()))
1546 } else {
1547 (JobStatus::Exited, None)
1548 }
1549 };
1550 #[cfg(not(unix))]
1551 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1552
1553 let state = JobState {
1554 job: JobStateJob {
1555 id: job_id.to_string(),
1556 status: terminal_status.clone(),
1557 started_at: Some(started_at.clone()),
1558 },
1559 result: JobStateResult {
1560 exit_code,
1561 signal: signal_name.clone(),
1562 duration_ms: Some(duration_ms),
1563 },
1564 pid: Some(pid),
1565 finished_at: Some(finished_at.clone()),
1566 updated_at: now_rfc3339(),
1567 windows_job_name: None, };
1569 job_dir.write_state(&state)?;
1570 info!(job_id, ?exit_code, "child process finished");
1571
1572 const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1583 let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1584
1585 let remaining = drain_deadline
1586 .checked_duration_since(std::time::Instant::now())
1587 .unwrap_or(std::time::Duration::ZERO);
1588 if rx_stdout_done.recv_timeout(remaining).is_ok() {
1589 let _ = t_stdout.join();
1590 } else {
1591 drop(t_stdout); }
1593
1594 let remaining = drain_deadline
1595 .checked_duration_since(std::time::Instant::now())
1596 .unwrap_or(std::time::Duration::ZERO);
1597 if rx_stderr_done.recv_timeout(remaining).is_ok() {
1598 let _ = t_stderr.join();
1599 } else {
1600 drop(t_stderr); }
1602
1603 if let Some(w) = watcher {
1605 let _ = w.join();
1606 }
1607
1608 let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1611 let (current_notify_command, current_notify_file) = match &latest_notification {
1612 Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1613 None => (None, None),
1614 };
1615
1616 let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1619 if has_notification {
1620 let stdout_log = job_dir.stdout_path().display().to_string();
1621 let stderr_log = job_dir.stderr_path().display().to_string();
1622 let event = crate::schema::CompletionEvent {
1623 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1624 event_type: "job.finished".to_string(),
1625 job_id: job_id.to_string(),
1626 state: terminal_status.as_str().to_string(),
1627 command: meta.command.clone(),
1628 cwd: meta.cwd.clone(),
1629 started_at,
1630 finished_at,
1631 duration_ms: Some(duration_ms),
1632 exit_code,
1633 signal: signal_name,
1634 stdout_log_path: stdout_log,
1635 stderr_log_path: stderr_log,
1636 };
1637
1638 let event_json = serde_json::to_string(&event).unwrap_or_default();
1639 let event_path = job_dir.completion_event_path().display().to_string();
1640 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1641
1642 if let Err(e) =
1644 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1645 event: event.clone(),
1646 delivery_results: vec![],
1647 })
1648 {
1649 warn!(job_id, error = %e, "failed to write initial completion_event.json");
1650 }
1651
1652 if let Some(ref shell_cmd) = current_notify_command {
1653 delivery_results.push(dispatch_command_sink(
1654 shell_cmd,
1655 &event_json,
1656 job_id,
1657 &event_path,
1658 &opts.shell_wrapper,
1659 "job.finished",
1660 ));
1661 }
1662 if let Some(ref file_path) = current_notify_file {
1663 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1664 }
1665
1666 if let Err(e) =
1668 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1669 event,
1670 delivery_results,
1671 })
1672 {
1673 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1674 }
1675 }
1676
1677 Ok(())
1678}
1679
1680fn dispatch_command_sink(
1687 shell_cmd: &str,
1688 event_json: &str,
1689 job_id: &str,
1690 event_path: &str,
1691 shell_wrapper: &[String],
1692 event_type: &str,
1693) -> crate::schema::SinkDeliveryResult {
1694 use std::io::Write;
1695 let attempted_at = now_rfc3339();
1696 let target = shell_cmd.to_string();
1697
1698 if shell_cmd.trim().is_empty() {
1699 return crate::schema::SinkDeliveryResult {
1700 sink_type: "command".to_string(),
1701 target,
1702 success: false,
1703 error: Some("empty shell command".to_string()),
1704 attempted_at,
1705 };
1706 }
1707
1708 if shell_wrapper.is_empty() {
1709 return crate::schema::SinkDeliveryResult {
1710 sink_type: "command".to_string(),
1711 target,
1712 success: false,
1713 error: Some("shell wrapper must not be empty".to_string()),
1714 attempted_at,
1715 };
1716 }
1717
1718 let mut cmd = Command::new(&shell_wrapper[0]);
1719 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1720
1721 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1722 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1723 cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1724 cmd.stdin(std::process::Stdio::piped());
1725 cmd.stdout(std::process::Stdio::null());
1726 cmd.stderr(std::process::Stdio::null());
1727
1728 match cmd.spawn() {
1729 Ok(mut child) => {
1730 if let Some(mut stdin) = child.stdin.take() {
1731 let _ = stdin.write_all(event_json.as_bytes());
1732 }
1733 match child.wait() {
1734 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1735 sink_type: "command".to_string(),
1736 target,
1737 success: true,
1738 error: None,
1739 attempted_at,
1740 },
1741 Ok(status) => crate::schema::SinkDeliveryResult {
1742 sink_type: "command".to_string(),
1743 target,
1744 success: false,
1745 error: Some(format!("exited with status {status}")),
1746 attempted_at,
1747 },
1748 Err(e) => crate::schema::SinkDeliveryResult {
1749 sink_type: "command".to_string(),
1750 target,
1751 success: false,
1752 error: Some(format!("wait error: {e}")),
1753 attempted_at,
1754 },
1755 }
1756 }
1757 Err(e) => crate::schema::SinkDeliveryResult {
1758 sink_type: "command".to_string(),
1759 target,
1760 success: false,
1761 error: Some(format!("spawn error: {e}")),
1762 attempted_at,
1763 },
1764 }
1765}
1766
1767fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1770 use std::io::Write;
1771 let attempted_at = now_rfc3339();
1772 let path = std::path::Path::new(file_path);
1773
1774 if let Some(parent) = path.parent()
1775 && let Err(e) = std::fs::create_dir_all(parent)
1776 {
1777 return crate::schema::SinkDeliveryResult {
1778 sink_type: "file".to_string(),
1779 target: file_path.to_string(),
1780 success: false,
1781 error: Some(format!("create parent dir: {e}")),
1782 attempted_at,
1783 };
1784 }
1785
1786 match std::fs::OpenOptions::new()
1787 .create(true)
1788 .append(true)
1789 .open(path)
1790 {
1791 Ok(mut f) => match writeln!(f, "{event_json}") {
1792 Ok(_) => crate::schema::SinkDeliveryResult {
1793 sink_type: "file".to_string(),
1794 target: file_path.to_string(),
1795 success: true,
1796 error: None,
1797 attempted_at,
1798 },
1799 Err(e) => crate::schema::SinkDeliveryResult {
1800 sink_type: "file".to_string(),
1801 target: file_path.to_string(),
1802 success: false,
1803 error: Some(format!("write error: {e}")),
1804 attempted_at,
1805 },
1806 },
1807 Err(e) => crate::schema::SinkDeliveryResult {
1808 sink_type: "file".to_string(),
1809 target: file_path.to_string(),
1810 success: false,
1811 error: Some(format!("open error: {e}")),
1812 attempted_at,
1813 },
1814 }
1815}
1816
/// Public wrapper around the module-private `now_rfc3339`.
///
/// Returns that function's result unchanged.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1821
1822fn now_rfc3339() -> String {
1823 let d = std::time::SystemTime::now()
1825 .duration_since(std::time::UNIX_EPOCH)
1826 .unwrap_or_default();
1827 format_rfc3339(d.as_secs())
1828}
1829
1830fn format_rfc3339(secs: u64) -> String {
1831 let mut s = secs;
1833 let seconds = s % 60;
1834 s /= 60;
1835 let minutes = s % 60;
1836 s /= 60;
1837 let hours = s % 24;
1838 s /= 24;
1839
1840 let mut days = s;
1842 let mut year = 1970u64;
1843 loop {
1844 let days_in_year = if is_leap(year) { 366 } else { 365 };
1845 if days < days_in_year {
1846 break;
1847 }
1848 days -= days_in_year;
1849 year += 1;
1850 }
1851
1852 let leap = is_leap(year);
1853 let month_days: [u64; 12] = [
1854 31,
1855 if leap { 29 } else { 28 },
1856 31,
1857 30,
1858 31,
1859 30,
1860 31,
1861 31,
1862 30,
1863 31,
1864 30,
1865 31,
1866 ];
1867 let mut month = 0usize;
1868 for (i, &d) in month_days.iter().enumerate() {
1869 if days < d {
1870 month = i;
1871 break;
1872 }
1873 days -= d;
1874 }
1875 let day = days + 1;
1876
1877 format!(
1878 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1879 year,
1880 month + 1,
1881 day,
1882 hours,
1883 minutes,
1884 seconds
1885 )
1886}
1887
/// Reports whether `year` is a leap year: divisible by 400, or divisible by 4 but not by 100.
fn is_leap(year: u64) -> bool {
    // Multiples of 400 are leap years regardless of the century rule.
    if year.is_multiple_of(400) {
        return true;
    }
    // Remaining century years are excluded; other multiples of 4 qualify.
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1891
/// Creates a named Windows Job Object for `job_id` and assigns process `pid` to it.
///
/// The Job Object is named `AgentExec-{job_id}`; that name is returned on success.
///
/// # Errors
///
/// Returns an error if the process cannot be opened, the Job Object cannot be created,
/// or the assignment fails. On each failure path the handles opened so far are closed
/// before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // NOTE(review): raw Win32 calls. Soundness presumably relies on `pid` naming a live
    // process and on each handle being closed at most once; confirm and record a
    // proper `SAFETY` justification here.
    unsafe {
        // Open the target process with terminate and set-quota access rights.
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create the named Job Object; release the process handle if that fails.
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        // Attach the process to the job; release both handles if that fails.
        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle is no longer needed once the assignment succeeded.
        let _ = CloseHandle(proc_handle);
        // The job handle is deliberately not closed on the success path.
        // NOTE(review): presumably so the named Job Object stays alive after this
        // function returns — confirm the intended lifetime.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1957
/// Unit tests for the hand-rolled timestamp formatter.
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero seconds renders as the Unix epoch.
    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    /// A fixed, well-known timestamp renders as the expected calendar date.
    #[test]
    fn rfc3339_known_date() {
        // 1704067200 seconds after the epoch is midnight UTC on 2024-01-01.
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }
}