1use anyhow::{Context, Result};
10use std::io::IsTerminal;
11use std::path::Path;
12use std::process::Command;
13use tracing::{debug, info, warn};
14
15use crate::jobstore::{JobDir, generate_job_id, resolve_root};
16use crate::schema::{
17 JobMeta, JobMetaJob, JobState, JobStateJob, JobStateResult, JobStatus, Response, RunData,
18};
19
/// Snapshot of a job's state and the head of its output logs, as assembled by
/// `observe_inline_output`.
#[derive(Debug, Clone)]
pub struct InlineObservation {
    /// Milliseconds spent inside the observation call (polling plus log reads).
    pub waited_ms: u64,
    /// Head of `stdout.log` as returned by `JobDir::read_head_metrics`.
    pub stdout: String,
    /// Head of `stderr.log` as returned by `JobDir::read_head_metrics`.
    pub stderr: String,
    /// Range reported alongside the stdout head
    /// (presumably byte offsets into the log — TODO confirm against `JobDir`).
    pub stdout_range: [u64; 2],
    /// Range reported alongside the stderr head
    /// (presumably byte offsets into the log — TODO confirm against `JobDir`).
    pub stderr_range: [u64; 2],
    /// `observed_bytes` reported for `stdout.log` at observation time.
    pub stdout_total_bytes: u64,
    /// `observed_bytes` reported for `stderr.log` at observation time.
    pub stderr_total_bytes: u64,
    /// Encoding label for `stdout`/`stderr`; `observe_inline_output` sets `"utf-8-lossy"`.
    pub encoding: String,
    /// Job status rendered via `JobStatus::as_str`.
    pub state: String,
    /// Exit code taken from the job state, if one is recorded.
    pub exit_code: Option<i32>,
    /// `finished_at` value taken from the job state, if one is recorded.
    pub finished_at: Option<String>,
    /// Signal taken from the job state, if one is recorded.
    pub signal: Option<String>,
    /// Duration in milliseconds taken from the job state, if one is recorded.
    pub duration_ms: Option<u64>,
}
36use crate::tag::dedup_tags;
37
/// Options consumed by [`execute`] to create and launch a job.
#[derive(Debug)]
pub struct RunOpts<'a> {
    /// Command and arguments to run; `execute` rejects an empty vector.
    pub command: Vec<String>,
    /// Jobs root override, passed to `resolve_root`.
    pub root: Option<&'a str>,
    /// When true, `execute` polls for output or a terminal state before responding.
    pub wait: bool,
    /// Polling budget in seconds, used when `wait` is set and `forever` is not.
    pub until_seconds: u64,
    /// When true (and `wait` is set), polling has no deadline.
    pub forever: bool,
    /// Byte cap passed to `read_head_metrics` for each of stdout and stderr.
    pub max_bytes: u64,
    /// Timeout forwarded to the supervisor; `0` omits the `--timeout` flag.
    pub timeout_ms: u64,
    /// Forwarded to the supervisor as `--kill-after`; `0` omits the flag.
    /// (Presumably the grace period before a forced kill — TODO confirm.)
    pub kill_after_ms: u64,
    /// Working directory override for the job.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries forwarded to the supervisor as `--env`.
    pub env_vars: Vec<String>,
    /// Env-file paths forwarded to the supervisor as `--env-file`.
    pub env_files: Vec<String>,
    /// When false, the supervisor is told `--no-inherit-env`.
    pub inherit_env: bool,
    /// Keys whose values are replaced by `***` in the recorded/returned env vars.
    pub mask: Vec<String>,
    /// Where the job's stdin comes from, if anywhere.
    pub stdin: Option<StdinSource>,
    /// Maximum number of bytes accepted when materializing stdin.
    pub stdin_max_bytes: u64,
    /// Tags for the job; `execute` runs them through `dedup_tags`.
    pub tags: Vec<String>,
    /// Override for the full-log path handed to the supervisor.
    pub log: Option<&'a str>,
    /// Forwarded to the supervisor as `--progress-every`; `0` omits the flag.
    pub progress_every_ms: u64,
    /// Shell command sink forwarded as `--notify-command` and stored in the job metadata.
    pub notify_command: Option<String>,
    /// File sink forwarded as `--notify-file` and stored in the job metadata.
    pub notify_file: Option<String>,
    /// Pattern input for `crate::notify::build_output_match_config`.
    pub output_pattern: Option<String>,
    /// Match-type input for `crate::notify::build_output_match_config`.
    pub output_match_type: Option<String>,
    /// Stream-selector input for `crate::notify::build_output_match_config`.
    pub output_stream: Option<String>,
    /// Command-sink input for `crate::notify::build_output_match_config`.
    pub output_command: Option<String>,
    /// File-sink input for `crate::notify::build_output_match_config`.
    pub output_file: Option<String>,
    /// Shell wrapper argv (program followed by its arguments) recorded in the
    /// metadata and forwarded to the supervisor.
    pub shell_wrapper: Vec<String>,
}
103
104impl<'a> Default for RunOpts<'a> {
105 fn default() -> Self {
106 RunOpts {
107 command: vec![],
108 root: None,
109 wait: true,
110 until_seconds: 10,
111 forever: false,
112 max_bytes: 65536,
113 timeout_ms: 0,
114 kill_after_ms: 0,
115 cwd: None,
116 env_vars: vec![],
117 env_files: vec![],
118 inherit_env: true,
119 mask: vec![],
120 stdin: None,
121 stdin_max_bytes: DEFAULT_STDIN_MAX_BYTES,
122 tags: vec![],
123 log: None,
124 progress_every_ms: 0,
125 notify_command: None,
126 notify_file: None,
127 output_pattern: None,
128 output_match_type: None,
129 output_stream: None,
130 output_command: None,
131 output_file: None,
132 shell_wrapper: crate::config::default_shell_wrapper(),
133 }
134 }
135}
136
/// Where a job's standard input is taken from before it is materialized
/// into the job directory.
#[derive(Debug, Clone)]
pub enum StdinSource {
    /// Read from the calling process's own stdin (selected by the value `-`).
    CallerStdin,
    /// Use the given string's bytes verbatim.
    Inline(String),
    /// Copy the contents of the file at the given path.
    File(String),
}
146
/// Owned inputs for [`spawn_supervisor_process`]; each field maps onto a
/// command-line argument of the `_supervise` invocation.
pub struct SpawnSupervisorParams {
    /// Passed as `--job-id`.
    pub job_id: String,
    /// Passed as `--supervise-root` (rendered with `Path::display`).
    pub root: std::path::PathBuf,
    /// Passed as `--full-log`.
    pub full_log_path: String,
    /// Passed as `--timeout`, rounded up to whole seconds; `0` omits the flag.
    pub timeout_ms: u64,
    /// Passed as `--kill-after`, rounded up to whole seconds; `0` omits the flag.
    pub kill_after_ms: u64,
    /// Passed as `--cwd` when present.
    pub cwd: Option<String>,
    /// Each entry is passed as a separate `--env` argument.
    pub env_vars: Vec<String>,
    /// Each entry is passed as a separate `--env-file` argument.
    pub env_files: Vec<String>,
    /// When false, `--no-inherit-env` is added.
    pub inherit_env: bool,
    /// Passed as `--stdin-file` when present.
    pub stdin_file: Option<String>,
    /// Passed as `--progress-every`, rounded up to whole seconds; `0` omits the flag.
    pub progress_every_ms: u64,
    /// Passed as `--notify-command` when present.
    pub notify_command: Option<String>,
    /// Passed as `--notify-file` when present.
    pub notify_file: Option<String>,
    /// Serialized to JSON and passed as `--shell-wrapper-resolved`.
    pub shell_wrapper: Vec<String>,
    /// Appended after `--` as the command to supervise.
    pub command: Vec<String>,
}
165
166pub fn resolve_stdin_source(
167 stdin: Option<String>,
168 stdin_file: Option<String>,
169) -> Option<StdinSource> {
170 if let Some(value) = stdin {
171 if value == "-" {
172 Some(StdinSource::CallerStdin)
173 } else {
174 Some(StdinSource::Inline(value))
175 }
176 } else {
177 stdin_file.map(StdinSource::File)
178 }
179}
180
181pub fn validate_stdin_source(stdin: Option<&StdinSource>) -> Result<()> {
182 if matches!(stdin, Some(StdinSource::CallerStdin)) {
183 let stdin = std::io::stdin();
184 if stdin.is_terminal() {
185 return Err(anyhow::anyhow!(StdinRequired(
186 "stdin_required: --stdin - requires non-interactive stdin (pipe/heredoc/redirect)"
187 .to_string(),
188 )));
189 }
190 }
191 Ok(())
192}
193
194fn create_stdin_file(path: &std::path::Path) -> Result<std::fs::File> {
195 #[cfg(unix)]
196 {
197 use std::os::unix::fs::OpenOptionsExt;
198 std::fs::OpenOptions::new()
199 .write(true)
200 .create(true)
201 .truncate(true)
202 .mode(0o600)
203 .open(path)
204 .with_context(|| format!("create materialized stdin {}", path.display()))
205 }
206 #[cfg(not(unix))]
207 {
208 std::fs::File::create(path)
209 .with_context(|| format!("create materialized stdin {}", path.display()))
210 }
211}
212
/// Writer adapter that refuses to let more than `limit` bytes through.
struct LimitedWriter<W> {
    /// Destination that receives the accepted bytes.
    inner: W,
    /// Number of bytes the destination has accepted so far.
    written: u64,
    /// Maximum total number of bytes that may be written.
    limit: u64,
}

