Skip to main content

solti_exec/subprocess/
runner.rs

1//! # Runner: subprocess execution engine.
2//!
3//! [`SubprocessRunner`] implements the [`Runner`](solti_runner::Runner) trait to execute
4//! [`TaskKind::Subprocess`](solti_model::TaskKind::Subprocess) tasks as OS processes.
5//!
6//! ## How it works
7//! ```text
8//! SubprocessRunner::build_task(spec, ctx)
9//!     │
10//!     ├──► build_task_config(spec, ctx)
11//!     │     ├──► match TaskKind::Subprocess(SubprocessSpec { .. })
12//!     │     ├──► resolve_mode(mode) → Resolved { command, args, script_tempfile? }
13//!     │     │     ├──► Command { command, args } → clone, tempfile = None
14//!     │     │     └──► Script { runtime, body }  → decode base64
15//!     │     │                                    → write to NamedTempFile (0600)
16//!     │     │                                    → args = [path, extra...]
17//!     │     │                                    → tempfile = Some(tmp)
18//!     │     ├──► merge_env(task_env, runner_env)→ BTreeMap
19//!     │     └──► SubprocessTaskConfig { run_id, seq, command, args, env, cwd, fail_on_non_zero }
20//!     │
//!     ├──► derive cgroup name (only if cgroups are configured; dirs are created at run time)
22//!     │
23//!     ├──► build Arc<TaskExecContext>
24//!     │     └──► { task_cfg, runner_cfg, cgroup_name, metrics, log_cfg, script_tempfile? }
25//!     │          tempfile kept alive until context drops → Drop = unlink
26//!     │
27//!     └──► return TaskFn closure → run_subprocess(ctx, cancel)
28//! ```
29//!
30//! ## Subprocess execution lifecycle
31//! ```text
32//! run_subprocess(ctx, cancel)
33//!     │
34//!     ├──► metrics.record_task_started()
35//!     ├──► prepare_backend() → create cgroup dirs
36//!     ├──► build_command() → Command with:
37//!     │      - args, env, cwd, piped stdout/stderr
38//!     │      - process_group(0)   (Unix: new pgid for kill-whole-subtree)
39//!     │      - kill_on_drop(true) (tokio SIGKILLs PID if future is dropped)
40//!     ├──► apply_backend() → install pre_exec hooks (rlimits, cgroup join, security)
41//!     ├──► cmd.spawn()
42//!     │
43//!     ├──► tokio::spawn(log_stream(stdout, Stdout))
44//!     ├──► tokio::spawn(log_stream(stderr, Stderr))
45//!     │
46//!     ├──► select! { biased; ... }
47//!     │     ├──► child.wait() → evaluate_exit(status)
48//!     │     └──► cancel.cancelled() → kill_process_group() (killpg -SIGKILL pgid)
49//!     │                             → child.wait() to reap zombie
50//!     │
51//!     ├──► metrics.record_task_completed(outcome, duration)
52//!     ├──► join!(stdout_task, stderr_task)
//!     ├──► CgroupGuard drop → cleanup_cgroup() (if configured)
54//!     └──► return result
55//! ```
56
57use std::{
58    io::Write as _,
59    process::Stdio,
60    sync::Arc,
61    time::{Duration as StdDuration, Instant, SystemTime, UNIX_EPOCH},
62};
63
64use taskvisor::{TaskError, TaskFn, TaskRef};
65use tempfile::NamedTempFile;
66use tokio::process::Command;
67use tokio_util::sync::CancellationToken;
68use tracing::{debug, trace, warn};
69
70use solti_model::{SubprocessSpec, TaskKind, TaskSpec, merge_env};
71use solti_runner::{BuildContext, Runner, RunnerError, RunnerType};
72
73use crate::metrics::classify_task_error;
74use crate::subprocess::{
75    backend::SubprocessBackendConfig,
76    logger::{LogConfig, StreamKind, log_stream},
77    task::SubprocessTaskConfig,
78};
79
80/// Runner that executes `TaskKind::Subprocess` as OS subprocesses.
81///
82/// ## Also
83///
84/// - [`SubprocessBackendConfig`] rlimits, cgroups, security applied to spawned processes.
85/// - [`SubprocessTaskConfig`](super::SubprocessTaskConfig) resolved per-task config.
86/// - [`register_subprocess_runner`](super::register_subprocess_runner) registration helper.
87/// - [`solti_runner::Runner`] trait this type implements.
88pub struct SubprocessRunner {
89    /// Runner name.
90    name: &'static str,
91    /// Backend configuration applied to all tasks spawned by this runner.
92    config: Option<Arc<SubprocessBackendConfig>>,
93}
94
95impl SubprocessRunner {
96    /// Create a new subprocess runner without backend configuration.
97    pub fn new(name: &'static str) -> Self {
98        Self { name, config: None }
99    }
100
101    /// Create a subprocess runner with explicit backend configuration.
102    ///
103    /// # Errors
104    ///
105    /// Returns `ExecError` if the configuration is invalid.
106    pub fn with_config(
107        name: &'static str,
108        config: SubprocessBackendConfig,
109    ) -> Result<Self, crate::ExecError> {
110        config.validate()?;
111        Ok(Self {
112            name,
113            config: Some(Arc::new(config)),
114        })
115    }
116
117    /// Build task configuration from `TaskSpec`.
118    ///
119    /// Returns the fully resolved config.
120    fn build_task_config(
121        &self,
122        spec: &TaskSpec,
123        ctx: &BuildContext,
124    ) -> Result<(SubprocessTaskConfig, Option<NamedTempFile>), RunnerError> {
125        let slot = spec.slot();
126        let (cfg, script_tempfile) = match spec.kind() {
127            TaskKind::Subprocess(SubprocessSpec {
128                mode,
129                env,
130                cwd,
131                fail_on_non_zero,
132            }) => {
133                let Resolved {
134                    command,
135                    args,
136                    script_tempfile,
137                } = Self::resolve_mode(mode)?;
138                let run_id = self.build_run_id(slot.as_str());
139                let cfg = SubprocessTaskConfig {
140                    seq: run_id.seq(),
141                    run_id: Arc::from(run_id.into_name()),
142                    fail_on_non_zero: *fail_on_non_zero,
143                    env: merge_env(env, ctx.env()),
144                    cwd: cwd.clone(),
145                    command,
146                    args,
147                };
148                (cfg, script_tempfile)
149            }
150            other => {
151                return Err(RunnerError::UnsupportedKind {
152                    runner: self.name,
153                    kind: other.kind().to_string(),
154                });
155            }
156        };
157        cfg.validate()
158            .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
159        Ok((cfg, script_tempfile))
160    }
161
162    /// Resolve [`SubprocessMode`](solti_model::SubprocessMode) into a command + args pair ready for `execve`.
163    ///
164    /// ## Script transport
165    ///
166    /// For `Script` mode the body is written to a `NamedTempFile` (mode 0600) and the interpreter is invoked with the file path: *not* with `-c "<inline>"`.
167    ///
168    /// ## Limits
169    ///
170    /// - The inline form is limited to `MAX_ARG_STRLEN` (128 KiB on Linux);
171    /// - The tempfile form supports scripts up to [`MAX_SCRIPT_BODY_BYTES`](solti_model::MAX_SCRIPT_BODY_BYTES) (2 MiB).
172    fn resolve_mode(mode: &solti_model::SubprocessMode) -> Result<Resolved, RunnerError> {
173        match mode {
174            solti_model::SubprocessMode::Command { command, args } => Ok(Resolved {
175                command: command.clone(),
176                args: args.clone(),
177                script_tempfile: None,
178            }),
179            solti_model::SubprocessMode::Script { runtime, args, .. } => {
180                let script = mode
181                    .decode_body()
182                    .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
183
184                let mut tmp = NamedTempFile::with_prefix("solti-script-").map_err(|e| {
185                    RunnerError::InvalidSpec(format!("failed to create script tempfile: {e}"))
186                })?;
187
188                #[cfg(unix)]
189                {
190                    use std::os::unix::fs::PermissionsExt;
191                    let perms = std::fs::Permissions::from_mode(0o600);
192                    if let Err(e) = tmp.as_file().set_permissions(perms) {
193                        return Err(RunnerError::InvalidSpec(format!(
194                            "failed to chmod 0600 script tempfile: {e}"
195                        )));
196                    }
197                }
198
199                tmp.write_all(script.as_bytes()).map_err(|e| {
200                    RunnerError::InvalidSpec(format!("failed to write script body: {e}"))
201                })?;
202                tmp.as_file()
203                    .sync_all()
204                    .or_else(|_| tmp.as_file().flush())
205                    .map_err(|e| {
206                        RunnerError::InvalidSpec(format!("failed to flush script tempfile: {e}"))
207                    })?;
208
209                let (cmd, _flag_deprecated_for_tempfile_transport) = runtime.resolve();
210                let path = tmp.path().to_string_lossy().into_owned();
211
212                let mut full_args = Vec::with_capacity(1 + args.len());
213                full_args.push(path);
214                full_args.extend(args.iter().cloned());
215
216                Ok(Resolved {
217                    command: cmd.to_string(),
218                    args: full_args,
219                    script_tempfile: Some(tmp),
220                })
221            }
222        }
223    }
224}
225
226/// Output of [`SubprocessRunner::resolve_mode`].
227#[derive(Debug)]
228struct Resolved {
229    command: String,
230    args: Vec<String>,
231
232    /// Tempfile holding the script body for `Script` mode; `None` for `Command`.
233    script_tempfile: Option<NamedTempFile>,
234}
235
236impl Runner for SubprocessRunner {
237    fn name(&self) -> &'static str {
238        self.name
239    }
240
241    fn supports(&self, spec: &TaskSpec) -> bool {
242        matches!(spec.kind(), TaskKind::Subprocess(_))
243    }
244
245    fn build_task(&self, spec: &TaskSpec, ctx: &BuildContext) -> Result<TaskRef, RunnerError> {
246        let (task_cfg, script_tempfile) = self.build_task_config(spec, ctx)?;
247
248        trace!(
249            slot = %spec.slot(),
250            task = %task_cfg.run_id,
251            "building subprocess task",
252        );
253
254        let cgroup_name = self.config.as_ref().and_then(|cfg| {
255            cfg.has_cgroups().then(|| {
256                let timestamp = SystemTime::now()
257                    .duration_since(UNIX_EPOCH)
258                    .unwrap_or(StdDuration::from_secs(0))
259                    .as_secs();
260                crate::utils::build_cgroup_name(
261                    self.name,
262                    spec.slot().as_str(),
263                    task_cfg.seq,
264                    timestamp,
265                )
266            })
267        });
268
269        let log_cfg = self
270            .config
271            .as_ref()
272            .map(|c| *c.log_config())
273            .unwrap_or_default();
274
275        let exec_ctx = Arc::new(TaskExecContext {
276            task_cfg,
277            runner_cfg: self.config.clone(),
278            cgroup_name,
279            metrics: ctx.metrics().clone(),
280            log_cfg,
281
282            _script_tempfile: script_tempfile.map(Arc::new),
283        });
284
285        let run_id = exec_ctx.task_cfg.run_id.to_string();
286        let task: TaskRef = TaskFn::arc(run_id, move |cancel: CancellationToken| {
287            let ctx = Arc::clone(&exec_ctx);
288            async move { run_subprocess(ctx, cancel).await }
289        });
290        Ok(task)
291    }
292}
293
294/// Shared context for subprocess task execution.
295struct TaskExecContext {
296    task_cfg: SubprocessTaskConfig,
297    runner_cfg: Option<Arc<SubprocessBackendConfig>>,
298    cgroup_name: Option<String>,
299    metrics: solti_runner::MetricsHandle,
300    log_cfg: LogConfig,
301
302    _script_tempfile: Option<Arc<NamedTempFile>>,
303}
304
305/// Build the OS command from task configuration.
306fn build_command(ctx: &TaskExecContext) -> Command {
307    let mut cmd = Command::new(&ctx.task_cfg.command);
308    cmd.args(&ctx.task_cfg.args);
309    if let Some(cwd) = &ctx.task_cfg.cwd {
310        cmd.current_dir(cwd);
311    }
312    cmd.envs(&ctx.task_cfg.env);
313    cmd.stdout(Stdio::piped());
314    cmd.stderr(Stdio::piped());
315
316    // Put the child into its own process group (pgid = child's pid).
317    //
318    // Why:
319    // `child.kill()` delivers SIGKILL to the single pid only.
320    // Scripts routinely fork background children (`sleep 100 &`, `nc -l &`, any daemonized helper);
321    // without a process group those children survive the cancel and become zombies orphaned to PID 1.
322    //
323    // By spawning with `setpgid(0, 0)` via `process_group(0)` we ensure that `kill(-pgid, SIGKILL)` in the cancel path reaps the whole subtree at once.
324    #[cfg(unix)]
325    cmd.process_group(0);
326
327    // Last-ditch safety:
328    // if the `run_subprocess` future is dropped before reaching either the `wait` or the `kill` branch
329    // (panic inside log streams, supervisor panic, future aborted by `tokio::select!` sibling),
330    // tokio should kill the child on Drop rather than leaving it orphaned.
331    cmd.kill_on_drop(true);
332
333    cmd
334}
335
336/// Kill of the entire process group led by `child`.
337///
338/// On Unix: `killpg(pid, SIGKILL)` via `libc::kill(-pid, SIGKILL)`
339///
340/// On other platforms: falls back to `child.kill()` (single PID only).
341async fn kill_process_group(child: &mut tokio::process::Child, run_id: &str) {
342    #[cfg(unix)]
343    {
344        if let Some(pid) = child.id() {
345            let rc = unsafe { libc::kill(-(pid as i32), libc::SIGKILL) };
346            if rc != 0 {
347                let err = std::io::Error::last_os_error();
348                if err.raw_os_error() != Some(libc::ESRCH) {
349                    warn!(
350                        task = %run_id,
351                        error = %err,
352                        "killpg failed; falling back to single-pid kill",
353                    );
354                    let _ = child.kill().await;
355                }
356            }
357        } else {
358            // No pid means the child was already reaped - nothing to kill.
359        }
360    }
361    #[cfg(not(unix))]
362    {
363        let _ = child.kill().await;
364    }
365}
366
367/// Prepare backend resources (cgroup directories) before spawn.
368fn prepare_backend(ctx: &TaskExecContext) -> Result<(), TaskError> {
369    if let Some(backend_cfg) = &ctx.runner_cfg {
370        let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
371
372        if let Err(e) = backend_cfg.prepare_cgroups(cgroup_name_ref) {
373            ctx.metrics
374                .record_runner_error(RunnerType::Subprocess, "cgroup_prepare_failed");
375            return Err(TaskError::Fatal {
376                reason: format!("failed to prepare cgroup: {e}"),
377                exit_code: None,
378            });
379        }
380    }
381    Ok(())
382}
383
384/// Apply backend configuration (rlimits, cgroup join, security) to the command.
385fn apply_backend(cmd: &mut Command, ctx: &TaskExecContext) -> Result<(), TaskError> {
386    if let Some(backend_cfg) = &ctx.runner_cfg {
387        let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
388
389        if let Err(e) = backend_cfg.apply_to_command(cmd, cgroup_name_ref) {
390            ctx.metrics
391                .record_runner_error(RunnerType::Subprocess, "backend_config_failed");
392            return Err(TaskError::Fatal {
393                reason: format!("failed to apply runner config: {e}"),
394                exit_code: None,
395            });
396        }
397    }
398    Ok(())
399}
400
401/// Evaluate subprocess exit status.
402fn evaluate_exit(
403    status: std::process::ExitStatus,
404    task_cfg: &SubprocessTaskConfig,
405) -> Result<(), TaskError> {
406    if !status.success() && task_cfg.fail_on_non_zero.is_enabled() {
407        let exit_code = status.code();
408        let reason = match exit_code {
409            Some(code) => format!("process exited with non-zero code: {code}"),
410            None => "process terminated by signal".into(),
411        };
412        Err(TaskError::Fail { reason, exit_code })
413    } else {
414        debug!(task = %task_cfg.run_id, "subprocess exited successfully");
415        Ok(())
416    }
417}
418
419/// RAII guard that calls `cleanup_cgroup` on drop.
420struct CgroupGuard<'a>(Option<&'a str>);
421
422impl Drop for CgroupGuard<'_> {
423    fn drop(&mut self) {
424        if let Some(name) = self.0 {
425            crate::utils::cleanup_cgroup(name);
426        }
427    }
428}
429
430/// Execute a subprocess task with cancellation support, metrics, and cleanup.
431async fn run_subprocess(
432    ctx: Arc<TaskExecContext>,
433    cancel: CancellationToken,
434) -> Result<(), TaskError> {
435    ctx.metrics.record_task_started(RunnerType::Subprocess);
436    let start = Instant::now();
437
438    trace!(
439        task = %ctx.task_cfg.run_id,
440        command = %ctx.task_cfg.command,
441        args = ?ctx.task_cfg.args,
442        cwd = ?ctx.task_cfg.cwd,
443        "spawning subprocess",
444    );
445
446    prepare_backend(&ctx)?;
447
448    let _cgroup_guard = CgroupGuard(ctx.cgroup_name.as_deref());
449    let mut cmd = build_command(&ctx);
450    apply_backend(&mut cmd, &ctx)?;
451
452    let mut child = match cmd.spawn() {
453        Ok(child) => child,
454        Err(e) => {
455            ctx.metrics
456                .record_runner_error(RunnerType::Subprocess, "spawn_failed");
457            return Err(TaskError::Fatal {
458                reason: format!("spawn failed: {e}"),
459                exit_code: None,
460            });
461        }
462    };
463
464    let log_cfg = ctx.log_cfg;
465
466    let stdout = child.stdout.take().ok_or_else(|| TaskError::Fatal {
467        reason: "failed to capture stdout".into(),
468        exit_code: None,
469    })?;
470    let run_id_stdout = Arc::clone(&ctx.task_cfg.run_id);
471    let stdout_task = tokio::spawn(async move {
472        log_stream(stdout, &run_id_stdout, StreamKind::Stdout, &log_cfg).await;
473    });
474
475    let stderr = child.stderr.take().ok_or_else(|| TaskError::Fatal {
476        reason: "failed to capture stderr".into(),
477        exit_code: None,
478    })?;
479    let run_id_stderr = Arc::clone(&ctx.task_cfg.run_id);
480    let stderr_task = tokio::spawn(async move {
481        log_stream(stderr, &run_id_stderr, StreamKind::Stderr, &log_cfg).await;
482    });
483
484    let result = tokio::select! {
485        biased;
486        res = child.wait() => {
487            let status = res.map_err(|e| TaskError::Fatal {
488                reason: format!("wait failed: {e}"),
489                exit_code: None,
490            })?;
491            evaluate_exit(status, &ctx.task_cfg)
492        }
493        _ = cancel.cancelled() => {
494            debug!(
495                task = %ctx.task_cfg.run_id,
496                "cancellation requested; killing subprocess group",
497            );
498            kill_process_group(&mut child, &ctx.task_cfg.run_id).await;
499            let _ = child.wait().await;
500            Err(TaskError::Canceled)
501        }
502    };
503
504    let duration_ms = start.elapsed().as_millis() as u64;
505    let outcome = match &result {
506        Ok(()) => solti_runner::TaskOutcome::Success,
507        Err(e) => classify_task_error(e),
508    };
509    ctx.metrics
510        .record_task_completed(RunnerType::Subprocess, outcome, duration_ms);
511
512    let _ = tokio::join!(stdout_task, stderr_task);
513    result
514}
515
#[cfg(test)]
mod tests {
    use super::*;

