1use std::{
58 io::Write as _,
59 process::Stdio,
60 sync::{
61 Arc,
62 atomic::{AtomicU32, Ordering},
63 },
64 time::{Duration as StdDuration, Instant, SystemTime, UNIX_EPOCH},
65};
66
67use taskvisor::{TaskError, TaskFn, TaskRef};
68use tempfile::NamedTempFile;
69use tokio::process::Command;
70use tokio_util::sync::CancellationToken;
71use tracing::{debug, trace, warn};
72
73use solti_model::{SubprocessSpec, TaskId, TaskKind, TaskSpec, merge_env};
74use solti_runner::{
75 BuildContext, OutputRegistry, Runner, RunnerError, RunnerErrorKind, RunnerType,
76};
77
78use crate::metrics::classify_task_error;
79use crate::subprocess::{
80 backend::SubprocessBackendConfig,
81 logger::{LogConfig, StreamKind, log_stream},
82 task::SubprocessTaskConfig,
83};
84
85pub struct SubprocessRunner {
94 name: &'static str,
96 config: Option<Arc<SubprocessBackendConfig>>,
98}
99
100impl SubprocessRunner {
101 pub fn new(name: &'static str) -> Self {
103 Self { name, config: None }
104 }
105
106 pub fn with_config(
112 name: &'static str,
113 config: SubprocessBackendConfig,
114 ) -> Result<Self, crate::ExecError> {
115 config.validate()?;
116 Ok(Self {
117 name,
118 config: Some(Arc::new(config)),
119 })
120 }
121
122 fn build_task_config(
126 &self,
127 spec: &TaskSpec,
128 ctx: &BuildContext,
129 ) -> Result<(SubprocessTaskConfig, Option<NamedTempFile>), RunnerError> {
130 let slot = spec.slot();
131 let (cfg, script_tempfile) = match spec.kind() {
132 TaskKind::Subprocess(SubprocessSpec {
133 mode,
134 env,
135 cwd,
136 fail_on_non_zero,
137 }) => {
138 let Resolved {
139 command,
140 args,
141 script_tempfile,
142 } = Self::resolve_mode(mode)?;
143 let run_id = self.build_run_id(slot.as_str());
144 let cfg = SubprocessTaskConfig {
145 seq: run_id.seq(),
146 run_id: Arc::from(run_id.into_name()),
147 fail_on_non_zero: *fail_on_non_zero,
148 env: merge_env(env, ctx.env()),
149 cwd: cwd.clone(),
150 command,
151 args,
152 };
153 (cfg, script_tempfile)
154 }
155 other => {
156 return Err(RunnerError::UnsupportedKind {
157 runner: self.name,
158 kind: other.kind().to_string(),
159 });
160 }
161 };
162 cfg.validate()
163 .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
164 Ok((cfg, script_tempfile))
165 }
166
167 fn resolve_mode(mode: &solti_model::SubprocessMode) -> Result<Resolved, RunnerError> {
178 match mode {
179 solti_model::SubprocessMode::Command { command, args } => Ok(Resolved {
180 command: command.clone(),
181 args: args.clone(),
182 script_tempfile: None,
183 }),
184 solti_model::SubprocessMode::Script { runtime, args, .. } => {
185 let script = mode
186 .decode_body()
187 .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
188
189 let mut tmp = NamedTempFile::with_prefix("solti-script-").map_err(|e| {
190 RunnerError::InvalidSpec(format!("failed to create script tempfile: {e}"))
191 })?;
192
193 #[cfg(unix)]
194 {
195 use std::os::unix::fs::PermissionsExt;
196 let perms = std::fs::Permissions::from_mode(0o600);
197 if let Err(e) = tmp.as_file().set_permissions(perms) {
198 return Err(RunnerError::InvalidSpec(format!(
199 "failed to chmod 0600 script tempfile: {e}"
200 )));
201 }
202 }
203
204 tmp.write_all(script.as_bytes()).map_err(|e| {
205 RunnerError::InvalidSpec(format!("failed to write script body: {e}"))
206 })?;
207 tmp.as_file()
208 .sync_all()
209 .or_else(|_| tmp.as_file().flush())
210 .map_err(|e| {
211 RunnerError::InvalidSpec(format!("failed to flush script tempfile: {e}"))
212 })?;
213
214 let (cmd, _flag_deprecated_for_tempfile_transport) = runtime.resolve();
215 let path = tmp.path().to_string_lossy().into_owned();
216
217 let mut full_args = Vec::with_capacity(1 + args.len());
218 full_args.push(path);
219 full_args.extend(args.iter().cloned());
220
221 Ok(Resolved {
222 command: cmd.to_string(),
223 args: full_args,
224 script_tempfile: Some(tmp),
225 })
226 }
227 }
228 }
229}
230
/// Program, arguments and optional script tempfile produced by
/// `SubprocessRunner::resolve_mode`.
#[derive(Debug)]
struct Resolved {
    /// Program to execute (the runtime's interpreter in script mode).
    command: String,
    /// Arguments; in script mode the first element is the script tempfile path.
    args: Vec<String>,

