1use std::{
58 io::Write as _,
59 process::Stdio,
60 sync::Arc,
61 time::{Duration as StdDuration, Instant, SystemTime, UNIX_EPOCH},
62};
63
64use taskvisor::{TaskError, TaskFn, TaskRef};
65use tempfile::NamedTempFile;
66use tokio::process::Command;
67use tokio_util::sync::CancellationToken;
68use tracing::{debug, trace, warn};
69
70use solti_model::{SubprocessSpec, TaskKind, TaskSpec, merge_env};
71use solti_runner::{BuildContext, Runner, RunnerError, RunnerType};
72
73use crate::metrics::classify_task_error;
74use crate::subprocess::{
75 backend::SubprocessBackendConfig,
76 logger::{LogConfig, StreamKind, log_stream},
77 task::SubprocessTaskConfig,
78};
79
/// Runner that builds tasks for [`TaskKind::Subprocess`] specs.
pub struct SubprocessRunner {
    /// Static runner name: returned by `Runner::name`, reported in
    /// `RunnerError::UnsupportedKind`, and passed to the cgroup-name builder.
    name: &'static str,
    /// Optional backend configuration; `None` when the runner is created with `new`.
    config: Option<Arc<SubprocessBackendConfig>>,
}
94
95impl SubprocessRunner {
96 pub fn new(name: &'static str) -> Self {
98 Self { name, config: None }
99 }
100
101 pub fn with_config(
107 name: &'static str,
108 config: SubprocessBackendConfig,
109 ) -> Result<Self, crate::ExecError> {
110 config.validate()?;
111 Ok(Self {
112 name,
113 config: Some(Arc::new(config)),
114 })
115 }
116
117 fn build_task_config(
121 &self,
122 spec: &TaskSpec,
123 ctx: &BuildContext,
124 ) -> Result<(SubprocessTaskConfig, Option<NamedTempFile>), RunnerError> {
125 let slot = spec.slot();
126 let (cfg, script_tempfile) = match spec.kind() {
127 TaskKind::Subprocess(SubprocessSpec {
128 mode,
129 env,
130 cwd,
131 fail_on_non_zero,
132 }) => {
133 let Resolved {
134 command,
135 args,
136 script_tempfile,
137 } = Self::resolve_mode(mode)?;
138 let run_id = self.build_run_id(slot.as_str());
139 let cfg = SubprocessTaskConfig {
140 seq: run_id.seq(),
141 run_id: Arc::from(run_id.into_name()),
142 fail_on_non_zero: *fail_on_non_zero,
143 env: merge_env(env, ctx.env()),
144 cwd: cwd.clone(),
145 command,
146 args,
147 };
148 (cfg, script_tempfile)
149 }
150 other => {
151 return Err(RunnerError::UnsupportedKind {
152 runner: self.name,
153 kind: other.kind().to_string(),
154 });
155 }
156 };
157 cfg.validate()
158 .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
159 Ok((cfg, script_tempfile))
160 }
161
162 fn resolve_mode(mode: &solti_model::SubprocessMode) -> Result<Resolved, RunnerError> {
173 match mode {
174 solti_model::SubprocessMode::Command { command, args } => Ok(Resolved {
175 command: command.clone(),
176 args: args.clone(),
177 script_tempfile: None,
178 }),
179 solti_model::SubprocessMode::Script { runtime, args, .. } => {
180 let script = mode
181 .decode_body()
182 .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
183
184 let mut tmp = NamedTempFile::with_prefix("solti-script-").map_err(|e| {
185 RunnerError::InvalidSpec(format!("failed to create script tempfile: {e}"))
186 })?;
187
188 #[cfg(unix)]
189 {
190 use std::os::unix::fs::PermissionsExt;
191 let perms = std::fs::Permissions::from_mode(0o600);
192 if let Err(e) = tmp.as_file().set_permissions(perms) {
193 return Err(RunnerError::InvalidSpec(format!(
194 "failed to chmod 0600 script tempfile: {e}"
195 )));
196 }
197 }
198
199 tmp.write_all(script.as_bytes()).map_err(|e| {
200 RunnerError::InvalidSpec(format!("failed to write script body: {e}"))
201 })?;
202 tmp.as_file()
203 .sync_all()
204 .or_else(|_| tmp.as_file().flush())
205 .map_err(|e| {
206 RunnerError::InvalidSpec(format!("failed to flush script tempfile: {e}"))
207 })?;
208
209 let (cmd, _flag_deprecated_for_tempfile_transport) = runtime.resolve();
210 let path = tmp.path().to_string_lossy().into_owned();
211
212 let mut full_args = Vec::with_capacity(1 + args.len());
213 full_args.push(path);
214 full_args.extend(args.iter().cloned());
215
216 Ok(Resolved {
217 command: cmd.to_string(),
218 args: full_args,
219 script_tempfile: Some(tmp),
220 })
221 }
222 }
223 }
224}
225
/// Result of resolving a subprocess mode: what to execute and with which arguments.
#[derive(Debug)]
struct Resolved {
    /// Program to execute.
    command: String,
    /// Arguments passed to `command`; in script mode the first one is the temp-file path.
    args: Vec<String>,