impl<W> std::io::Write for LimitedWriter<W>
where
    W: std::io::Write,
{
    /// Forwards `buf` to the inner writer unless doing so would push the
    /// running total past `limit`; in that case nothing is written and an
    /// error with the message `stdin_too_large` is returned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let projected_total = self.written + buf.len() as u64;
        if projected_total > self.limit {
            return Err(std::io::Error::other("stdin_too_large"));
        }

        // Count only what the inner writer actually accepted.
        let accepted = self.inner.write(buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
233
234fn materialize_stdin(
235 job_dir: &JobDir,
236 stdin: Option<&StdinSource>,
237 max_bytes: u64,
238) -> Result<Option<String>> {
239 let Some(source) = stdin else {
240 return Ok(None);
241 };
242
243 let target_name = "stdin.bin".to_string();
244 let target_path = job_dir.path.join(&target_name);
245 let file = create_stdin_file(&target_path)?;
246 let mut target = LimitedWriter {
247 inner: file,
248 written: 0,
249 limit: max_bytes,
250 };
251
252 let copy_result = match source {
253 StdinSource::CallerStdin => {
254 let mut stdin = std::io::stdin();
255 std::io::copy(&mut stdin, &mut target).context("materialize caller stdin to stdin.bin")
256 }
257 StdinSource::Inline(value) => {
258 use std::io::Write;
259 target
260 .write_all(value.as_bytes())
261 .map(|_| 0u64)
262 .context("write inline stdin to stdin.bin")
263 }
264 StdinSource::File(path) => {
265 let mut input = std::fs::File::open(path)
266 .with_context(|| format!("open --stdin-file source {}", path))?;
267 std::io::copy(&mut input, &mut target)
268 .with_context(|| format!("copy --stdin-file source {} to stdin.bin", path))
269 }
270 };
271
272 if let Err(e) = copy_result {
273 let _ = std::fs::remove_file(&target_path);
274 let is_too_large = e
275 .chain()
276 .any(|cause| cause.to_string().contains("stdin_too_large"));
277 if is_too_large {
278 return Err(anyhow::anyhow!(StdinTooLarge(format!(
279 "stdin_too_large: input exceeds {} byte limit",
280 max_bytes
281 ))));
282 }
283 return Err(e);
284 }
285
286 Ok(Some(target_name))
287}
288
289fn resolve_stdin_path(job_dir: &JobDir, stdin_file: Option<&str>) -> Option<std::path::PathBuf> {
290 stdin_file.map(|p| {
291 let path = std::path::Path::new(p);
292 if path.is_absolute() {
293 path.to_path_buf()
294 } else {
295 job_dir.path.join(path)
296 }
297 })
298}
299
/// Error raised when `--stdin -` is requested but the caller's stdin is a
/// terminal. The wrapped string is the complete user-facing message.
#[derive(Debug)]
pub struct StdinRequired(pub String);

impl std::fmt::Display for StdinRequired {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinRequired {}
310
/// Error raised when materialized stdin exceeds the configured byte limit.
/// The wrapped string is the complete user-facing message.
#[derive(Debug)]
pub struct StdinTooLarge(pub String);

impl std::fmt::Display for StdinTooLarge {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StdinTooLarge {}
321
322pub fn open_child_stdin(job_dir: &JobDir, stdin_file: Option<&str>) -> Result<std::process::Stdio> {
323 if let Some(path) = resolve_stdin_path(job_dir, stdin_file) {
324 let file = std::fs::File::open(&path)
325 .with_context(|| format!("open materialized stdin {}", path.display()))?;
326 Ok(std::process::Stdio::from(file))
327 } else {
328 Ok(std::process::Stdio::null())
329 }
330}
331
/// Default cap on materialized stdin: 64 MiB.
pub const DEFAULT_STDIN_MAX_BYTES: u64 = 64 * 1024 * 1024;

/// Public entry point for materializing a job's stdin.
///
/// Delegates directly to `materialize_stdin`: returns `Ok(None)` when no
/// source is given, otherwise the name of the file written inside the job
/// directory. Errors from the delegate are returned unchanged.
pub fn materialize_stdin_for_job(
    job_dir: &JobDir,
    stdin: Option<&StdinSource>,
    max_bytes: u64,
) -> Result<Option<String>> {
    materialize_stdin(job_dir, stdin, max_bytes)
}
341
342pub fn spawn_supervisor_process(
347 job_dir: &JobDir,
348 params: SpawnSupervisorParams,
349) -> Result<(u32, String)> {
350 let started_at = now_rfc3339();
351
352 let exe = std::env::current_exe().context("resolve current exe")?;
353 let mut supervisor_cmd = Command::new(&exe);
354 supervisor_cmd
355 .arg("_supervise")
356 .arg("--job-id")
357 .arg(¶ms.job_id)
358 .arg("--supervise-root")
359 .arg(params.root.display().to_string())
360 .arg("--full-log")
361 .arg(¶ms.full_log_path);
362
363 if params.timeout_ms > 0 {
364 let timeout_seconds = params.timeout_ms.saturating_add(999) / 1000;
365 supervisor_cmd
366 .arg("--timeout")
367 .arg(timeout_seconds.to_string());
368 }
369 if params.kill_after_ms > 0 {
370 let kill_after_seconds = params.kill_after_ms.saturating_add(999) / 1000;
371 supervisor_cmd
372 .arg("--kill-after")
373 .arg(kill_after_seconds.to_string());
374 }
375 if let Some(ref cwd) = params.cwd {
376 supervisor_cmd.arg("--cwd").arg(cwd);
377 }
378 for env_file in ¶ms.env_files {
379 supervisor_cmd.arg("--env-file").arg(env_file);
380 }
381 for env_var in ¶ms.env_vars {
382 supervisor_cmd.arg("--env").arg(env_var);
383 }
384 if !params.inherit_env {
385 supervisor_cmd.arg("--no-inherit-env");
386 }
387 if let Some(ref stdin_file) = params.stdin_file {
388 supervisor_cmd.arg("--stdin-file").arg(stdin_file);
389 }
390 if params.progress_every_ms > 0 {
391 let progress_every_seconds = params.progress_every_ms.saturating_add(999) / 1000;
392 supervisor_cmd
393 .arg("--progress-every")
394 .arg(progress_every_seconds.to_string());
395 }
396 if let Some(ref nc) = params.notify_command {
397 supervisor_cmd.arg("--notify-command").arg(nc);
398 }
399 if let Some(ref nf) = params.notify_file {
400 supervisor_cmd.arg("--notify-file").arg(nf);
401 }
402 let wrapper_json =
403 serde_json::to_string(¶ms.shell_wrapper).context("serialize shell wrapper")?;
404 supervisor_cmd
405 .arg("--shell-wrapper-resolved")
406 .arg(&wrapper_json);
407
408 supervisor_cmd
409 .arg("--")
410 .args(¶ms.command)
411 .stdin(std::process::Stdio::null())
412 .stdout(std::process::Stdio::null())
413 .stderr(std::process::Stdio::null());
414
415 let supervisor = supervisor_cmd.spawn().context("spawn supervisor")?;
416 let supervisor_pid = supervisor.id();
417 debug!(supervisor_pid, "supervisor spawned");
418
419 job_dir.init_state(supervisor_pid, &started_at)?;
421
422 #[cfg(windows)]
424 {
425 let handshake_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
426 loop {
427 std::thread::sleep(std::time::Duration::from_millis(10));
428 if let Ok(current_state) = job_dir.read_state() {
429 let supervisor_updated = current_state
430 .pid
431 .map(|p| p != supervisor_pid)
432 .unwrap_or(false)
433 || *current_state.status() == crate::schema::JobStatus::Failed;
434 if supervisor_updated {
435 if *current_state.status() == crate::schema::JobStatus::Failed {
436 anyhow::bail!(
437 "supervisor failed to assign child process to Job Object \
438 (Windows MUST requirement); see stderr for details"
439 );
440 }
441 debug!("supervisor confirmed Job Object assignment via state.json handshake");
442 break;
443 }
444 }
445 if std::time::Instant::now() >= handshake_deadline {
446 debug!("supervisor handshake timed out; proceeding with initial state");
447 break;
448 }
449 }
450 }
451
452 Ok((supervisor_pid, started_at))
453}
454
455pub fn pre_create_log_files(job_dir: &JobDir) -> Result<()> {
458 for log_path in [
459 job_dir.stdout_path(),
460 job_dir.stderr_path(),
461 job_dir.full_log_path(),
462 ] {
463 std::fs::OpenOptions::new()
464 .create(true)
465 .append(true)
466 .open(&log_path)
467 .with_context(|| format!("pre-create log file {}", log_path.display()))?;
468 }
469 Ok(())
470}
471
/// Runs the `run` command: creates a job directory, launches the supervisor,
/// optionally waits for initial output, and prints a `run` response.
///
/// # Errors
/// Fails when the command is empty, when stdin validation or materialization
/// fails, when tag de-duplication fails, or on any error while creating the
/// job directory, its log files, or the supervisor process.
pub fn execute(opts: RunOpts) -> Result<()> {
    if opts.command.is_empty() {
        anyhow::bail!("no command specified for run");
    }

    // Start of the `elapsed_ms` measurement reported in the response.
    let elapsed_start = std::time::Instant::now();

    let root = resolve_root(opts.root);
    std::fs::create_dir_all(&root)
        .with_context(|| format!("create jobs root {}", root.display()))?;

    let job_id = generate_job_id(&root)?;
    let created_at = now_rfc3339();

    // Only the key part (text before the first `=`) of each env entry.
    let env_keys: Vec<String> = opts
        .env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
        .collect();

    // Masked copies go into the metadata and the response; the supervisor
    // below still receives the original `opts.env_vars`.
    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);

    let effective_cwd = resolve_effective_cwd(opts.cwd);

    let on_output_match = crate::notify::build_output_match_config(
        opts.output_pattern,
        opts.output_match_type,
        opts.output_stream,
        opts.output_command,
        opts.output_file,
        None,
    );

    // A notification section is recorded only when at least one sink or an
    // output-match rule is configured.
    let notification =
        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
        {
            Some(crate::schema::NotificationConfig {
                notify_command: opts.notify_command.clone(),
                notify_file: opts.notify_file.clone(),
                on_output_match,
            })
        } else {
            None
        };

    let tags = dedup_tags(opts.tags)?;

    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
        command: opts.command.clone(),
        created_at: created_at.clone(),
        root: root.display().to_string(),
        env_keys,
        env_vars: masked_env_vars.clone(),
        env_vars_runtime: vec![],
        mask: opts.mask.clone(),
        cwd: Some(effective_cwd),
        notification,
        inherit_env: opts.inherit_env,
        env_files: opts.env_files.clone(),
        timeout_ms: opts.timeout_ms,
        kill_after_ms: opts.kill_after_ms,
        progress_every_ms: opts.progress_every_ms,
        shell_wrapper: Some(opts.shell_wrapper.clone()),
        // Filled in below once stdin has actually been materialized.
        stdin_file: None,
        tags: tags.clone(),
    };

    // Checked before the job directory is created, so a rejected stdin
    // source leaves nothing behind on disk.
    validate_stdin_source(opts.stdin.as_ref())?;

    let job_dir = JobDir::create(&root, &job_id, &meta)?;
    let stdin_file =
        materialize_stdin_for_job(&job_dir, opts.stdin.as_ref(), opts.stdin_max_bytes)?;
    if stdin_file.is_some() {
        // Rewrite the metadata so it records the materialized stdin file.
        let mut meta_with_stdin = meta.clone();
        meta_with_stdin.stdin_file = stdin_file.clone();
        job_dir.write_meta_atomic(&meta_with_stdin)?;
    }
    info!(job_id = %job_id, "created job directory");

    // An explicit log path overrides the job directory's full-log location.
    let full_log_path = if let Some(log) = opts.log {
        log.to_string()
    } else {
        job_dir.full_log_path().display().to_string()
    };

    pre_create_log_files(&job_dir)?;

    let (_supervisor_pid, _started_at) = spawn_supervisor_process(
        &job_dir,
        SpawnSupervisorParams {
            job_id: job_id.clone(),
            root: root.clone(),
            full_log_path: full_log_path.clone(),
            timeout_ms: opts.timeout_ms,
            kill_after_ms: opts.kill_after_ms,
            // The caller's cwd override as given, not the resolved `effective_cwd`.
            cwd: opts.cwd.map(|s| s.to_string()),
            env_vars: opts.env_vars.clone(),
            env_files: opts.env_files.clone(),
            inherit_env: opts.inherit_env,
            stdin_file: stdin_file.clone(),
            progress_every_ms: opts.progress_every_ms,
            notify_command: opts.notify_command.clone(),
            notify_file: opts.notify_file.clone(),
            shell_wrapper: opts.shell_wrapper.clone(),
            command: opts.command.clone(),
        },
    )?;

    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    let observation = observe_inline_output(
        &job_dir,
        opts.wait,
        opts.until_seconds,
        opts.forever,
        opts.max_bytes,
    )?;
    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: observation.state,
            tags,
            env_vars: masked_env_vars,
            stdout_log_path,
            stderr_log_path,
            elapsed_ms,
            waited_ms: observation.waited_ms,
            stdout: observation.stdout,
            stderr: observation.stderr,
            stdout_range: observation.stdout_range,
            stderr_range: observation.stderr_range,
            stdout_total_bytes: observation.stdout_total_bytes,
            stderr_total_bytes: observation.stderr_total_bytes,
            encoding: observation.encoding,
            exit_code: observation.exit_code,
            finished_at: observation.finished_at,
            signal: observation.signal,
            duration_ms: observation.duration_ms,
        },
    );
    response.print();
    Ok(())
}
640
/// Options consumed by [`supervise`], the in-process side of the
/// `_supervise` invocation.
#[derive(Debug)]
pub struct SuperviseOpts<'a> {
    /// Identifier of the job to supervise; used to open its job directory.
    pub job_id: &'a str,
    /// Jobs root under which the job directory is opened.
    pub root: &'a Path,
    /// Command to run; `supervise` rejects an empty slice.
    pub command: &'a [String],
    /// Override for the full-log path; the job directory's default is used otherwise.
    pub full_log: Option<&'a str>,
    /// Timeout in milliseconds; `0` disables the timeout.
    pub timeout_ms: u64,
    /// Kill-after setting in milliseconds
    /// (presumably the grace period before a forced kill — TODO confirm).
    pub kill_after_ms: u64,
    /// Working directory for the child, when set.
    pub cwd: Option<&'a str>,
    /// `KEY=VALUE` entries applied to the child's environment after env files.
    pub env_vars: Vec<String>,
    /// Env files loaded into the child's environment, in order.
    pub env_files: Vec<String>,
    /// When false, the child's environment is cleared before anything is applied.
    pub inherit_env: bool,
    /// Materialized stdin file to connect to the child's stdin, if any.
    pub stdin_file: Option<String>,
    /// Progress interval in milliseconds; `0` disables it.
    pub progress_every_ms: u64,
    /// Shell command sink for notification events, if any.
    pub notify_command: Option<String>,
    /// File sink for notification events, if any.
    pub notify_file: Option<String>,
    /// Shell wrapper argv; the first element is the program that is spawned.
    pub shell_wrapper: Vec<String>,
}
677
678pub fn observe_inline_output(
685 job_dir: &JobDir,
686 wait: bool,
687 until_seconds: u64,
688 forever: bool,
689 max_bytes: u64,
690) -> Result<InlineObservation> {
691 let started = std::time::Instant::now();
692 let deadline = if wait {
693 if forever {
694 None
695 } else {
696 Some(started + std::time::Duration::from_secs(until_seconds))
697 }
698 } else {
699 Some(started)
700 };
701
702 if wait {
703 loop {
704 let state = job_dir.read_state()?;
705 let has_output = std::fs::metadata(job_dir.stdout_path())
706 .map(|m| m.len() > 0)
707 .unwrap_or(false)
708 || std::fs::metadata(job_dir.stderr_path())
709 .map(|m| m.len() > 0)
710 .unwrap_or(false);
711
712 if !state.status().is_non_terminal() || has_output {
713 break;
714 }
715
716 if let Some(dl) = deadline
717 && std::time::Instant::now() >= dl
718 {
719 break;
720 }
721
722 std::thread::sleep(std::time::Duration::from_millis(100));
723 }
724 }
725
726 let state = job_dir.read_state()?;
727 let stdout = job_dir.read_head_metrics("stdout.log", max_bytes);
728 let stderr = job_dir.read_head_metrics("stderr.log", max_bytes);
729
730 Ok(InlineObservation {
731 waited_ms: started.elapsed().as_millis() as u64,
732 stdout: stdout.head,
733 stderr: stderr.head,
734 stdout_range: stdout.range,
735 stderr_range: stderr.range,
736 stdout_total_bytes: stdout.observed_bytes,
737 stderr_total_bytes: stderr.observed_bytes,
738 encoding: "utf-8-lossy".to_string(),
739 state: state.status().as_str().to_string(),
740 exit_code: state.exit_code(),
741 signal: state.signal().map(|s| s.to_string()),
742 duration_ms: state.duration_ms(),
743 finished_at: state.finished_at,
744 })
745}
746
/// Computes the working directory string to record for a job.
///
/// Starts from `cwd_override`, or the process's current directory when no
/// override is given (falling back to `.` if that cannot be determined).
/// The canonicalized form is returned when canonicalization succeeds;
/// otherwise an absolute base is returned as-is and a relative base is
/// joined onto the current directory.
pub fn resolve_effective_cwd(cwd_override: Option<&str>) -> String {
    /// Current directory of this process, or `.` when it is unavailable.
    fn process_cwd() -> std::path::PathBuf {
        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
    }

    let base = cwd_override
        .map(std::path::PathBuf::from)
        .unwrap_or_else(process_cwd);

    if let Ok(canonical) = base.canonicalize() {
        return canonical.display().to_string();
    }

    // Canonicalization failed (for example the path does not exist): make a
    // best-effort absolute path without touching the filesystem again.
    let fallback = if base.is_absolute() {
        base
    } else {
        process_cwd().join(base)
    };
    fallback.display().to_string()
}
768
769pub fn mask_env_vars(env_vars: &[String], mask_keys: &[String]) -> Vec<String> {
772 if mask_keys.is_empty() {
773 return env_vars.to_vec();
774 }
775 env_vars
776 .iter()
777 .map(|s| {
778 let (key, _val) = parse_env_var(s);
779 if mask_keys.iter().any(|k| k == &key) {
780 format!("{key}=***")
781 } else {
782 s.clone()
783 }
784 })
785 .collect()
786}
787
/// Splits a `KEY=VALUE` entry at the first `=`.
///
/// Everything after the first `=` (including further `=` characters) is the
/// value. An entry without `=` yields the whole string as the key and an
/// empty value.
fn parse_env_var(s: &str) -> (String, String) {
    match s.split_once('=') {
        Some((key, value)) => (key.to_string(), value.to_string()),
        None => (s.to_string(), String::new()),
    }
}
796
797fn load_env_file(path: &str) -> Result<Vec<(String, String)>> {
800 let contents =
801 std::fs::read_to_string(path).with_context(|| format!("read env-file {path}"))?;
802 let mut vars = Vec::new();
803 for line in contents.lines() {
804 let line = line.trim();
805 if line.is_empty() || line.starts_with('#') {
806 continue;
807 }
808 vars.push(parse_env_var(line));
809 }
810 Ok(vars)
811}
812
813struct OutputMatchChecker {
820 job_dir_path: std::path::PathBuf,
821 shell_wrapper: Vec<String>,
822 inner: std::sync::Mutex<OutputMatchInner>,
823}
824
825struct OutputMatchInner {
826 config: Option<crate::schema::NotificationConfig>,
827}
828
829impl OutputMatchChecker {
830 fn new(
831 job_dir_path: std::path::PathBuf,
832 shell_wrapper: Vec<String>,
833 initial_config: Option<crate::schema::NotificationConfig>,
834 ) -> Self {
835 Self {
836 job_dir_path,
837 shell_wrapper,
838 inner: std::sync::Mutex::new(OutputMatchInner {
839 config: initial_config,
840 }),
841 }
842 }
843
844 fn check_line(&self, line: &str, stream: &str) {
851 use crate::schema::{OutputMatchStream, OutputMatchType};
852
853 let match_info: Option<crate::schema::OutputMatchConfig> = {
855 let mut inner = self.inner.lock().unwrap();
856
857 {
859 let meta_path = self.job_dir_path.join("meta.json");
860 if let Ok(raw) = std::fs::read(&meta_path)
861 && let Ok(meta) = serde_json::from_slice::<crate::schema::JobMeta>(&raw)
862 {
863 inner.config = meta.notification;
864 }
865 }
866
867 let Some(ref notification) = inner.config else {
868 return;
869 };
870 let Some(ref match_cfg) = notification.on_output_match else {
871 return;
872 };
873
874 let stream_matches = match match_cfg.stream {
876 OutputMatchStream::Stdout => stream == "stdout",
877 OutputMatchStream::Stderr => stream == "stderr",
878 OutputMatchStream::Either => true,
879 };
880 if !stream_matches {
881 return;
882 }
883
884 let matched = match &match_cfg.match_type {
886 OutputMatchType::Contains => line.contains(&match_cfg.pattern),
887 OutputMatchType::Regex => regex::Regex::new(&match_cfg.pattern)
888 .map(|re| re.is_match(line))
889 .unwrap_or(false),
890 };
891
892 if matched {
893 Some(match_cfg.clone())
894 } else {
895 None
896 }
897 }; if let Some(match_cfg) = match_info {
900 self.dispatch_match(line, stream, &match_cfg);
901 }
902 }
903
904 fn dispatch_match(
907 &self,
908 line: &str,
909 stream: &str,
910 match_cfg: &crate::schema::OutputMatchConfig,
911 ) {
912 use std::io::Write;
913
914 let job_id = self
915 .job_dir_path
916 .file_name()
917 .and_then(|n| n.to_str())
918 .unwrap_or("unknown");
919
920 let stdout_log_path = self.job_dir_path.join("stdout.log").display().to_string();
921 let stderr_log_path = self.job_dir_path.join("stderr.log").display().to_string();
922 let events_path = self.job_dir_path.join("notification_events.ndjson");
923 let events_path_str = events_path.display().to_string();
924
925 let match_type_str = match &match_cfg.match_type {
926 crate::schema::OutputMatchType::Contains => "contains",
927 crate::schema::OutputMatchType::Regex => "regex",
928 };
929
930 let event = crate::schema::OutputMatchEvent {
931 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
932 event_type: "job.output.matched".to_string(),
933 job_id: job_id.to_string(),
934 pattern: match_cfg.pattern.clone(),
935 match_type: match_type_str.to_string(),
936 stream: stream.to_string(),
937 line: line.to_string(),
938 stdout_log_path,
939 stderr_log_path,
940 };
941
942 let event_json = serde_json::to_string(&event).unwrap_or_default();
943 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
944
945 if let Some(ref cmd) = match_cfg.command {
946 delivery_results.push(dispatch_command_sink(
947 cmd,
948 &event_json,
949 job_id,
950 &events_path_str,
951 &self.shell_wrapper,
952 "job.output.matched",
953 ));
954 }
955 if let Some(ref file_path) = match_cfg.file {
956 delivery_results.push(dispatch_file_sink(file_path, &event_json));
957 }
958
959 let record = crate::schema::OutputMatchEventRecord {
961 event,
962 delivery_results,
963 };
964 if let Ok(record_json) = serde_json::to_string(&record)
965 && let Ok(mut f) = std::fs::OpenOptions::new()
966 .create(true)
967 .append(true)
968 .open(&events_path)
969 {
970 let _ = writeln!(f, "{record_json}");
971 }
972 }
973}
974
975fn stream_to_logs<R, F>(
992 stream: R,
993 log_path: &std::path::Path,
994 full_log: std::sync::Arc<std::sync::Mutex<std::fs::File>>,
995 label: &str,
996 on_line: Option<F>,
997) where
998 R: std::io::Read,
999 F: Fn(&str),
1000{
1001 use std::io::Write;
1002 let mut log_file = std::fs::File::create(log_path).expect("create stream log file in thread");
1003 let mut stream = stream;
1004 let mut buf = [0u8; 8192];
1005 let mut line_buf: Vec<u8> = Vec::new();
1007 loop {
1008 match stream.read(&mut buf) {
1009 Ok(0) => break, Ok(n) => {
1011 let chunk = &buf[..n];
1012 let _ = log_file.write_all(chunk);
1014 for &b in chunk {
1016 if b == b'\n' {
1017 let line = String::from_utf8_lossy(&line_buf);
1018 if let Ok(mut fl) = full_log.lock() {
1019 let ts = now_rfc3339();
1020 let _ = writeln!(fl, "{ts} [{label}] {line}");
1021 }
1022 if let Some(ref f) = on_line {
1023 f(&line);
1024 }
1025 line_buf.clear();
1026 } else {
1027 line_buf.push(b);
1028 }
1029 }
1030 }
1031 Err(_) => break,
1032 }
1033 }
1034 if !line_buf.is_empty() {
1036 let line = String::from_utf8_lossy(&line_buf);
1037 if let Ok(mut fl) = full_log.lock() {
1038 let ts = now_rfc3339();
1039 let _ = writeln!(fl, "{ts} [{label}] {line}");
1040 }
1041 if let Some(ref f) = on_line {
1042 f(&line);
1043 }
1044 }
1045}
1046
1047pub fn supervise(opts: SuperviseOpts) -> Result<()> {
1057 use std::sync::{Arc, Mutex};
1058
1059 let job_id = opts.job_id;
1060 let root = opts.root;
1061 let command = opts.command;
1062
1063 if command.is_empty() {
1064 anyhow::bail!("supervisor: no command");
1065 }
1066
1067 let job_dir = JobDir::open(root, job_id)?;
1068
1069 let meta = job_dir.read_meta()?;
1071 let started_at = now_rfc3339();
1075
1076 let full_log_path = if let Some(p) = opts.full_log {
1078 std::path::PathBuf::from(p)
1079 } else {
1080 job_dir.full_log_path()
1081 };
1082
1083 if let Some(parent) = full_log_path.parent() {
1086 std::fs::create_dir_all(parent)
1087 .with_context(|| format!("create dir for full.log: {}", parent.display()))?;
1088 }
1089 let full_log_file = std::fs::File::create(&full_log_path).context("create full.log")?;
1090 let full_log = Arc::new(Mutex::new(full_log_file));
1091
1092 if opts.shell_wrapper.is_empty() {
1108 anyhow::bail!("supervisor: shell wrapper must not be empty");
1109 }
1110 let mut child_cmd = Command::new(&opts.shell_wrapper[0]);
1111 if command.len() == 1 {
1112 child_cmd.args(&opts.shell_wrapper[1..]).arg(&command[0]);
1114 } else {
1115 #[cfg(unix)]
1125 {
1126 child_cmd
1129 .args(&opts.shell_wrapper[1..])
1130 .arg("exec \"$@\"")
1131 .arg("--")
1132 .args(command);
1133 }
1134 #[cfg(not(unix))]
1135 {
1136 let joined = command
1140 .iter()
1141 .map(|a| {
1142 if a.contains(' ') {
1143 format!("\"{}\"", a)
1144 } else {
1145 a.clone()
1146 }
1147 })
1148 .collect::<Vec<_>>()
1149 .join(" ");
1150 child_cmd.args(&opts.shell_wrapper[1..]).arg(joined);
1151 }
1152 }
1153
1154 if opts.inherit_env {
1155 } else {
1157 child_cmd.env_clear();
1158 }
1159
1160 for env_file in &opts.env_files {
1162 let vars = load_env_file(env_file)?;
1163 for (k, v) in vars {
1164 child_cmd.env(&k, &v);
1165 }
1166 }
1167
1168 for env_var in &opts.env_vars {
1170 let (k, v) = parse_env_var(env_var);
1171 child_cmd.env(&k, &v);
1172 }
1173
1174 if let Some(cwd) = opts.cwd {
1176 child_cmd.current_dir(cwd);
1177 }
1178
1179 #[cfg(unix)]
1184 {
1185 use std::os::unix::process::CommandExt;
1186 unsafe {
1188 child_cmd.pre_exec(|| {
1189 libc::setsid();
1190 Ok(())
1191 });
1192 }
1193 }
1194
1195 let child_stdin = open_child_stdin(&job_dir, opts.stdin_file.as_deref())?;
1197 let mut child = child_cmd
1198 .stdin(child_stdin)
1199 .stdout(std::process::Stdio::piped())
1200 .stderr(std::process::Stdio::piped())
1201 .spawn()
1202 .context("supervisor: spawn child")?;
1203
1204 let pid = child.id();
1205 info!(job_id, pid, "child process started");
1206
1207 #[cfg(windows)]
1214 let windows_job_name = {
1215 match assign_to_job_object(job_id, pid) {
1216 Ok(name) => Some(name),
1217 Err(e) => {
1218 let kill_err = child.kill();
1222 let _ = child.wait(); let failed_state = JobState {
1225 job: JobStateJob {
1226 id: job_id.to_string(),
1227 status: JobStatus::Failed,
1228 started_at: Some(started_at.clone()),
1229 },
1230 result: JobStateResult {
1231 exit_code: None,
1232 signal: None,
1233 duration_ms: None,
1234 },
1235 pid: Some(pid),
1236 finished_at: Some(now_rfc3339()),
1237 updated_at: now_rfc3339(),
1238 windows_job_name: None,
1239 };
1240 let _ = job_dir.write_state(&failed_state);
1243
1244 if opts.notify_command.is_some() || opts.notify_file.is_some() {
1248 let finished_at_ts =
1249 failed_state.finished_at.clone().unwrap_or_else(now_rfc3339);
1250 let stdout_log = job_dir.stdout_path().display().to_string();
1251 let stderr_log = job_dir.stderr_path().display().to_string();
1252 let fail_event = crate::schema::CompletionEvent {
1253 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1254 event_type: "job.finished".to_string(),
1255 job_id: job_id.to_string(),
1256 state: JobStatus::Failed.as_str().to_string(),
1257 command: meta.command.clone(),
1258 cwd: meta.cwd.clone(),
1259 started_at: started_at.clone(),
1260 finished_at: finished_at_ts,
1261 duration_ms: None,
1262 exit_code: None,
1263 signal: None,
1264 stdout_log_path: stdout_log,
1265 stderr_log_path: stderr_log,
1266 };
1267 let fail_event_json = serde_json::to_string(&fail_event).unwrap_or_default();
1268 let fail_event_path = job_dir.completion_event_path().display().to_string();
1269 let mut fail_delivery_results: Vec<crate::schema::SinkDeliveryResult> =
1270 Vec::new();
1271 if let Err(we) = job_dir.write_completion_event_atomic(
1272 &crate::schema::CompletionEventRecord {
1273 event: fail_event.clone(),
1274 delivery_results: vec![],
1275 },
1276 ) {
1277 warn!(
1278 job_id,
1279 error = %we,
1280 "failed to write initial completion_event.json for failed job"
1281 );
1282 }
1283 if let Some(ref shell_cmd) = opts.notify_command {
1284 fail_delivery_results.push(dispatch_command_sink(
1285 shell_cmd,
1286 &fail_event_json,
1287 job_id,
1288 &fail_event_path,
1289 &opts.shell_wrapper,
1290 "job.finished",
1291 ));
1292 }
1293 if let Some(ref file_path) = opts.notify_file {
1294 fail_delivery_results.push(dispatch_file_sink(file_path, &fail_event_json));
1295 }
1296 if let Err(we) = job_dir.write_completion_event_atomic(
1297 &crate::schema::CompletionEventRecord {
1298 event: fail_event,
1299 delivery_results: fail_delivery_results,
1300 },
1301 ) {
1302 warn!(
1303 job_id,
1304 error = %we,
1305 "failed to update completion_event.json with delivery results for failed job"
1306 );
1307 }
1308 }
1309
1310 if let Err(ke) = kill_err {
1311 return Err(anyhow::anyhow!(
1312 "supervisor: failed to assign pid {pid} to Job Object \
1313 (Windows MUST requirement): {e}; also failed to kill child: {ke}"
1314 ));
1315 }
1316 return Err(anyhow::anyhow!(
1317 "supervisor: failed to assign pid {pid} to Job Object \
1318 (Windows MUST requirement); child process was killed; \
1319 consider running outside a nested Job Object environment: {e}"
1320 ));
1321 }
1322 }
1323 };
1324 #[cfg(not(windows))]
1325 let windows_job_name: Option<String> = None;
1326
1327 let state = JobState {
1332 job: JobStateJob {
1333 id: job_id.to_string(),
1334 status: JobStatus::Running,
1335 started_at: Some(started_at.clone()),
1336 },
1337 result: JobStateResult {
1338 exit_code: None,
1339 signal: None,
1340 duration_ms: None,
1341 },
1342 pid: Some(pid),
1343 finished_at: None,
1344 updated_at: now_rfc3339(),
1345 windows_job_name,
1346 };
1347 job_dir.write_state(&state)?;
1348
1349 let child_start_time = std::time::Instant::now();
1350
1351 let child_stdout = child.stdout.take().expect("child stdout piped");
1353 let child_stderr = child.stderr.take().expect("child stderr piped");
1354
1355 let match_checker = std::sync::Arc::new(OutputMatchChecker::new(
1357 job_dir.path.clone(),
1358 opts.shell_wrapper.clone(),
1359 meta.notification.clone(),
1360 ));
1361
1362 let (tx_stdout_done, rx_stdout_done) = std::sync::mpsc::channel::<()>();
1366 let (tx_stderr_done, rx_stderr_done) = std::sync::mpsc::channel::<()>();
1367
1368 let stdout_log_path = job_dir.stdout_path();
1370 let full_log_stdout = Arc::clone(&full_log);
1371 let match_checker_stdout = std::sync::Arc::clone(&match_checker);
1372 let t_stdout = std::thread::spawn(move || {
1373 stream_to_logs(
1374 child_stdout,
1375 &stdout_log_path,
1376 full_log_stdout,
1377 "STDOUT",
1378 Some(move |line: &str| match_checker_stdout.check_line(line, "stdout")),
1379 );
1380 let _ = tx_stdout_done.send(());
1381 });
1382
1383 let stderr_log_path = job_dir.stderr_path();
1385 let full_log_stderr = Arc::clone(&full_log);
1386 let match_checker_stderr = std::sync::Arc::clone(&match_checker);
1387 let t_stderr = std::thread::spawn(move || {
1388 stream_to_logs(
1389 child_stderr,
1390 &stderr_log_path,
1391 full_log_stderr,
1392 "STDERR",
1393 Some(move |line: &str| match_checker_stderr.check_line(line, "stderr")),
1394 );
1395 let _ = tx_stderr_done.send(());
1396 });
1397
1398 let timeout_ms = opts.timeout_ms;
1401 let kill_after_ms = opts.kill_after_ms;
1402 let progress_every_ms = opts.progress_every_ms;
1403 let state_path = job_dir.state_path();
1404 let job_id_str = job_id.to_string();
1405
1406 use std::sync::atomic::{AtomicBool, Ordering};
1408 let child_done = Arc::new(AtomicBool::new(false));
1409
1410 let watcher = if timeout_ms > 0 || progress_every_ms > 0 {
1411 let state_path_clone = state_path.clone();
1412 let child_done_clone = Arc::clone(&child_done);
1413 Some(std::thread::spawn(move || {
1414 let start = std::time::Instant::now();
1415 let timeout_dur = if timeout_ms > 0 {
1416 Some(std::time::Duration::from_millis(timeout_ms))
1417 } else {
1418 None
1419 };
1420 let progress_dur = if progress_every_ms > 0 {
1421 Some(std::time::Duration::from_millis(progress_every_ms))
1422 } else {
1423 None
1424 };
1425
1426 let poll_interval = std::time::Duration::from_millis(100);
1427
1428 loop {
1429 std::thread::sleep(poll_interval);
1430
1431 if child_done_clone.load(Ordering::Relaxed) {
1433 break;
1434 }
1435
1436 let elapsed = start.elapsed();
1437
1438 if let Some(td) = timeout_dur
1440 && elapsed >= td
1441 {
1442 info!(job_id = %job_id_str, "timeout reached, sending SIGTERM to process group");
1443 #[cfg(unix)]
1446 {
1447 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGTERM) };
1448 }
1449 if kill_after_ms > 0 {
1451 std::thread::sleep(std::time::Duration::from_millis(kill_after_ms));
1452 info!(job_id = %job_id_str, "kill-after elapsed, sending SIGKILL to process group");
1453 #[cfg(unix)]
1454 {
1455 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1456 }
1457 } else {
1458 #[cfg(unix)]
1460 {
1461 unsafe { libc::kill(-(pid as libc::pid_t), libc::SIGKILL) };
1462 }
1463 }
1464 break;
1465 }
1466
1467 if let Some(pd) = progress_dur {
1469 let elapsed_ms = elapsed.as_millis() as u64;
1470 let pd_ms = pd.as_millis() as u64;
1471 let poll_ms = poll_interval.as_millis() as u64;
1472 if elapsed_ms % pd_ms < poll_ms {
1473 if let Ok(raw) = std::fs::read(&state_path_clone)
1475 && let Ok(mut st) =
1476 serde_json::from_slice::<crate::schema::JobState>(&raw)
1477 {
1478 st.updated_at = now_rfc3339();
1479 if let Ok(s) = serde_json::to_string_pretty(&st) {
1480 let _ = std::fs::write(&state_path_clone, s);
1481 }
1482 }
1483 }
1484 }
1485 }
1486 }))
1487 } else {
1488 None
1489 };
1490
1491 let exit_status = child.wait().context("wait for child")?;
1493
1494 child_done.store(true, Ordering::Relaxed);
1496
1497 let duration_ms = child_start_time.elapsed().as_millis() as u64;
1504 let exit_code = exit_status.code();
1505 let finished_at = now_rfc3339();
1506
1507 #[cfg(unix)]
1509 let (terminal_status, signal_name) = {
1510 use std::os::unix::process::ExitStatusExt;
1511 if let Some(sig) = exit_status.signal() {
1512 (JobStatus::Killed, Some(sig.to_string()))
1513 } else {
1514 (JobStatus::Exited, None)
1515 }
1516 };
1517 #[cfg(not(unix))]
1518 let (terminal_status, signal_name) = (JobStatus::Exited, None::<String>);
1519
1520 let state = JobState {
1521 job: JobStateJob {
1522 id: job_id.to_string(),
1523 status: terminal_status.clone(),
1524 started_at: Some(started_at.clone()),
1525 },
1526 result: JobStateResult {
1527 exit_code,
1528 signal: signal_name.clone(),
1529 duration_ms: Some(duration_ms),
1530 },
1531 pid: Some(pid),
1532 finished_at: Some(finished_at.clone()),
1533 updated_at: now_rfc3339(),
1534 windows_job_name: None, };
1536 job_dir.write_state(&state)?;
1537 info!(job_id, ?exit_code, "child process finished");
1538
1539 const LOG_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
1550 let drain_deadline = std::time::Instant::now() + LOG_DRAIN_TIMEOUT;
1551
1552 let remaining = drain_deadline
1553 .checked_duration_since(std::time::Instant::now())
1554 .unwrap_or(std::time::Duration::ZERO);
1555 if rx_stdout_done.recv_timeout(remaining).is_ok() {
1556 let _ = t_stdout.join();
1557 } else {
1558 drop(t_stdout); }
1560
1561 let remaining = drain_deadline
1562 .checked_duration_since(std::time::Instant::now())
1563 .unwrap_or(std::time::Duration::ZERO);
1564 if rx_stderr_done.recv_timeout(remaining).is_ok() {
1565 let _ = t_stderr.join();
1566 } else {
1567 drop(t_stderr); }
1569
1570 if let Some(w) = watcher {
1572 let _ = w.join();
1573 }
1574
1575 let latest_notification = job_dir.read_meta().ok().and_then(|m| m.notification);
1578 let (current_notify_command, current_notify_file) = match &latest_notification {
1579 Some(n) => (n.notify_command.clone(), n.notify_file.clone()),
1580 None => (None, None),
1581 };
1582
1583 let has_notification = current_notify_command.is_some() || current_notify_file.is_some();
1586 if has_notification {
1587 let stdout_log = job_dir.stdout_path().display().to_string();
1588 let stderr_log = job_dir.stderr_path().display().to_string();
1589 let event = crate::schema::CompletionEvent {
1590 schema_version: crate::schema::SCHEMA_VERSION.to_string(),
1591 event_type: "job.finished".to_string(),
1592 job_id: job_id.to_string(),
1593 state: terminal_status.as_str().to_string(),
1594 command: meta.command.clone(),
1595 cwd: meta.cwd.clone(),
1596 started_at,
1597 finished_at,
1598 duration_ms: Some(duration_ms),
1599 exit_code,
1600 signal: signal_name,
1601 stdout_log_path: stdout_log,
1602 stderr_log_path: stderr_log,
1603 };
1604
1605 let event_json = serde_json::to_string(&event).unwrap_or_default();
1606 let event_path = job_dir.completion_event_path().display().to_string();
1607 let mut delivery_results: Vec<crate::schema::SinkDeliveryResult> = Vec::new();
1608
1609 if let Err(e) =
1611 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1612 event: event.clone(),
1613 delivery_results: vec![],
1614 })
1615 {
1616 warn!(job_id, error = %e, "failed to write initial completion_event.json");
1617 }
1618
1619 if let Some(ref shell_cmd) = current_notify_command {
1620 delivery_results.push(dispatch_command_sink(
1621 shell_cmd,
1622 &event_json,
1623 job_id,
1624 &event_path,
1625 &opts.shell_wrapper,
1626 "job.finished",
1627 ));
1628 }
1629 if let Some(ref file_path) = current_notify_file {
1630 delivery_results.push(dispatch_file_sink(file_path, &event_json));
1631 }
1632
1633 if let Err(e) =
1635 job_dir.write_completion_event_atomic(&crate::schema::CompletionEventRecord {
1636 event,
1637 delivery_results,
1638 })
1639 {
1640 warn!(job_id, error = %e, "failed to update completion_event.json with delivery results");
1641 }
1642 }
1643
1644 Ok(())
1645}
1646
1647fn dispatch_command_sink(
1654 shell_cmd: &str,
1655 event_json: &str,
1656 job_id: &str,
1657 event_path: &str,
1658 shell_wrapper: &[String],
1659 event_type: &str,
1660) -> crate::schema::SinkDeliveryResult {
1661 use std::io::Write;
1662 let attempted_at = now_rfc3339();
1663 let target = shell_cmd.to_string();
1664
1665 if shell_cmd.trim().is_empty() {
1666 return crate::schema::SinkDeliveryResult {
1667 sink_type: "command".to_string(),
1668 target,
1669 success: false,
1670 error: Some("empty shell command".to_string()),
1671 attempted_at,
1672 };
1673 }
1674
1675 if shell_wrapper.is_empty() {
1676 return crate::schema::SinkDeliveryResult {
1677 sink_type: "command".to_string(),
1678 target,
1679 success: false,
1680 error: Some("shell wrapper must not be empty".to_string()),
1681 attempted_at,
1682 };
1683 }
1684
1685 let mut cmd = Command::new(&shell_wrapper[0]);
1686 cmd.args(&shell_wrapper[1..]).arg(shell_cmd);
1687
1688 cmd.env("AGENT_EXEC_EVENT_PATH", event_path);
1689 cmd.env("AGENT_EXEC_JOB_ID", job_id);
1690 cmd.env("AGENT_EXEC_EVENT_TYPE", event_type);
1691 cmd.stdin(std::process::Stdio::piped());
1692 cmd.stdout(std::process::Stdio::null());
1693 cmd.stderr(std::process::Stdio::null());
1694
1695 match cmd.spawn() {
1696 Ok(mut child) => {
1697 if let Some(mut stdin) = child.stdin.take() {
1698 let _ = stdin.write_all(event_json.as_bytes());
1699 }
1700 match child.wait() {
1701 Ok(status) if status.success() => crate::schema::SinkDeliveryResult {
1702 sink_type: "command".to_string(),
1703 target,
1704 success: true,
1705 error: None,
1706 attempted_at,
1707 },
1708 Ok(status) => crate::schema::SinkDeliveryResult {
1709 sink_type: "command".to_string(),
1710 target,
1711 success: false,
1712 error: Some(format!("exited with status {status}")),
1713 attempted_at,
1714 },
1715 Err(e) => crate::schema::SinkDeliveryResult {
1716 sink_type: "command".to_string(),
1717 target,
1718 success: false,
1719 error: Some(format!("wait error: {e}")),
1720 attempted_at,
1721 },
1722 }
1723 }
1724 Err(e) => crate::schema::SinkDeliveryResult {
1725 sink_type: "command".to_string(),
1726 target,
1727 success: false,
1728 error: Some(format!("spawn error: {e}")),
1729 attempted_at,
1730 },
1731 }
1732}
1733
1734fn dispatch_file_sink(file_path: &str, event_json: &str) -> crate::schema::SinkDeliveryResult {
1737 use std::io::Write;
1738 let attempted_at = now_rfc3339();
1739 let path = std::path::Path::new(file_path);
1740
1741 if let Some(parent) = path.parent()
1742 && let Err(e) = std::fs::create_dir_all(parent)
1743 {
1744 return crate::schema::SinkDeliveryResult {
1745 sink_type: "file".to_string(),
1746 target: file_path.to_string(),
1747 success: false,
1748 error: Some(format!("create parent dir: {e}")),
1749 attempted_at,
1750 };
1751 }
1752
1753 match std::fs::OpenOptions::new()
1754 .create(true)
1755 .append(true)
1756 .open(path)
1757 {
1758 Ok(mut f) => match writeln!(f, "{event_json}") {
1759 Ok(_) => crate::schema::SinkDeliveryResult {
1760 sink_type: "file".to_string(),
1761 target: file_path.to_string(),
1762 success: true,
1763 error: None,
1764 attempted_at,
1765 },
1766 Err(e) => crate::schema::SinkDeliveryResult {
1767 sink_type: "file".to_string(),
1768 target: file_path.to_string(),
1769 success: false,
1770 error: Some(format!("write error: {e}")),
1771 attempted_at,
1772 },
1773 },
1774 Err(e) => crate::schema::SinkDeliveryResult {
1775 sink_type: "file".to_string(),
1776 target: file_path.to_string(),
1777 success: false,
1778 error: Some(format!("open error: {e}")),
1779 attempted_at,
1780 },
1781 }
1782}
1783
/// Returns the current UTC time as a `YYYY-MM-DDTHH:MM:SSZ` string.
///
/// Public wrapper around the private `now_rfc3339` helper; the value is
/// passed through unchanged.
pub fn now_rfc3339_pub() -> String {
    now_rfc3339()
}
1788
1789fn now_rfc3339() -> String {
1790 let d = std::time::SystemTime::now()
1792 .duration_since(std::time::UNIX_EPOCH)
1793 .unwrap_or_default();
1794 format_rfc3339(d.as_secs())
1795}
1796
1797fn format_rfc3339(secs: u64) -> String {
1798 let mut s = secs;
1800 let seconds = s % 60;
1801 s /= 60;
1802 let minutes = s % 60;
1803 s /= 60;
1804 let hours = s % 24;
1805 s /= 24;
1806
1807 let mut days = s;
1809 let mut year = 1970u64;
1810 loop {
1811 let days_in_year = if is_leap(year) { 366 } else { 365 };
1812 if days < days_in_year {
1813 break;
1814 }
1815 days -= days_in_year;
1816 year += 1;
1817 }
1818
1819 let leap = is_leap(year);
1820 let month_days: [u64; 12] = [
1821 31,
1822 if leap { 29 } else { 28 },
1823 31,
1824 30,
1825 31,
1826 30,
1827 31,
1828 31,
1829 30,
1830 31,
1831 30,
1832 31,
1833 ];
1834 let mut month = 0usize;
1835 for (i, &d) in month_days.iter().enumerate() {
1836 if days < d {
1837 month = i;
1838 break;
1839 }
1840 days -= d;
1841 }
1842 let day = days + 1;
1843
1844 format!(
1845 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
1846 year,
1847 month + 1,
1848 day,
1849 hours,
1850 minutes,
1851 seconds
1852 )
1853}
1854
/// Reports whether `year` is a leap year under the Gregorian rules:
/// divisible by 400, or divisible by 4 but not by 100.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    if year.is_multiple_of(100) {
        return false;
    }
    year.is_multiple_of(4)
}
1858
/// Creates a named Windows Job Object for `job_id` and assigns the process
/// identified by `pid` to it.
///
/// The Job Object is named `AgentExec-{job_id}`; that name is returned on
/// success.
///
/// # Errors
///
/// Returns an error if the process cannot be opened, the Job Object cannot
/// be created, or the assignment fails. Handles opened up to the point of
/// failure are closed before returning.
#[cfg(windows)]
fn assign_to_job_object(job_id: &str, pid: u32) -> Result<String> {
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{AssignProcessToJobObject, CreateJobObjectW};
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};
    use windows::core::HSTRING;

    let job_name = format!("AgentExec-{job_id}");
    let hname = HSTRING::from(job_name.as_str());

    // SAFETY (NOTE(review)): raw Win32 calls; every handle used below is
    // obtained inside this block and only used after its creation call
    // succeeded. Confirm against the `windows` crate contracts.
    unsafe {
        // Open the child with the TERMINATE and SET_QUOTA access rights.
        let proc_handle =
            OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid).map_err(|e| {
                anyhow::anyhow!(
                    "supervisor: OpenProcess(pid={pid}) failed — cannot assign to Job Object: {e}"
                )
            })?;

        // Create the named Job Object (first argument `None`: no explicit
        // security attributes are passed).
        let job = match CreateJobObjectW(None, &hname) {
            Ok(h) => h,
            Err(e) => {
                // Do not leak the process handle on the error path.
                let _ = CloseHandle(proc_handle);
                return Err(anyhow::anyhow!(
                    "supervisor: CreateJobObjectW({job_name}) failed: {e}"
                ));
            }
        };

        if let Err(e) = AssignProcessToJobObject(job, proc_handle) {
            // Assignment failed: release both handles before reporting.
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return Err(anyhow::anyhow!(
                "supervisor: AssignProcessToJobObject(pid={pid}) failed \
                 (process may already belong to another Job Object, e.g. in a CI environment): {e}"
            ));
        }

        // The process handle is no longer needed once the assignment succeeded.
        let _ = CloseHandle(proc_handle);
        // The job handle is NOT closed on success; `forget` marks that as
        // deliberate. NOTE(review): presumably this keeps the Job Object alive
        // for the supervisor's lifetime so it can later be looked up by name —
        // confirm. If the handle type is `Copy`, `forget` is a no-op and only
        // the missing `CloseHandle` matters.
        std::mem::forget(job);
    }

    info!(job_id, name = %job_name, "supervisor: child assigned to Job Object");
    Ok(job_name)
}
1924
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero seconds maps to the epoch itself.
    #[test]
    fn rfc3339_epoch() {
        assert_eq!(format_rfc3339(0), "1970-01-01T00:00:00Z");
    }

    /// A well-known reference instant: the start of 2024.
    #[test]
    fn rfc3339_known_date() {
        assert_eq!(format_rfc3339(1704067200), "2024-01-01T00:00:00Z");
    }

    /// February 29 must appear in leap years, including the
    /// divisible-by-400 century year 2000.
    #[test]
    fn rfc3339_leap_day() {
        assert_eq!(format_rfc3339(951782400), "2000-02-29T00:00:00Z");
        assert_eq!(format_rfc3339(1709164800), "2024-02-29T00:00:00Z");
    }

    /// The last second of a year must not roll over into the next one.
    #[test]
    fn rfc3339_last_second_of_year() {
        assert_eq!(format_rfc3339(946684799), "1999-12-31T23:59:59Z");
        assert_eq!(format_rfc3339(1704067199), "2023-12-31T23:59:59Z");
    }

    /// Hours, minutes and seconds are all derived from the same input.
    #[test]
    fn rfc3339_time_of_day() {
        assert_eq!(format_rfc3339(1709210096), "2024-02-29T12:34:56Z");
        assert_eq!(format_rfc3339(86399), "1970-01-01T23:59:59Z");
    }

    /// Gregorian rule: every 4th year, except centuries not divisible by 400.
    #[test]
    fn leap_year_rules() {
        assert!(is_leap(2000));
        assert!(is_leap(2024));
        assert!(!is_leap(1900));
        assert!(!is_leap(2100));
        assert!(!is_leap(2023));
    }
}