    /// Script mode only: tempfile holding the decoded script body. `build_task`
    /// moves it into the task's execution context so the file stays on disk while
    /// the task can run. `None` in command mode.
    script_tempfile: Option<NamedTempFile>,
}
240
241impl Runner for SubprocessRunner {
242 fn name(&self) -> &'static str {
243 self.name
244 }
245
246 fn supports(&self, spec: &TaskSpec) -> bool {
247 matches!(spec.kind(), TaskKind::Subprocess(_))
248 }
249
250 fn build_task(&self, spec: &TaskSpec, ctx: &BuildContext) -> Result<TaskRef, RunnerError> {
251 let (task_cfg, script_tempfile) = self.build_task_config(spec, ctx)?;
252
253 trace!(
254 slot = %spec.slot(),
255 task = %task_cfg.run_id,
256 "building subprocess task",
257 );
258
259 let cgroup_name = self.config.as_ref().and_then(|cfg| {
260 cfg.has_cgroups().then(|| {
261 let timestamp = SystemTime::now()
262 .duration_since(UNIX_EPOCH)
263 .unwrap_or(StdDuration::from_secs(0))
264 .as_secs();
265 crate::utils::build_cgroup_name(
266 self.name,
267 spec.slot().as_str(),
268 task_cfg.seq,
269 timestamp,
270 )
271 })
272 });
273
274 let log_cfg = self
275 .config
276 .as_ref()
277 .map(|c| *c.log_config())
278 .unwrap_or_default();
279
280 let task_id = TaskId::from(Arc::clone(&task_cfg.run_id));
281 ctx.output_registry().ensure_channel(task_id);
282
283 let exec_ctx = Arc::new(TaskExecContext {
284 task_cfg,
285 runner_cfg: self.config.clone(),
286 cgroup_name,
287 metrics: ctx.metrics().clone(),
288 log_cfg,
289 output_registry: Arc::clone(ctx.output_registry()),
290 attempt: AtomicU32::new(0),
291
292 _script_tempfile: script_tempfile.map(Arc::new),
293 });
294
295 let run_id = exec_ctx.task_cfg.run_id.to_string();
296 let task: TaskRef = TaskFn::arc(run_id, move |cancel: CancellationToken| {
297 let ctx = Arc::clone(&exec_ctx);
298 async move { run_subprocess(ctx, cancel).await }
299 });
300 Ok(task)
301 }
302}
303
/// State shared by every attempt of one built subprocess task. It is held in an
/// `Arc` that the task closure clones for each invocation.
struct TaskExecContext {
    /// Backend configuration cloned from the runner; `None` for runners built
    /// with `SubprocessRunner::new`.
    runner_cfg: Option<Arc<SubprocessBackendConfig>>,
    /// Metrics sink for task start/completion and runner errors.
    metrics: solti_runner::MetricsHandle,
    /// Registry that receives the attempt's stdout/stderr lines.
    output_registry: Arc<OutputRegistry>,
    /// Resolved command, arguments, environment, cwd and exit policy.
    task_cfg: SubprocessTaskConfig,
    /// cgroup name derived at build time when the backend has cgroups enabled.
    cgroup_name: Option<String>,
    /// Settings passed to `log_stream` for both output streams.
    log_cfg: LogConfig,
    /// Attempts spawned so far; incremented after each successful spawn and used
    /// to tag the attempt's output sink.
    attempt: AtomicU32,