    /// Backoff policy shared by all test specs; its values are irrelevant to these tests.
    fn mk_backoff() -> solti_model::BackoffPolicy {
        solti_model::BackoffPolicy {
            jitter: solti_model::JitterPolicy::Equal,
            first_ms: 100,
            max_ms: 1000,
            factor: 2.0,
        }
    }

    /// `Command`-mode subprocess spec with no args, env, or cwd.
    fn mk_subprocess_spec(slot: &str, command: &str) -> TaskSpec {
        TaskSpec::builder(
            slot,
            TaskKind::Subprocess(SubprocessSpec {
                mode: solti_model::SubprocessMode::Command {
                    command: command.into(),
                    args: vec![],
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap()
    }

    /// Spec of a kind this runner must reject.
    fn mk_embedded_spec(slot: &str) -> TaskSpec {
        TaskSpec::builder(slot, TaskKind::Embedded, 5_000u64)
            .restart(solti_model::RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(solti_model::AdmissionPolicy::DropIfRunning)
            .build()
            .unwrap()
    }

    fn make_task_cfg() -> SubprocessTaskConfig {
        SubprocessTaskConfig {
            run_id: Arc::from("test-run-1"),
            seq: 1,
            command: "echo".into(),
            args: vec!["hello".into()],
            env: Default::default(),
            cwd: None,
            fail_on_non_zero: solti_model::Flag::default(),
        }
    }

    fn make_exec_ctx() -> TaskExecContext {
        TaskExecContext {
            task_cfg: make_task_cfg(),
            runner_cfg: None,
            cgroup_name: None,
            metrics: solti_runner::noop_metrics(),
            log_cfg: LogConfig::default(),
            _script_tempfile: None,
        }
    }

    #[test]
    fn build_command_sets_args_and_pipes() {
        let ctx = make_exec_ctx();
        let cmd = build_command(&ctx);
        let std_cmd = cmd.as_std();
        assert_eq!(std_cmd.get_program(), "echo");
        let args: Vec<_> = std_cmd.get_args().collect();
        assert_eq!(args, vec!["hello"]);
    }

    #[test]
    fn build_command_sets_env() {
        let mut ctx = make_exec_ctx();
        ctx.task_cfg.env.insert("FOO".into(), "bar".into());
        let cmd = build_command(&ctx);
        let envs: Vec<_> = cmd.as_std().get_envs().collect();
        assert!(
            envs.iter()
                .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar")))
        );
    }

    #[test]
    fn evaluate_exit_success() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("true").status().unwrap();
        let cfg = make_task_cfg();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    #[test]
    fn evaluate_exit_non_zero_with_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::enabled();
        let result = evaluate_exit(status, &cfg);
        assert!(result.is_err());
        match result.unwrap_err() {
            TaskError::Fail { reason, exit_code } => {
                assert!(reason.contains("non-zero"));
                assert_eq!(exit_code, Some(1));
            }
            other => panic!("expected TaskError::Fail, got {other:?}"),
        }
    }

    #[test]
    fn evaluate_exit_non_zero_without_fail_flag() {
        use std::process::Command as StdCommand;
        let status = StdCommand::new("false").status().unwrap();
        let mut cfg = make_task_cfg();
        cfg.fail_on_non_zero = solti_model::Flag::disabled();
        assert!(evaluate_exit(status, &cfg).is_ok());
    }

    #[test]
    fn build_task_returns_task_ref_for_subprocess() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_subprocess_spec("test-slot", "echo");
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    #[test]
    fn build_task_rejects_non_subprocess_kind() {
        let runner = SubprocessRunner::new("test-runner");
        let spec = mk_embedded_spec("test-slot");
        match runner.build_task(&spec, &BuildContext::default()) {
            Err(RunnerError::UnsupportedKind { runner, kind }) => {
                assert_eq!(runner, "test-runner");
                assert_eq!(kind, "embedded");
            }
            Err(other) => panic!("expected UnsupportedKind, got {other:?}"),
            Ok(_) => panic!("expected error, got Ok"),
        }
    }

    #[test]
    fn supports_returns_true_for_subprocess() {
        let runner = SubprocessRunner::new("test");
        assert!(runner.supports(&mk_subprocess_spec("s", "echo")));
    }

    #[test]
    fn supports_returns_false_for_embedded() {
        let runner = SubprocessRunner::new("test");
        assert!(!runner.supports(&mk_embedded_spec("s")));
    }

    #[test]
    fn build_task_returns_task_ref_for_script_mode() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let runner = SubprocessRunner::new("test-runner");
        let spec = TaskSpec::builder(
            "test-slot",
            TaskKind::Subprocess(solti_model::SubprocessSpec {
                mode: solti_model::SubprocessMode::Script {
                    runtime: solti_model::Runtime::Bash,
                    body: BASE64.encode(b"echo hello"),
                    args: vec![],
                },
                env: Default::default(),
                cwd: None,
                fail_on_non_zero: Default::default(),
            }),
            5_000u64,
        )
        .restart(solti_model::RestartPolicy::Never)
        .backoff(mk_backoff())
        .admission(solti_model::AdmissionPolicy::DropIfRunning)
        .build()
        .unwrap();
        let result = runner.build_task(&spec, &BuildContext::default());
        assert!(result.is_ok());
    }

    #[test]
    fn resolve_mode_command() {
        let mode = solti_model::SubprocessMode::Command {
            command: "ls".into(),
            args: vec!["-la".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ls");
        assert_eq!(r.args, vec!["-la"]);
        assert!(
            r.script_tempfile.is_none(),
            "Command mode needs no tempfile"
        );
    }

    #[test]
    fn resolve_mode_script_bash_uses_tempfile() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(b"echo hello"),
            args: vec!["extra".into()],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 2, "args: {:?}", r.args);
        assert_eq!(r.args[1], "extra");

        let tmp = r
            .script_tempfile
            .expect("Script mode must produce a tempfile");
        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
        let written = std::fs::read_to_string(tmp.path()).expect("tempfile readable");
        assert_eq!(written, "echo hello");

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let perms = std::fs::metadata(tmp.path()).unwrap().permissions();
            assert_eq!(
                perms.mode() & 0o777,
                0o600,
                "tempfile must be chmod 0600 (may carry secrets)"
            );
        }
    }

    #[test]
    fn resolve_mode_script_custom_ignores_flag() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Custom {
                command: "ruby".into(),
                flag: "-e".into(),
            },
            body: BASE64.encode(b"puts 'hi'"),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
        assert_eq!(r.command, "ruby");
        assert_eq!(r.args.len(), 1, "only the tempfile path, no flag");
        // Compare whole arguments. A substring check on the path would fail spuriously whenever
        // the temp directory itself contains "-e".
        assert!(
            r.args.iter().all(|a| a != "-e"),
            "flag must not leak into args"
        );
        let tmp = r
            .script_tempfile
            .as_ref()
            .expect("Script mode must produce a tempfile");
        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
    }

    /// Exercises `kill_process_group` directly. The script backgrounds a subshell, which
    /// backgrounds `sleep 60` and records that grandchild's pid in a marker file. The grandchild
    /// inherits the group created by `process_group(0)`, so killing the group must reach it even
    /// though it is not a direct child of `bash`.
    #[cfg(unix)]
    #[tokio::test]
    async fn cancel_reaps_forked_grandchildren() {
        use std::process::Stdio;
        use std::sync::atomic::{AtomicU32, Ordering};
        use std::time::Duration;
        use tokio::process::Command as TokioCommand;
        use tokio::time::timeout;

        static N: AtomicU32 = AtomicU32::new(0);
        let marker = std::env::temp_dir().join(format!(
            "solti-exec-pgid-test-{}-{}",
            std::process::id(),
            N.fetch_add(1, Ordering::SeqCst)
        ));
        let marker_str = marker.to_string_lossy().to_string();

        let script = format!(
            r#"
            (sleep 60 & echo $! > {marker}) &
            wait
            "#,
            marker = marker_str
        );

        let mut cmd = TokioCommand::new("bash");
        cmd.args(["-c", &script])
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        cmd.process_group(0);
        cmd.kill_on_drop(true);

        let mut child = cmd.spawn().expect("bash must spawn");

        // Poll for the marker file (up to ~1 s) until it holds a parseable pid.
        let grandchild_pid: i32 = {
            let mut attempts = 0;
            loop {
                if let Ok(s) = std::fs::read_to_string(&marker) {
                    if let Some(line) = s.trim().lines().next() {
                        if let Ok(pid) = line.parse::<i32>() {
                            break pid;
                        }
                    }
                }
                attempts += 1;
                if attempts > 50 {
                    panic!("grandchild never reported its pid via marker");
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        };

        // Signal 0 only probes for existence.
        let alive = unsafe { libc::kill(grandchild_pid, 0) };
        assert_eq!(alive, 0, "grandchild must be alive before cancel");

        kill_process_group(&mut child, "test").await;
        let _ = timeout(Duration::from_secs(2), child.wait()).await;

        // NOTE(review): `kill(pid, 0)` also succeeds on an unreaped zombie, so this loop relies on
        // whatever adopts the orphaned grandchild (PID 1 or a subreaper) reaping it promptly.
        let mut caught = false;
        for _ in 0..50 {
            let rc = unsafe { libc::kill(grandchild_pid, 0) };
            if rc != 0 && std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH) {
                caught = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let _ = std::fs::remove_file(&marker);

        if !caught {
            // Do not leak the sleeper past a failing test.
            unsafe { libc::kill(grandchild_pid, libc::SIGKILL) };
            panic!(
                "grandchild PID {} survived cancel — process-group kill did not reach it",
                grandchild_pid
            );
        }
    }

    #[test]
    fn resolve_mode_invalid_base64() {
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: "not-valid!!!".into(),
            args: vec![],
        };
        let err = SubprocessRunner::resolve_mode(&mode).unwrap_err();
        assert!(matches!(err, RunnerError::InvalidSpec(_)));
    }

    /// A body larger than `MAX_ARG_STRLEN` (128 KiB) must still resolve, because the tempfile
    /// transport does not pass the script on the command line.
    #[test]
    fn resolve_mode_script_accepts_large_body() {
        use base64::Engine;
        use base64::engine::general_purpose::STANDARD as BASE64;

        let payload: Vec<u8> = b"# "
            .iter()
            .copied()
            .chain(std::iter::repeat_n(b'x', 200 * 1024))
            .collect();
        let mode = solti_model::SubprocessMode::Script {
            runtime: solti_model::Runtime::Bash,
            body: BASE64.encode(&payload),
            args: vec![],
        };
        let r = SubprocessRunner::resolve_mode(&mode)
            .expect("200 KiB script must resolve via tempfile");
        assert_eq!(r.command, "bash");
        assert_eq!(r.args.len(), 1);
        let tmp = r
            .script_tempfile
            .expect("large Script must allocate a tempfile");
        let written = std::fs::read(tmp.path()).unwrap();
        assert_eq!(written.len(), payload.len());
    }
}