    /// Temp file holding the script body; `None` for command mode.
    script_tempfile: Option<NamedTempFile>,
}
235
236impl Runner for SubprocessRunner {
237 fn name(&self) -> &'static str {
238 self.name
239 }
240
241 fn supports(&self, spec: &TaskSpec) -> bool {
242 matches!(spec.kind(), TaskKind::Subprocess(_))
243 }
244
245 fn build_task(&self, spec: &TaskSpec, ctx: &BuildContext) -> Result<TaskRef, RunnerError> {
246 let (task_cfg, script_tempfile) = self.build_task_config(spec, ctx)?;
247
248 trace!(
249 slot = %spec.slot(),
250 task = %task_cfg.run_id,
251 "building subprocess task",
252 );
253
254 let cgroup_name = self.config.as_ref().and_then(|cfg| {
255 cfg.has_cgroups().then(|| {
256 let timestamp = SystemTime::now()
257 .duration_since(UNIX_EPOCH)
258 .unwrap_or(StdDuration::from_secs(0))
259 .as_secs();
260 crate::utils::build_cgroup_name(
261 self.name,
262 spec.slot().as_str(),
263 task_cfg.seq,
264 timestamp,
265 )
266 })
267 });
268
269 let log_cfg = self
270 .config
271 .as_ref()
272 .map(|c| *c.log_config())
273 .unwrap_or_default();
274
275 let exec_ctx = Arc::new(TaskExecContext {
276 task_cfg,
277 runner_cfg: self.config.clone(),
278 cgroup_name,
279 metrics: ctx.metrics().clone(),
280 log_cfg,
281
282 _script_tempfile: script_tempfile.map(Arc::new),
283 });
284
285 let run_id = exec_ctx.task_cfg.run_id.to_string();
286 let task: TaskRef = TaskFn::arc(run_id, move |cancel: CancellationToken| {
287 let ctx = Arc::clone(&exec_ctx);
288 async move { run_subprocess(ctx, cancel).await }
289 });
290 Ok(task)
291 }
292}
293
/// Everything a single subprocess run needs; built once in `build_task` and shared via `Arc`.
struct TaskExecContext {
    /// Resolved command, arguments, environment and run identity.
    task_cfg: SubprocessTaskConfig,
    /// Backend configuration of the owning runner, if any.
    runner_cfg: Option<Arc<SubprocessBackendConfig>>,
    /// Cgroup name generated at build time; `None` when no cgroup name was generated.
    cgroup_name: Option<String>,
    /// Metrics sink used to record task and runner events.
    metrics: solti_runner::MetricsHandle,
    /// Settings handed to the stdout/stderr log forwarders.
    log_cfg: LogConfig,