    /// Script-mode tempfile. It is never read here; it is only held so the
    /// script file outlives every attempt.
    _script_tempfile: Option<Arc<NamedTempFile>>,
}
316
317fn build_command(ctx: &TaskExecContext) -> Command {
319 let mut cmd = Command::new(&ctx.task_cfg.command);
320 cmd.args(&ctx.task_cfg.args);
321 if let Some(cwd) = &ctx.task_cfg.cwd {
322 cmd.current_dir(cwd);
323 }
324 cmd.envs(&ctx.task_cfg.env);
325 cmd.stdout(Stdio::piped());
326 cmd.stderr(Stdio::piped());
327
328 #[cfg(unix)]
337 cmd.process_group(0);
338
339 cmd.kill_on_drop(true);
344
345 cmd
346}
347
348async fn kill_process_group(child: &mut tokio::process::Child, run_id: &str) {
354 #[cfg(unix)]
355 {
356 if let Some(pid) = child.id() {
357 let rc = unsafe { libc::kill(-(pid as i32), libc::SIGKILL) };
358 if rc != 0 {
359 let err = std::io::Error::last_os_error();
360 if err.raw_os_error() != Some(libc::ESRCH) {
361 warn!(
362 task = %run_id,
363 error = %err,
364 "killpg failed; falling back to single-pid kill",
365 );
366 let _ = child.kill().await;
367 }
368 }
369 } else {
370 }
372 }
373 #[cfg(not(unix))]
374 {
375 let _ = child.kill().await;
376 }
377}
378
379fn prepare_backend(ctx: &TaskExecContext) -> Result<(), TaskError> {
381 if let Some(backend_cfg) = &ctx.runner_cfg {
382 let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
383
384 if let Err(e) = backend_cfg.prepare_cgroups(cgroup_name_ref) {
385 ctx.metrics
386 .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::CgroupPrepareFailed);
387 return Err(TaskError::Fatal {
388 reason: format!("failed to prepare cgroup: {e}"),
389 exit_code: None,
390 });
391 }
392 }
393 Ok(())
394}
395
396fn apply_backend(cmd: &mut Command, ctx: &TaskExecContext) -> Result<(), TaskError> {
398 if let Some(backend_cfg) = &ctx.runner_cfg {
399 let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
400
401 if let Err(e) = backend_cfg.apply_to_command(cmd, cgroup_name_ref) {
402 ctx.metrics
403 .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::BackendConfigFailed);
404 return Err(TaskError::Fatal {
405 reason: format!("failed to apply runner config: {e}"),
406 exit_code: None,
407 });
408 }
409 }
410 Ok(())
411}
412
413fn evaluate_exit(
415 status: std::process::ExitStatus,
416 task_cfg: &SubprocessTaskConfig,
417) -> Result<(), TaskError> {
418 if !status.success() && task_cfg.fail_on_non_zero.is_enabled() {
419 let exit_code = status.code();
420 let reason = match exit_code {
421 Some(code) => format!("process exited with non-zero code: {code}"),
422 None => "process terminated by signal".into(),
423 };
424 Err(TaskError::Fail { reason, exit_code })
425 } else {
426 debug!(task = %task_cfg.run_id, "subprocess exited successfully");
427 Ok(())
428 }
429}
430
431struct CgroupGuard<'a>(Option<&'a str>);
433
434impl Drop for CgroupGuard<'_> {
435 fn drop(&mut self) {
436 if let Some(name) = self.0 {
437 crate::utils::cleanup_cgroup(name);
438 }
439 }
440}
441
442async fn run_subprocess(
444 ctx: Arc<TaskExecContext>,
445 cancel: CancellationToken,
446) -> Result<(), TaskError> {
447 ctx.metrics.record_task_started(RunnerType::Subprocess);
448 let start = Instant::now();
449
450 trace!(
451 task = %ctx.task_cfg.run_id,
452 command = %ctx.task_cfg.command,
453 args = ?ctx.task_cfg.args,
454 cwd = ?ctx.task_cfg.cwd,
455 "spawning subprocess",
456 );
457
458 prepare_backend(&ctx)?;
459
460 let _cgroup_guard = CgroupGuard(ctx.cgroup_name.as_deref());
461 let mut cmd = build_command(&ctx);
462 apply_backend(&mut cmd, &ctx)?;
463
464 let mut child = match cmd.spawn() {
465 Ok(child) => child,
466 Err(e) => {
467 ctx.metrics
468 .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::SpawnFailed);
469 return Err(TaskError::Fatal {
470 reason: format!("spawn failed: {e}"),
471 exit_code: None,
472 });
473 }
474 };
475
476 let log_cfg = ctx.log_cfg;
477
478 let attempt = ctx.attempt.fetch_add(1, Ordering::Relaxed) + 1;
479 let task_id = TaskId::from(Arc::clone(&ctx.task_cfg.run_id));
480 let sink = ctx.output_registry.sink_for(task_id, attempt);
481
482 let stdout = child.stdout.take().ok_or_else(|| TaskError::Fatal {
483 reason: "failed to capture stdout".into(),
484 exit_code: None,
485 })?;
486 let run_id_stdout = Arc::clone(&ctx.task_cfg.run_id);
487 let sink_stdout = sink.clone();
488 let stdout_task = tokio::spawn(async move {
489 log_stream(
490 stdout,
491 &run_id_stdout,
492 StreamKind::Stdout,
493 &log_cfg,
494 Some(&sink_stdout),
495 )
496 .await;
497 });
498
499 let stderr = child.stderr.take().ok_or_else(|| TaskError::Fatal {
500 reason: "failed to capture stderr".into(),
501 exit_code: None,
502 })?;
503 let run_id_stderr = Arc::clone(&ctx.task_cfg.run_id);
504 let sink_stderr = sink.clone();
505 let stderr_task = tokio::spawn(async move {
506 log_stream(
507 stderr,
508 &run_id_stderr,
509 StreamKind::Stderr,
510 &log_cfg,
511 Some(&sink_stderr),
512 )
513 .await;
514 });
515
516 let result = tokio::select! {
517 biased;
518 res = child.wait() => {
519 let status = res.map_err(|e| TaskError::Fatal {
520 reason: format!("wait failed: {e}"),
521 exit_code: None,
522 })?;
523 evaluate_exit(status, &ctx.task_cfg)
524 }
525 _ = cancel.cancelled() => {
526 debug!(
527 task = %ctx.task_cfg.run_id,
528 "cancellation requested; killing subprocess group",
529 );
530 kill_process_group(&mut child, &ctx.task_cfg.run_id).await;
531 let _ = child.wait().await;
532 Err(TaskError::Canceled)
533 }
534 };
535
536 let duration_ms = start.elapsed().as_millis() as u64;
537 let outcome = match &result {
538 Ok(()) => solti_runner::TaskOutcome::Success,
539 Err(e) => classify_task_error(e),
540 };
541 ctx.metrics
542 .record_task_completed(RunnerType::Subprocess, outcome, duration_ms);
543
544 let _ = tokio::join!(stdout_task, stderr_task);
545 result
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    fn mk_backoff() -> solti_model::BackoffPolicy {
        solti_model::BackoffPolicy {
            jitter: solti_model::JitterPolicy::Equal,
            first_ms: 100,
            max_ms: 1000,
            factor: 2.0,
        }
    }

    fn mk_subprocess_spec(slot: &str, command: &str) -> TaskSpec {
        mk_subprocess_spec_with_args(slot, command, &[])
    }

    fn mk_subprocess_spec_with_args(slot: &str, command: &str, args: &[&str]) -> TaskSpec {
        TaskSpec::builder(
            slot,
            TaskKind::Subprocess(SubprocessSpec {
                mode: solti_model::SubprocessMode::Command {
                    command: command.into(),
                    args: args.iter().map(|s| s.to_string()).collect(),
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap()
    }

    fn mk_embedded_spec(slot: &str) -> TaskSpec {
        TaskSpec::builder(slot, TaskKind::Embedded, 5_000u64)
            .restart(solti_model::RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(solti_model::AdmissionPolicy::DropIfRunning)
            .build()
            .unwrap()
    }

    fn make_task_cfg() -> SubprocessTaskConfig {
        SubprocessTaskConfig {
            run_id: Arc::from("test-run-1"),
            seq: 1,
            command: "echo".into(),
            args: vec!["hello".into()],
            env: Default::default(),
            cwd: None,
            fail_on_non_zero: solti_model::Flag::default(),
        }
    }

    fn make_exec_ctx() -> TaskExecContext {
        TaskExecContext {
            task_cfg: make_task_cfg(),
            runner_cfg: None,
            cgroup_name: None,
            metrics: solti_runner::noop_metrics(),
            log_cfg: LogConfig::default(),
            output_registry: Arc::new(OutputRegistry::default()),
            attempt: AtomicU32::new(0),
            _script_tempfile: None,
        }
    }

    #[test]
    fn build_command_sets_args_and_pipes() {
        let ctx = make_exec_ctx();
        let cmd = build_command(&ctx);
        let std_cmd = cmd.as_std();
        assert_eq!(std_cmd.get_program(), "echo");
        let args: Vec<_> = std_cmd.get_args().collect();
        assert_eq!(args, vec!["hello"]);
    }

    #[test]
    fn build_command_sets_env() {
        let mut ctx = make_exec_ctx();
        ctx.task_cfg.env.insert("FOO".into(), "bar".into());
        let cmd = build_command(&ctx);
        let envs: Vec<_> = cmd.as_std().get_envs().collect();
        assert!(
            envs.iter()
                .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar")))
        );
    }

    // `true` / `false` are POSIX utilities; these tests only make sense on Unix.
    #[cfg(unix)]
    #[test]
    fn evaluate_exit_success() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("true").status().unwrap();
        let cfg = make_task_cfg();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    #[cfg(unix)]
    #[test]
    fn evaluate_exit_non_zero_with_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::enabled();
        let result = evaluate_exit(status, &cfg);
        assert!(result.is_err());
        match result.unwrap_err() {
            TaskError::Fail { reason, exit_code } => {
                assert!(reason.contains("non-zero"));
                assert_eq!(exit_code, Some(1));
            }
            other => panic!("expected TaskError::Fail, got {other:?}"),
        }
    }

    #[cfg(unix)]
    #[test]
    fn evaluate_exit_non_zero_without_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::disabled();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    #[test]
    fn build_task_returns_task_ref_for_subprocess() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_subprocess_spec("test-slot", "echo");
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    #[test]
    fn build_task_rejects_non_subprocess_kind() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_embedded_spec("test-slot");
        match runner.build_task(&spec, &BuildContext::default()) {
            Err(RunnerError::UnsupportedKind { runner, kind }) => {
                assert_eq!(runner, "test-runner");
                assert_eq!(kind, "embedded");
            }
            Err(other) => panic!("expected UnsupportedKind, got {other:?}"),
            Ok(_) => panic!("expected error, got Ok"),
        }
    }

    #[test]
    fn supports_returns_true_for_subprocess() {
        let runner = SubprocessRunner::new("test");
        assert!(runner.supports(&mk_subprocess_spec("s", "echo")));
    }

    #[test]
    fn supports_returns_false_for_embedded() {
        let runner = SubprocessRunner::new("test");
        assert!(!runner.supports(&mk_embedded_spec("s")));
    }

    #[test]
    fn build_task_returns_task_ref_for_script_mode() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let runner = SubprocessRunner::new("test-runner");
        let spec = TaskSpec::builder(
            "test-slot",
            TaskKind::Subprocess(solti_model::SubprocessSpec {
                mode: solti_model::SubprocessMode::Script {
                    runtime: solti_model::Runtime::Bash,
                    body: BASE64.encode(b"echo hello"),
                    args: vec![],
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap();
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    #[test]
    fn resolve_mode_command() {
        let mode = solti_model::SubprocessMode::Command {
            command: "ls".into(),
            args: vec!["-la".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ls");
        assert_eq!(r.args, vec!["-la"]);
        assert!(
            r.script_tempfile.is_none(),
            "Command mode needs no tempfile"
        );
    }

    #[test]
    fn resolve_mode_script_bash_uses_tempfile() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(b"echo hello"),
            args: vec!["extra".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 2, "args: {:?}", r.args);
        assert_eq!(r.args[1], "extra");

        let tmp = r
            .script_tempfile
            .expect("Script mode must produce a tempfile");
        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
        let written = std::fs::read_to_string(tmp.path()).expect("tempfile readable");
        assert_eq!(written, "echo hello");

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let perms = std::fs::metadata(tmp.path()).unwrap().permissions();
            assert_eq!(
                perms.mode() & 0o777,
                0o600,
                "tempfile must be chmod 0600 (may carry secrets)"
            );
        }
    }

    #[test]
    fn resolve_mode_script_custom_ignores_flag() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Custom {
                command: "ruby".into(),
                flag: "-e".into(),
            },
            body: BASE64.encode(b"puts 'hi'"),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ruby");
        assert_eq!(r.args.len(), 1, "only the tempfile path, no flag");
        // Compare whole arguments: a substring check on the path is flaky because
        // the random tempfile name (or TMPDIR) may itself contain "-e".
        assert!(
            r.args.iter().all(|a| a.as_str() != "-e"),
            "flag must not leak into args: {:?}",
            r.args
        );
        let tmp = r
            .script_tempfile
            .expect("Script mode must produce a tempfile");
        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn cancel_reaps_forked_grandchildren() {
        use std::process::Stdio;
        use std::sync::atomic::{AtomicU32, Ordering};
        use std::time::Duration;
        use tokio::process::Command as TokioCommand;
        use tokio::time::timeout;

        static N: AtomicU32 = AtomicU32::new(0);
        let marker = std::env::temp_dir().join(format!(
            "solti-exec-pgid-test-{}-{}",
            std::process::id(),
            N.fetch_add(1, Ordering::SeqCst)
        ));
        let marker_str = marker.to_string_lossy().to_string();

        // The marker path is single-quoted so a TMPDIR with spaces still works.
        let script = format!(
            r#"
            (sleep 60 & echo $! > '{marker}') &
            wait
            "#,
            marker = marker_str
        );

        let mut cmd = TokioCommand::new("bash");
        cmd.args(["-c", &script])
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        cmd.process_group(0);
        cmd.kill_on_drop(true);

        let mut child = cmd.spawn().expect("bash must spawn");

        // Poll until the background subshell has written the grandchild's pid.
        let grandchild_pid: i32 = {
            let mut attempts = 0;
            loop {
                if let Ok(s) = std::fs::read_to_string(&marker) {
                    if let Some(line) = s.trim().lines().next() {
                        if let Ok(pid) = line.parse::<i32>() {
                            break pid;
                        }
                    }
                }
                attempts += 1;
                if attempts > 50 {
                    panic!("grandchild never reported its pid via marker");
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        };

        let alive = unsafe { libc::kill(grandchild_pid, 0) };
        assert_eq!(alive, 0, "grandchild must be alive before cancel");

        kill_process_group(&mut child, "test").await;
        let _ = timeout(Duration::from_secs(2), child.wait()).await;

        // A killed grandchild may linger as a zombie until it is reaped, so poll.
        let mut caught = false;
        for _ in 0..50 {
            let rc = unsafe { libc::kill(grandchild_pid, 0) };
            if rc != 0 && std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH) {
                caught = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let _ = std::fs::remove_file(&marker);

        if !caught {
            unsafe { libc::kill(grandchild_pid, libc::SIGKILL) };
            panic!(
                "grandchild PID {} survived cancel — process-group kill did not reach it",
                grandchild_pid
            );
        }
    }

    #[test]
    fn resolve_mode_invalid_base64() {
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: "not-valid!!!".into(),
            args: vec![],
        };
        let err = SubprocessRunner::resolve_mode(&mode).unwrap_err();
        assert!(matches!(err, RunnerError::InvalidSpec(_)));
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn subprocess_streams_stdout_into_output_registry() {
        use solti_model::{OutputEvent, TaskId};
        use solti_runner::OutputRegistry;
        use std::sync::Arc;
        use std::time::Duration;
        use tokio_util::sync::CancellationToken;

        let registry = Arc::new(OutputRegistry::new(64));
        let ctx = BuildContext::default().with_output_registry(registry.clone());

        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_subprocess_spec_with_args("echo-slot", "echo", &["hello-stream"]);
        let task_ref = runner.build_task(&spec, &ctx).unwrap();
        let task_id = TaskId::from(task_ref.name());

        let mut rx = registry
            .subscribe(&task_id)
            .expect("registry must have channel after build_task");

        let cancel = CancellationToken::new();
        task_ref.spawn(cancel).await.expect("echo must succeed");

        let mut found_line = None;
        for _ in 0..100 {
            if let Ok(OutputEvent::Chunk(c)) = rx.try_recv() {
                let line_text = std::str::from_utf8(&c.line).unwrap_or_default();
                if line_text.contains("hello-stream") {
                    found_line = Some(c);
                    break;
                }
            } else {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        }

        let chunk = found_line.expect("expected to receive 'hello-stream' line");
        assert_eq!(chunk.attempt, 1);
        assert_eq!(chunk.stream, solti_model::StreamKind::Stdout);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn subprocess_attempt_counter_increments_on_each_spawn() {
        use solti_model::{OutputEvent, TaskId};
        use solti_runner::OutputRegistry;
        use std::sync::Arc;
        use std::time::Duration;
        use tokio_util::sync::CancellationToken;

        let registry = Arc::new(OutputRegistry::new(64));
        let ctx = BuildContext::default().with_output_registry(registry.clone());
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_subprocess_spec_with_args("attempts-slot", "echo", &["x"]);
        let task_ref = runner.build_task(&spec, &ctx).unwrap();
        let task_id = TaskId::from(task_ref.name());
        let mut rx = registry.subscribe(&task_id).unwrap();

        task_ref.spawn(CancellationToken::new()).await.unwrap();
        task_ref.spawn(CancellationToken::new()).await.unwrap();

        let mut attempts = std::collections::BTreeSet::new();
        for _ in 0..200 {
            match rx.try_recv() {
                Ok(OutputEvent::Chunk(c)) => {
                    attempts.insert(c.attempt);
                    // Only two attempts ran; stop as soon as both were observed
                    // instead of always spinning through every iteration.
                    if attempts.len() >= 2 {
                        break;
                    }
                }
                Ok(_) => {}
                Err(_) => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
        assert!(attempts.contains(&1), "attempt 1 missing: {attempts:?}");
        assert!(attempts.contains(&2), "attempt 2 missing: {attempts:?}");
    }

    #[test]
    fn resolve_mode_script_accepts_large_body() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let payload: Vec<u8> = b"# "
            .iter()
            .copied()
            .chain(std::iter::repeat_n(b'x', 200 * 1024))
            .collect();
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(&payload),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode)
            .expect("200 KiB script must resolve via tempfile");
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 1);
        let tmp = r
            .script_tempfile
            .expect("large Script must allocate a tempfile");
        let written = std::fs::read(tmp.path()).unwrap();
        assert_eq!(written.len(), payload.len());
    }
}
997}