    /// Never read; held only so the script temp file lives as long as this context.
    _script_tempfile: Option<Arc<NamedTempFile>>,
}
304
305fn build_command(ctx: &TaskExecContext) -> Command {
307 let mut cmd = Command::new(&ctx.task_cfg.command);
308 cmd.args(&ctx.task_cfg.args);
309 if let Some(cwd) = &ctx.task_cfg.cwd {
310 cmd.current_dir(cwd);
311 }
312 cmd.envs(&ctx.task_cfg.env);
313 cmd.stdout(Stdio::piped());
314 cmd.stderr(Stdio::piped());
315
316 #[cfg(unix)]
325 cmd.process_group(0);
326
327 cmd.kill_on_drop(true);
332
333 cmd
334}
335
336async fn kill_process_group(child: &mut tokio::process::Child, run_id: &str) {
342 #[cfg(unix)]
343 {
344 if let Some(pid) = child.id() {
345 let rc = unsafe { libc::kill(-(pid as i32), libc::SIGKILL) };
346 if rc != 0 {
347 let err = std::io::Error::last_os_error();
348 if err.raw_os_error() != Some(libc::ESRCH) {
349 warn!(
350 task = %run_id,
351 error = %err,
352 "killpg failed; falling back to single-pid kill",
353 );
354 let _ = child.kill().await;
355 }
356 }
357 } else {
358 }
360 }
361 #[cfg(not(unix))]
362 {
363 let _ = child.kill().await;
364 }
365}
366
367fn prepare_backend(ctx: &TaskExecContext) -> Result<(), TaskError> {
369 if let Some(backend_cfg) = &ctx.runner_cfg {
370 let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
371
372 if let Err(e) = backend_cfg.prepare_cgroups(cgroup_name_ref) {
373 ctx.metrics
374 .record_runner_error(RunnerType::Subprocess, "cgroup_prepare_failed");
375 return Err(TaskError::Fatal {
376 reason: format!("failed to prepare cgroup: {e}"),
377 exit_code: None,
378 });
379 }
380 }
381 Ok(())
382}
383
384fn apply_backend(cmd: &mut Command, ctx: &TaskExecContext) -> Result<(), TaskError> {
386 if let Some(backend_cfg) = &ctx.runner_cfg {
387 let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
388
389 if let Err(e) = backend_cfg.apply_to_command(cmd, cgroup_name_ref) {
390 ctx.metrics
391 .record_runner_error(RunnerType::Subprocess, "backend_config_failed");
392 return Err(TaskError::Fatal {
393 reason: format!("failed to apply runner config: {e}"),
394 exit_code: None,
395 });
396 }
397 }
398 Ok(())
399}
400
401fn evaluate_exit(
403 status: std::process::ExitStatus,
404 task_cfg: &SubprocessTaskConfig,
405) -> Result<(), TaskError> {
406 if !status.success() && task_cfg.fail_on_non_zero.is_enabled() {
407 let exit_code = status.code();
408 let reason = match exit_code {
409 Some(code) => format!("process exited with non-zero code: {code}"),
410 None => "process terminated by signal".into(),
411 };
412 Err(TaskError::Fail { reason, exit_code })
413 } else {
414 debug!(task = %task_cfg.run_id, "subprocess exited successfully");
415 Ok(())
416 }
417}
418
419struct CgroupGuard<'a>(Option<&'a str>);
421
422impl Drop for CgroupGuard<'_> {
423 fn drop(&mut self) {
424 if let Some(name) = self.0 {
425 crate::utils::cleanup_cgroup(name);
426 }
427 }
428}
429
430async fn run_subprocess(
432 ctx: Arc<TaskExecContext>,
433 cancel: CancellationToken,
434) -> Result<(), TaskError> {
435 ctx.metrics.record_task_started(RunnerType::Subprocess);
436 let start = Instant::now();
437
438 trace!(
439 task = %ctx.task_cfg.run_id,
440 command = %ctx.task_cfg.command,
441 args = ?ctx.task_cfg.args,
442 cwd = ?ctx.task_cfg.cwd,
443 "spawning subprocess",
444 );
445
446 prepare_backend(&ctx)?;
447
448 let _cgroup_guard = CgroupGuard(ctx.cgroup_name.as_deref());
449 let mut cmd = build_command(&ctx);
450 apply_backend(&mut cmd, &ctx)?;
451
452 let mut child = match cmd.spawn() {
453 Ok(child) => child,
454 Err(e) => {
455 ctx.metrics
456 .record_runner_error(RunnerType::Subprocess, "spawn_failed");
457 return Err(TaskError::Fatal {
458 reason: format!("spawn failed: {e}"),
459 exit_code: None,
460 });
461 }
462 };
463
464 let log_cfg = ctx.log_cfg;
465
466 let stdout = child.stdout.take().ok_or_else(|| TaskError::Fatal {
467 reason: "failed to capture stdout".into(),
468 exit_code: None,
469 })?;
470 let run_id_stdout = Arc::clone(&ctx.task_cfg.run_id);
471 let stdout_task = tokio::spawn(async move {
472 log_stream(stdout, &run_id_stdout, StreamKind::Stdout, &log_cfg).await;
473 });
474
475 let stderr = child.stderr.take().ok_or_else(|| TaskError::Fatal {
476 reason: "failed to capture stderr".into(),
477 exit_code: None,
478 })?;
479 let run_id_stderr = Arc::clone(&ctx.task_cfg.run_id);
480 let stderr_task = tokio::spawn(async move {
481 log_stream(stderr, &run_id_stderr, StreamKind::Stderr, &log_cfg).await;
482 });
483
484 let result = tokio::select! {
485 biased;
486 res = child.wait() => {
487 let status = res.map_err(|e| TaskError::Fatal {
488 reason: format!("wait failed: {e}"),
489 exit_code: None,
490 })?;
491 evaluate_exit(status, &ctx.task_cfg)
492 }
493 _ = cancel.cancelled() => {
494 debug!(
495 task = %ctx.task_cfg.run_id,
496 "cancellation requested; killing subprocess group",
497 );
498 kill_process_group(&mut child, &ctx.task_cfg.run_id).await;
499 let _ = child.wait().await;
500 Err(TaskError::Canceled)
501 }
502 };
503
504 let duration_ms = start.elapsed().as_millis() as u64;
505 let outcome = match &result {
506 Ok(()) => solti_runner::TaskOutcome::Success,
507 Err(e) => classify_task_error(e),
508 };
509 ctx.metrics
510 .record_task_completed(RunnerType::Subprocess, outcome, duration_ms);
511
512 let _ = tokio::join!(stdout_task, stderr_task);
513 result
514}
515
// Unit tests for the subprocess runner: command construction, exit-status evaluation,
// task building, mode resolution and process-group cancellation.
#[cfg(test)]
mod tests {
    use super::*;

    // Backoff policy shared by the spec builders below.
    fn mk_backoff() -> solti_model::BackoffPolicy {
        solti_model::BackoffPolicy {
            jitter: solti_model::JitterPolicy::Equal,
            first_ms: 100,
            max_ms: 1000,
            factor: 2.0,
        }
    }

    // Builds a command-mode subprocess spec for `command` with no arguments.
    fn mk_subprocess_spec(slot: &str, command: &str) -> TaskSpec {
        TaskSpec::builder(
            slot,
            TaskKind::Subprocess(SubprocessSpec {
                mode: solti_model::SubprocessMode::Command {
                    command: command.into(),
                    args: vec![],
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap()
    }

    // Builds a spec of a kind the subprocess runner does not support.
    fn mk_embedded_spec(slot: &str) -> TaskSpec {
        TaskSpec::builder(slot, TaskKind::Embedded, 5_000u64)
            .restart(solti_model::RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(solti_model::AdmissionPolicy::DropIfRunning)
            .build()
            .unwrap()
    }

    // Minimal task configuration: `echo hello` with default environment and flags.
    fn make_task_cfg() -> SubprocessTaskConfig {
        SubprocessTaskConfig {
            run_id: Arc::from("test-run-1"),
            seq: 1,
            command: "echo".into(),
            args: vec!["hello".into()],
            env: Default::default(),
            cwd: None,
            fail_on_non_zero: solti_model::Flag::default(),
        }
    }

    // Execution context without backend config, cgroup or script temp file.
    fn make_exec_ctx() -> TaskExecContext {
        TaskExecContext {
            task_cfg: make_task_cfg(),
            runner_cfg: None,
            cgroup_name: None,
            metrics: solti_runner::noop_metrics(),
            log_cfg: LogConfig::default(),
            _script_tempfile: None,
        }
    }

    // Checks program and arguments only; the piped stdio is not asserted here.
    #[test]
    fn build_command_sets_args_and_pipes() {
        let ctx = make_exec_ctx();
        let cmd = build_command(&ctx);
        let std_cmd = cmd.as_std();
        assert_eq!(std_cmd.get_program(), "echo");
        let args: Vec<_> = std_cmd.get_args().collect();
        assert_eq!(args, vec!["hello"]);
    }

    // The task environment must be forwarded to the command.
    #[test]
    fn build_command_sets_env() {
        let mut ctx = make_exec_ctx();
        ctx.task_cfg.env.insert("FOO".into(), "bar".into());
        let cmd = build_command(&ctx);
        let envs: Vec<_> = cmd.as_std().get_envs().collect();
        assert!(
            envs.iter()
                .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar")))
        );
    }

    // Obtains a real successful exit status by running the `true` program.
    #[test]
    fn evaluate_exit_success() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("true").status().unwrap();
        let cfg = make_task_cfg();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    // With the flag enabled, a non-zero exit becomes `TaskError::Fail` carrying the code.
    #[test]
    fn evaluate_exit_non_zero_with_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::enabled();
        let result = evaluate_exit(status, &cfg);
        assert!(result.is_err());
        match result.unwrap_err() {
            TaskError::Fail { reason, exit_code } => {
                assert!(reason.contains("non-zero"));
                assert_eq!(exit_code, Some(1));
            }
            other => panic!("expected TaskError::Fail, got {other:?}"),
        }
    }

    // With the flag disabled, a non-zero exit is tolerated.
    #[test]
    fn evaluate_exit_non_zero_without_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::disabled();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    #[test]
    fn build_task_returns_task_ref_for_subprocess() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_subprocess_spec("test-slot", "echo");
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    // Unsupported kinds must be rejected with the runner name and the kind label.
    #[test]
    fn build_task_rejects_non_subprocess_kind() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_embedded_spec("test-slot");
        match runner.build_task(&spec, &BuildContext::default()) {
            Err(RunnerError::UnsupportedKind { runner, kind }) => {
                assert_eq!(runner, "test-runner");
                assert_eq!(kind, "embedded");
            }
            Err(other) => panic!("expected UnsupportedKind, got {other:?}"),
            Ok(_) => panic!("expected error, got Ok"),
        }
    }

    #[test]
    fn supports_returns_true_for_subprocess() {
        let runner = SubprocessRunner::new("test");
        assert!(runner.supports(&mk_subprocess_spec("s", "echo")));
    }

    #[test]
    fn supports_returns_false_for_embedded() {
        let runner = SubprocessRunner::new("test");
        assert!(!runner.supports(&mk_embedded_spec("s")));
    }

    // Script-mode specs (base64-encoded body) must build just like command-mode specs.
    #[test]
    fn build_task_returns_task_ref_for_script_mode() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let runner = SubprocessRunner::new("test-runner");
        let spec = TaskSpec::builder(
            "test-slot",
            TaskKind::Subprocess(solti_model::SubprocessSpec {
                mode: solti_model::SubprocessMode::Script {
                    runtime: solti_model::Runtime::Bash,
                    body: BASE64.encode(b"echo hello"),
                    args: vec![],
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap();
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    // Command mode passes command and arguments through unchanged.
    #[test]
    fn resolve_mode_command() {
        let mode = solti_model::SubprocessMode::Command {
            command: "ls".into(),
            args: vec!["-la".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ls");
        assert_eq!(r.args, vec!["-la"]);
        assert!(
            r.script_tempfile.is_none(),
            "Command mode needs no tempfile"
        );
    }

    // Script mode: the temp-file path comes first, followed by the user arguments; the file
    // holds the decoded body and is restricted to 0600 on Unix.
    #[test]
    fn resolve_mode_script_bash_uses_tempfile() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(b"echo hello"),
            args: vec!["extra".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 2, "args: {:?}", r.args);
        assert_eq!(r.args[1], "extra");

        let tmp = r
            .script_tempfile
            .expect("Script mode must produce a tempfile");
        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
        let written = std::fs::read_to_string(tmp.path()).expect("tempfile readable");
        assert_eq!(written, "echo hello");

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let perms = std::fs::metadata(tmp.path()).unwrap().permissions();
            assert_eq!(
                perms.mode() & 0o777,
                0o600,
                "tempfile must be chmod 0600 (may carry secrets)"
            );
        }
    }

    // A custom runtime's inline flag must not appear in the resolved arguments.
    #[test]
    fn resolve_mode_script_custom_ignores_flag() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Custom {
                command: "ruby".into(),
                flag: "-e".into(),
            },
            body: BASE64.encode(b"puts 'hi'"),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ruby");
        assert_eq!(r.args.len(), 1, "only the tempfile path, no flag");
        assert!(!r.args[0].contains("-e"), "flag must not leak into args");
        assert!(r.script_tempfile.is_some());
    }

    // End-to-end check that killing the process group also terminates a grandchild that the
    // shell started in the background.
    #[cfg(unix)]
    #[tokio::test]
    async fn cancel_reaps_forked_grandchildren() {
        use std::process::Stdio;
        use std::sync::atomic::{AtomicU32, Ordering};
        use std::time::Duration;
        use tokio::process::Command as TokioCommand;
        use tokio::time::timeout;

        // Marker file name built from the test process id and a per-process counter.
        static N: AtomicU32 = AtomicU32::new(0);
        let marker = std::env::temp_dir().join(format!(
            "solti-exec-pgid-test-{}-{}",
            std::process::id(),
            N.fetch_add(1, Ordering::SeqCst)
        ));
        let marker_str = marker.to_string_lossy().to_string();

        // The subshell backgrounds `sleep` and writes its pid to the marker file.
        let script = format!(
            r#"
            (sleep 60 & echo $! > {marker}) &
            wait
            "#,
            marker = marker_str
        );

        // Same process-group and kill-on-drop settings as `build_command`.
        let mut cmd = TokioCommand::new("bash");
        cmd.args(["-c", &script])
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        cmd.process_group(0);
        cmd.kill_on_drop(true);

        let mut child = cmd.spawn().expect("bash must spawn");

        // Poll the marker file until it contains a parseable pid; give up after more than
        // 50 attempts spaced 20 ms apart.
        let grandchild_pid: i32 = {
            let mut attempts = 0;
            loop {
                if let Ok(s) = std::fs::read_to_string(&marker) {
                    if let Some(line) = s.trim().lines().next() {
                        if let Ok(pid) = line.parse::<i32>() {
                            break pid;
                        }
                    }
                }
                attempts += 1;
                if attempts > 50 {
                    panic!("grandchild never reported its pid via marker");
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        };

        // Signal 0 sends nothing; it only checks that the process exists.
        let alive = unsafe { libc::kill(grandchild_pid, 0) };
        assert_eq!(alive, 0, "grandchild must be alive before cancel");

        kill_process_group(&mut child, "test").await;
        let _ = timeout(Duration::from_secs(2), child.wait()).await;

        // Poll up to 50 times, 20 ms apart, until the grandchild is reported as ESRCH (gone).
        let mut caught = false;
        for _ in 0..50 {
            let rc = unsafe { libc::kill(grandchild_pid, 0) };
            if rc != 0 && std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH) {
                caught = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let _ = std::fs::remove_file(&marker);

        if !caught {
            // Do not leak the sleeping grandchild when the assertion fails.
            unsafe { libc::kill(grandchild_pid, libc::SIGKILL) };
            panic!(
                "grandchild PID {} survived cancel — process-group kill did not reach it",
                grandchild_pid
            );
        }
    }

    // A body that is not valid base64 must be rejected as an invalid spec.
    #[test]
    fn resolve_mode_invalid_base64() {
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: "not-valid!!!".into(),
            args: vec![],
        };
        let err = SubprocessRunner::resolve_mode(&mode).unwrap_err();
        assert!(matches!(err, RunnerError::InvalidSpec(_)));
    }

    // A body of roughly 200 KiB must resolve and be written to the temp file in full.
    #[test]
    fn resolve_mode_script_accepts_large_body() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let payload: Vec<u8> = b"# "
            .iter()
            .copied()
            .chain(std::iter::repeat_n(b'x', 200 * 1024))
            .collect();
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(&payload),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode)
            .expect("200 KiB script must resolve via tempfile");
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 1);
        let tmp = r
            .script_tempfile
            .expect("large Script must allocate a tempfile");
        let written = std::fs::read(tmp.path()).unwrap();
        assert_eq!(written.len(), payload.len());
    }
}