Skip to main content

solti_exec/subprocess/
runner.rs

1//! # Runner: subprocess execution engine.
2//!
3//! [`SubprocessRunner`] implements the [`Runner`](solti_runner::Runner) trait to execute
4//! [`TaskKind::Subprocess`](solti_model::TaskKind::Subprocess) tasks as OS processes.
5//!
6//! ## How it works
7//! ```text
8//! SubprocessRunner::build_task(spec, ctx)
9//!     │
10//!     ├──► build_task_config(spec, ctx)
11//!     │     ├──► match TaskKind::Subprocess(SubprocessSpec { .. })
12//!     │     ├──► resolve_mode(mode) → Resolved { command, args, script_tempfile? }
13//!     │     │     ├──► Command { command, args } → clone, tempfile = None
14//!     │     │     └──► Script { runtime, body }  → decode base64
15//!     │     │                                    → write to NamedTempFile (0600)
16//!     │     │                                    → args = [path, extra...]
17//!     │     │                                    → tempfile = Some(tmp)
18//!     │     ├──► merge_env(task_env, runner_env) → BTreeMap
19//!     │     └──► SubprocessTaskConfig { run_id, seq, command, args, env, cwd, fail_on_non_zero }
20//!     │
21//!     ├──► derive cgroup name (only if the backend has cgroups configured)
22//!     │
23//!     ├──► build Arc<TaskExecContext>
24//!     │     └──► { task_cfg, runner_cfg, cgroup_name, metrics, log_cfg, script_tempfile? }
25//!     │          tempfile kept alive until context drops → Drop = unlink
26//!     │
27//!     └──► return TaskFn closure → run_subprocess(ctx, cancel)
28//! ```
29//!
30//! ## Subprocess execution lifecycle
31//! ```text
32//! run_subprocess(ctx, cancel)
33//!     │
34//!     ├──► metrics.record_task_started()
35//!     ├──► prepare_backend() → create cgroup dirs
36//!     ├──► build_command() → Command with:
37//!     │      - args, env, cwd, piped stdout/stderr
38//!     │      - process_group(0)   (Unix: new pgid for kill-whole-subtree)
39//!     │      - kill_on_drop(true) (tokio SIGKILLs PID if future is dropped)
40//!     ├──► apply_backend() → install pre_exec hooks (rlimits, cgroup join, security)
41//!     ├──► cmd.spawn()
42//!     │
43//!     ├──► tokio::spawn(log_stream(stdout, Stdout))
44//!     ├──► tokio::spawn(log_stream(stderr, Stderr))
45//!     │
46//!     ├──► select! { biased; ... }
47//!     │     ├──► child.wait() → evaluate_exit(status)
48//!     │     └──► cancel.cancelled() → kill_process_group() (killpg -SIGKILL pgid)
49//!     │                             → child.wait() to reap zombie
50//!     │
51//!     ├──► metrics.record_task_completed(outcome, duration)
52//!     ├──► join!(stdout_task, stderr_task)
53//!     ├──► cleanup_cgroup() (if configured)
54//!     └──► return result
55//! ```
56
57use std::{
58    io::Write as _,
59    process::Stdio,
60    sync::{
61        Arc,
62        atomic::{AtomicU32, Ordering},
63    },
64    time::{Duration as StdDuration, Instant, SystemTime, UNIX_EPOCH},
65};
66
67use taskvisor::{TaskError, TaskFn, TaskRef};
68use tempfile::NamedTempFile;
69use tokio::process::Command;
70use tokio_util::sync::CancellationToken;
71use tracing::{debug, trace, warn};
72
73use solti_model::{SubprocessSpec, TaskId, TaskKind, TaskSpec, merge_env};
74use solti_runner::{
75    BuildContext, OutputRegistry, Runner, RunnerError, RunnerErrorKind, RunnerType,
76};
77
78use crate::metrics::classify_task_error;
79use crate::subprocess::{
80    backend::SubprocessBackendConfig,
81    logger::{LogConfig, StreamKind, log_stream},
82    task::SubprocessTaskConfig,
83};
84
85/// Runner that executes `TaskKind::Subprocess` as OS subprocesses.
86///
87/// ## Also
88///
89/// - [`SubprocessBackendConfig`] rlimits, cgroups, security applied to spawned processes.
90/// - [`SubprocessTaskConfig`](super::SubprocessTaskConfig) resolved per-task config.
91/// - [`register_subprocess_runner`](super::register_subprocess_runner) registration helper.
92/// - [`solti_runner::Runner`] trait this type implements.
93pub struct SubprocessRunner {
94    /// Runner name.
95    name: &'static str,
96    /// Backend configuration applied to all tasks spawned by this runner.
97    config: Option<Arc<SubprocessBackendConfig>>,
98}
99
100impl SubprocessRunner {
101    /// Create a new subprocess runner without backend configuration.
102    pub fn new(name: &'static str) -> Self {
103        Self { name, config: None }
104    }
105
106    /// Create a subprocess runner with explicit backend configuration.
107    ///
108    /// # Errors
109    ///
110    /// Returns `ExecError` if the configuration is invalid.
111    pub fn with_config(
112        name: &'static str,
113        config: SubprocessBackendConfig,
114    ) -> Result<Self, crate::ExecError> {
115        config.validate()?;
116        Ok(Self {
117            name,
118            config: Some(Arc::new(config)),
119        })
120    }
121
122    /// Build task configuration from `TaskSpec`.
123    ///
124    /// Returns the fully resolved config.
125    fn build_task_config(
126        &self,
127        spec: &TaskSpec,
128        ctx: &BuildContext,
129    ) -> Result<(SubprocessTaskConfig, Option<NamedTempFile>), RunnerError> {
130        let slot = spec.slot();
131        let (cfg, script_tempfile) = match spec.kind() {
132            TaskKind::Subprocess(SubprocessSpec {
133                mode,
134                env,
135                cwd,
136                fail_on_non_zero,
137            }) => {
138                let Resolved {
139                    command,
140                    args,
141                    script_tempfile,
142                } = Self::resolve_mode(mode)?;
143                let run_id = self.build_run_id(slot.as_str());
144                let cfg = SubprocessTaskConfig {
145                    seq: run_id.seq(),
146                    run_id: Arc::from(run_id.into_name()),
147                    fail_on_non_zero: *fail_on_non_zero,
148                    env: merge_env(env, ctx.env()),
149                    cwd: cwd.clone(),
150                    command,
151                    args,
152                };
153                (cfg, script_tempfile)
154            }
155            other => {
156                return Err(RunnerError::UnsupportedKind {
157                    runner: self.name,
158                    kind: other.kind().to_string(),
159                });
160            }
161        };
162        cfg.validate()
163            .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
164        Ok((cfg, script_tempfile))
165    }
166
167    /// Resolve [`SubprocessMode`](solti_model::SubprocessMode) into a command + args pair ready for `execve`.
168    ///
169    /// ## Script transport
170    ///
171    /// For `Script` mode the body is written to a `NamedTempFile` (mode 0600) and the interpreter is invoked with the file path: *not* with `-c "<inline>"`.
172    ///
173    /// ## Limits
174    ///
175    /// - The inline form is limited to `MAX_ARG_STRLEN` (128 KiB on Linux);
176    /// - The tempfile form supports scripts up to [`MAX_SCRIPT_BODY_BYTES`](solti_model::MAX_SCRIPT_BODY_BYTES) (2 MiB).
177    fn resolve_mode(mode: &solti_model::SubprocessMode) -> Result<Resolved, RunnerError> {
178        match mode {
179            solti_model::SubprocessMode::Command { command, args } => Ok(Resolved {
180                command: command.clone(),
181                args: args.clone(),
182                script_tempfile: None,
183            }),
184            solti_model::SubprocessMode::Script { runtime, args, .. } => {
185                let script = mode
186                    .decode_body()
187                    .map_err(|e| RunnerError::InvalidSpec(e.to_string()))?;
188
189                let mut tmp = NamedTempFile::with_prefix("solti-script-").map_err(|e| {
190                    RunnerError::InvalidSpec(format!("failed to create script tempfile: {e}"))
191                })?;
192
193                #[cfg(unix)]
194                {
195                    use std::os::unix::fs::PermissionsExt;
196                    let perms = std::fs::Permissions::from_mode(0o600);
197                    if let Err(e) = tmp.as_file().set_permissions(perms) {
198                        return Err(RunnerError::InvalidSpec(format!(
199                            "failed to chmod 0600 script tempfile: {e}"
200                        )));
201                    }
202                }
203
204                tmp.write_all(script.as_bytes()).map_err(|e| {
205                    RunnerError::InvalidSpec(format!("failed to write script body: {e}"))
206                })?;
207                tmp.as_file()
208                    .sync_all()
209                    .or_else(|_| tmp.as_file().flush())
210                    .map_err(|e| {
211                        RunnerError::InvalidSpec(format!("failed to flush script tempfile: {e}"))
212                    })?;
213
214                let (cmd, _flag_deprecated_for_tempfile_transport) = runtime.resolve();
215                let path = tmp.path().to_string_lossy().into_owned();
216
217                let mut full_args = Vec::with_capacity(1 + args.len());
218                full_args.push(path);
219                full_args.extend(args.iter().cloned());
220
221                Ok(Resolved {
222                    command: cmd.to_string(),
223                    args: full_args,
224                    script_tempfile: Some(tmp),
225                })
226            }
227        }
228    }
229}
230
231/// Output of [`SubprocessRunner::resolve_mode`].
232#[derive(Debug)]
233struct Resolved {
234    command: String,
235    args: Vec<String>,
236
237    /// Tempfile holding the script body for `Script` mode; `None` for `Command`.
238    script_tempfile: Option<NamedTempFile>,
239}
240
241impl Runner for SubprocessRunner {
242    fn name(&self) -> &'static str {
243        self.name
244    }
245
246    fn supports(&self, spec: &TaskSpec) -> bool {
247        matches!(spec.kind(), TaskKind::Subprocess(_))
248    }
249
250    fn build_task(&self, spec: &TaskSpec, ctx: &BuildContext) -> Result<TaskRef, RunnerError> {
251        let (task_cfg, script_tempfile) = self.build_task_config(spec, ctx)?;
252
253        trace!(
254            slot = %spec.slot(),
255            task = %task_cfg.run_id,
256            "building subprocess task",
257        );
258
259        let cgroup_name = self.config.as_ref().and_then(|cfg| {
260            cfg.has_cgroups().then(|| {
261                let timestamp = SystemTime::now()
262                    .duration_since(UNIX_EPOCH)
263                    .unwrap_or(StdDuration::from_secs(0))
264                    .as_secs();
265                crate::utils::build_cgroup_name(
266                    self.name,
267                    spec.slot().as_str(),
268                    task_cfg.seq,
269                    timestamp,
270                )
271            })
272        });
273
274        let log_cfg = self
275            .config
276            .as_ref()
277            .map(|c| *c.log_config())
278            .unwrap_or_default();
279
280        let task_id = TaskId::from(Arc::clone(&task_cfg.run_id));
281        ctx.output_registry().ensure_channel(task_id);
282
283        let exec_ctx = Arc::new(TaskExecContext {
284            task_cfg,
285            runner_cfg: self.config.clone(),
286            cgroup_name,
287            metrics: ctx.metrics().clone(),
288            log_cfg,
289            output_registry: Arc::clone(ctx.output_registry()),
290            attempt: AtomicU32::new(0),
291
292            _script_tempfile: script_tempfile.map(Arc::new),
293        });
294
295        let run_id = exec_ctx.task_cfg.run_id.to_string();
296        let task: TaskRef = TaskFn::arc(run_id, move |cancel: CancellationToken| {
297            let ctx = Arc::clone(&exec_ctx);
298            async move { run_subprocess(ctx, cancel).await }
299        });
300        Ok(task)
301    }
302}
303
304/// Shared context for subprocess task execution.
305struct TaskExecContext {
306    runner_cfg: Option<Arc<SubprocessBackendConfig>>,
307    metrics: solti_runner::MetricsHandle,
308    output_registry: Arc<OutputRegistry>,
309    task_cfg: SubprocessTaskConfig,
310    cgroup_name: Option<String>,
311    log_cfg: LogConfig,
312    attempt: AtomicU32,
313
314    _script_tempfile: Option<Arc<NamedTempFile>>,
315}
316
317/// Build the OS command from task configuration.
318fn build_command(ctx: &TaskExecContext) -> Command {
319    let mut cmd = Command::new(&ctx.task_cfg.command);
320    cmd.args(&ctx.task_cfg.args);
321    if let Some(cwd) = &ctx.task_cfg.cwd {
322        cmd.current_dir(cwd);
323    }
324    cmd.envs(&ctx.task_cfg.env);
325    cmd.stdout(Stdio::piped());
326    cmd.stderr(Stdio::piped());
327
328    // Put the child into its own process group (pgid = child's pid).
329    //
330    // Why:
331    // `child.kill()` delivers SIGKILL to the single pid only.
332    // Scripts routinely fork background children (`sleep 100 &`, `nc -l &`, any daemonized helper);
333    // without a process group those children survive the cancel and become zombies orphaned to PID 1.
334    //
335    // By spawning with `setpgid(0, 0)` via `process_group(0)` we ensure that `kill(-pgid, SIGKILL)` in the cancel path reaps the whole subtree at once.
336    #[cfg(unix)]
337    cmd.process_group(0);
338
339    // Last-ditch safety:
340    // if the `run_subprocess` future is dropped before reaching either the `wait` or the `kill` branch
341    // (panic inside log streams, supervisor panic, future aborted by `tokio::select!` sibling),
342    // tokio should kill the child on Drop rather than leaving it orphaned.
343    cmd.kill_on_drop(true);
344
345    cmd
346}
347
348/// Kill of the entire process group led by `child`.
349///
350/// On Unix: `killpg(pid, SIGKILL)` via `libc::kill(-pid, SIGKILL)`
351///
352/// On other platforms: falls back to `child.kill()` (single PID only).
353async fn kill_process_group(child: &mut tokio::process::Child, run_id: &str) {
354    #[cfg(unix)]
355    {
356        if let Some(pid) = child.id() {
357            let rc = unsafe { libc::kill(-(pid as i32), libc::SIGKILL) };
358            if rc != 0 {
359                let err = std::io::Error::last_os_error();
360                if err.raw_os_error() != Some(libc::ESRCH) {
361                    warn!(
362                        task = %run_id,
363                        error = %err,
364                        "killpg failed; falling back to single-pid kill",
365                    );
366                    let _ = child.kill().await;
367                }
368            }
369        } else {
370            // No pid means the child was already reaped - nothing to kill.
371        }
372    }
373    #[cfg(not(unix))]
374    {
375        let _ = child.kill().await;
376    }
377}
378
379/// Prepare backend resources (cgroup directories) before spawn.
380fn prepare_backend(ctx: &TaskExecContext) -> Result<(), TaskError> {
381    if let Some(backend_cfg) = &ctx.runner_cfg {
382        let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
383
384        if let Err(e) = backend_cfg.prepare_cgroups(cgroup_name_ref) {
385            ctx.metrics
386                .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::CgroupPrepareFailed);
387            return Err(TaskError::Fatal {
388                reason: format!("failed to prepare cgroup: {e}"),
389                exit_code: None,
390            });
391        }
392    }
393    Ok(())
394}
395
396/// Apply backend configuration (rlimits, cgroup join, security) to the command.
397fn apply_backend(cmd: &mut Command, ctx: &TaskExecContext) -> Result<(), TaskError> {
398    if let Some(backend_cfg) = &ctx.runner_cfg {
399        let cgroup_name_ref = ctx.cgroup_name.as_deref().unwrap_or(&ctx.task_cfg.run_id);
400
401        if let Err(e) = backend_cfg.apply_to_command(cmd, cgroup_name_ref) {
402            ctx.metrics
403                .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::BackendConfigFailed);
404            return Err(TaskError::Fatal {
405                reason: format!("failed to apply runner config: {e}"),
406                exit_code: None,
407            });
408        }
409    }
410    Ok(())
411}
412
413/// Evaluate subprocess exit status.
414fn evaluate_exit(
415    status: std::process::ExitStatus,
416    task_cfg: &SubprocessTaskConfig,
417) -> Result<(), TaskError> {
418    if !status.success() && task_cfg.fail_on_non_zero.is_enabled() {
419        let exit_code = status.code();
420        let reason = match exit_code {
421            Some(code) => format!("process exited with non-zero code: {code}"),
422            None => "process terminated by signal".into(),
423        };
424        Err(TaskError::Fail { reason, exit_code })
425    } else {
426        debug!(task = %task_cfg.run_id, "subprocess exited successfully");
427        Ok(())
428    }
429}
430
431/// RAII guard that calls `cleanup_cgroup` on drop.
432struct CgroupGuard<'a>(Option<&'a str>);
433
434impl Drop for CgroupGuard<'_> {
435    fn drop(&mut self) {
436        if let Some(name) = self.0 {
437            crate::utils::cleanup_cgroup(name);
438        }
439    }
440}
441
442/// Execute a subprocess task with cancellation support, metrics, and cleanup.
443async fn run_subprocess(
444    ctx: Arc<TaskExecContext>,
445    cancel: CancellationToken,
446) -> Result<(), TaskError> {
447    ctx.metrics.record_task_started(RunnerType::Subprocess);
448    let start = Instant::now();
449
450    trace!(
451        task = %ctx.task_cfg.run_id,
452        command = %ctx.task_cfg.command,
453        args = ?ctx.task_cfg.args,
454        cwd = ?ctx.task_cfg.cwd,
455        "spawning subprocess",
456    );
457
458    prepare_backend(&ctx)?;
459
460    let _cgroup_guard = CgroupGuard(ctx.cgroup_name.as_deref());
461    let mut cmd = build_command(&ctx);
462    apply_backend(&mut cmd, &ctx)?;
463
464    let mut child = match cmd.spawn() {
465        Ok(child) => child,
466        Err(e) => {
467            ctx.metrics
468                .record_runner_error(RunnerType::Subprocess, RunnerErrorKind::SpawnFailed);
469            return Err(TaskError::Fatal {
470                reason: format!("spawn failed: {e}"),
471                exit_code: None,
472            });
473        }
474    };
475
476    let log_cfg = ctx.log_cfg;
477
478    let attempt = ctx.attempt.fetch_add(1, Ordering::Relaxed) + 1;
479    let task_id = TaskId::from(Arc::clone(&ctx.task_cfg.run_id));
480    let sink = ctx.output_registry.sink_for(task_id, attempt);
481
482    let stdout = child.stdout.take().ok_or_else(|| TaskError::Fatal {
483        reason: "failed to capture stdout".into(),
484        exit_code: None,
485    })?;
486    let run_id_stdout = Arc::clone(&ctx.task_cfg.run_id);
487    let sink_stdout = sink.clone();
488    let stdout_task = tokio::spawn(async move {
489        log_stream(
490            stdout,
491            &run_id_stdout,
492            StreamKind::Stdout,
493            &log_cfg,
494            Some(&sink_stdout),
495        )
496        .await;
497    });
498
499    let stderr = child.stderr.take().ok_or_else(|| TaskError::Fatal {
500        reason: "failed to capture stderr".into(),
501        exit_code: None,
502    })?;
503    let run_id_stderr = Arc::clone(&ctx.task_cfg.run_id);
504    let sink_stderr = sink.clone();
505    let stderr_task = tokio::spawn(async move {
506        log_stream(
507            stderr,
508            &run_id_stderr,
509            StreamKind::Stderr,
510            &log_cfg,
511            Some(&sink_stderr),
512        )
513        .await;
514    });
515
516    let result = tokio::select! {
517        biased;
518        res = child.wait() => {
519            let status = res.map_err(|e| TaskError::Fatal {
520                reason: format!("wait failed: {e}"),
521                exit_code: None,
522            })?;
523            evaluate_exit(status, &ctx.task_cfg)
524        }
525        _ = cancel.cancelled() => {
526            debug!(
527                task = %ctx.task_cfg.run_id,
528                "cancellation requested; killing subprocess group",
529            );
530            kill_process_group(&mut child, &ctx.task_cfg.run_id).await;
531            let _ = child.wait().await;
532            Err(TaskError::Canceled)
533        }
534    };
535
536    let duration_ms = start.elapsed().as_millis() as u64;
537    let outcome = match &result {
538        Ok(()) => solti_runner::TaskOutcome::Success,
539        Err(e) => classify_task_error(e),
540    };
541    ctx.metrics
542        .record_task_completed(RunnerType::Subprocess, outcome, duration_ms);
543
544    let _ = tokio::join!(stdout_task, stderr_task);
545    result
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551
552    fn mk_backoff() -> solti_model::BackoffPolicy {
553        solti_model::BackoffPolicy {
554            jitter: solti_model::JitterPolicy::Equal,
555            first_ms: 100,
556            max_ms: 1000,
557            factor: 2.0,
558        }
559    }
560
561    fn mk_subprocess_spec(slot: &str, command: &str) -> TaskSpec {
562        mk_subprocess_spec_with_args(slot, command, &[])
563    }
564
565    fn mk_subprocess_spec_with_args(slot: &str, command: &str, args: &[&str]) -> TaskSpec {
566        TaskSpec::builder(
567            slot,
568            TaskKind::Subprocess(SubprocessSpec {
569                mode: solti_model::SubprocessMode::Command {
570                    command: command.into(),
571                    args: args.iter().map(|s| s.to_string()).collect(),
572                },
573                env: Default::default(),
574                cwd: None,
575                fail_on_non_zero: Default::default(),
576            }),
577            5_000u64,
578        )
579        .restart(solti_model::RestartPolicy::Never)
580        .backoff(mk_backoff())
581        .admission(solti_model::AdmissionPolicy::DropIfRunning)
582        .build()
583        .unwrap()
584    }
585
586    fn mk_embedded_spec(slot: &str) -> TaskSpec {
587        TaskSpec::builder(slot, TaskKind::Embedded, 5_000u64)
588            .restart(solti_model::RestartPolicy::Never)
589            .backoff(mk_backoff())
590            .admission(solti_model::AdmissionPolicy::DropIfRunning)
591            .build()
592            .unwrap()
593    }
594
595    fn make_task_cfg() -> SubprocessTaskConfig {
596        SubprocessTaskConfig {
597            run_id: Arc::from("test-run-1"),
598            seq: 1,
599            command: "echo".into(),
600            args: vec!["hello".into()],
601            env: Default::default(),
602            cwd: None,
603            fail_on_non_zero: solti_model::Flag::default(),
604        }
605    }
606
607    fn make_exec_ctx() -> TaskExecContext {
608        TaskExecContext {
609            task_cfg: make_task_cfg(),
610            runner_cfg: None,
611            cgroup_name: None,
612            metrics: solti_runner::noop_metrics(),
613            log_cfg: LogConfig::default(),
614            output_registry: Arc::new(OutputRegistry::default()),
615            attempt: AtomicU32::new(0),
616            _script_tempfile: None,
617        }
618    }
619
620    #[test]
621    fn build_command_sets_args_and_pipes() {
622        let ctx = make_exec_ctx();
623        let cmd = build_command(&ctx);
624        let std_cmd = cmd.as_std();
625        assert_eq!(std_cmd.get_program(), "echo");
626        let args: Vec<_> = std_cmd.get_args().collect();
627        assert_eq!(args, vec!["hello"]);
628    }
629
630    #[test]
631    fn build_command_sets_env() {
632        let mut ctx = make_exec_ctx();
633        ctx.task_cfg.env.insert("FOO".into(), "bar".into());
634        let cmd = build_command(&ctx);
635        let envs: Vec<_> = cmd.as_std().get_envs().collect();
636        assert!(
637            envs.iter()
638                .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar")))
639        );
640    }
641
642    #[test]
643    fn evaluate_exit_success() {
644        use std::process::Command as StdCommand;
645        let status = StdCommand::new("true").status().unwrap();
646        let cfg = make_task_cfg();
647        assert!(evaluate_exit(status, &cfg).is_ok());
648    }
649
650    #[test]
651    fn evaluate_exit_non_zero_with_fail_flag() {
652        use std::process::Command as StdCommand;
653        let status = StdCommand::new("false").status().unwrap();
654        let mut cfg = make_task_cfg();
655        cfg.fail_on_non_zero = solti_model::Flag::enabled();
656        let result = evaluate_exit(status, &cfg);
657        assert!(result.is_err());
658        match result.unwrap_err() {
659            TaskError::Fail { reason, exit_code } => {
660                assert!(reason.contains("non-zero"));
661                assert_eq!(exit_code, Some(1));
662            }
663            other => panic!("expected TaskError::Fail, got {other:?}"),
664        }
665    }
666
667    #[test]
668    fn evaluate_exit_non_zero_without_fail_flag() {
669        use std::process::Command as StdCommand;
670        let status = StdCommand::new("false").status().unwrap();
671        let mut cfg = make_task_cfg();
672        cfg.fail_on_non_zero = solti_model::Flag::disabled();
673        assert!(evaluate_exit(status, &cfg).is_ok());
674    }
675
676    #[test]
677    fn build_task_returns_task_ref_for_subprocess() {
678        let runner = SubprocessRunner::new("test-runner");
679        let spec = mk_subprocess_spec("test-slot", "echo");
680        let result = runner.build_task(&spec, &BuildContext::default());
681        assert!(result.is_ok());
682    }
683
684    #[test]
685    fn build_task_rejects_non_subprocess_kind() {
686        let runner = SubprocessRunner::new("test-runner");
687        let spec = mk_embedded_spec("test-slot");
688        match runner.build_task(&spec, &BuildContext::default()) {
689            Err(RunnerError::UnsupportedKind { runner, kind }) => {
690                assert_eq!(runner, "test-runner");
691                assert_eq!(kind, "embedded");
692            }
693            Err(other) => panic!("expected UnsupportedKind, got {other:?}"),
694            Ok(_) => panic!("expected error, got Ok"),
695        }
696    }
697
698    #[test]
699    fn supports_returns_true_for_subprocess() {
700        let runner = SubprocessRunner::new("test");
701        assert!(runner.supports(&mk_subprocess_spec("s", "echo")));
702    }
703
704    #[test]
705    fn supports_returns_false_for_embedded() {
706        let runner = SubprocessRunner::new("test");
707        assert!(!runner.supports(&mk_embedded_spec("s")));
708    }
709
710    #[test]
711    fn build_task_returns_task_ref_for_script_mode() {
712        use base64::Engine;
713        use base64::engine::general_purpose::STANDARD as BASE64;
714
715        let runner = SubprocessRunner::new("test-runner");
716        let spec = TaskSpec::builder(
717            "test-slot",
718            TaskKind::Subprocess(solti_model::SubprocessSpec {
719                mode: solti_model::SubprocessMode::Script {
720                    runtime: solti_model::Runtime::Bash,
721                    body: BASE64.encode(b"echo hello"),
722                    args: vec![],
723                },
724                env: Default::default(),
725                cwd: None,
726                fail_on_non_zero: Default::default(),
727            }),
728            5_000u64,
729        )
730        .restart(solti_model::RestartPolicy::Never)
731        .backoff(mk_backoff())
732        .admission(solti_model::AdmissionPolicy::DropIfRunning)
733        .build()
734        .unwrap();
735        let result = runner.build_task(&spec, &BuildContext::default());
736        assert!(result.is_ok());
737    }
738
739    #[test]
740    fn resolve_mode_command() {
741        let mode = solti_model::SubprocessMode::Command {
742            command: "ls".into(),
743            args: vec!["-la".into()],
744        };
745        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
746        assert_eq!(r.command, "ls");
747        assert_eq!(r.args, vec!["-la"]);
748        assert!(
749            r.script_tempfile.is_none(),
750            "Command mode needs no tempfile"
751        );
752    }
753
754    #[test]
755    fn resolve_mode_script_bash_uses_tempfile() {
756        use base64::Engine;
757        use base64::engine::general_purpose::STANDARD as BASE64;
758
759        let mode = solti_model::SubprocessMode::Script {
760            runtime: solti_model::Runtime::Bash,
761            body: BASE64.encode(b"echo hello"),
762            args: vec!["extra".into()],
763        };
764        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
765        assert_eq!(r.command, "bash");
766        assert_eq!(r.args.len(), 2, "args: {:?}", r.args);
767        assert_eq!(r.args[1], "extra");
768
769        let tmp = r
770            .script_tempfile
771            .expect("Script mode must produce a tempfile");
772        assert_eq!(tmp.path().to_string_lossy(), r.args[0]);
773        let written = std::fs::read_to_string(tmp.path()).expect("tempfile readable");
774        assert_eq!(written, "echo hello");
775
776        #[cfg(unix)]
777        {
778            use std::os::unix::fs::PermissionsExt;
779            let perms = std::fs::metadata(tmp.path()).unwrap().permissions();
780            assert_eq!(
781                perms.mode() & 0o777,
782                0o600,
783                "tempfile must be chmod 0600 (may carry secrets)"
784            );
785        }
786    }
787
788    #[test]
789    fn resolve_mode_script_custom_ignores_flag() {
790        use base64::Engine;
791        use base64::engine::general_purpose::STANDARD as BASE64;
792
793        let mode = solti_model::SubprocessMode::Script {
794            runtime: solti_model::Runtime::Custom {
795                command: "ruby".into(),
796                flag: "-e".into(),
797            },
798            body: BASE64.encode(b"puts 'hi'"),
799            args: vec![],
800        };
801        let r = SubprocessRunner::resolve_mode(&mode).unwrap();
802        assert_eq!(r.command, "ruby");
803        assert_eq!(r.args.len(), 1, "only the tempfile path, no flag");
804        assert!(!r.args[0].contains("-e"), "flag must not leak into args");
805        assert!(r.script_tempfile.is_some());
806    }
807
808    #[cfg(unix)]
809    #[tokio::test]
810    async fn cancel_reaps_forked_grandchildren() {
811        use std::process::Stdio;
812        use std::sync::atomic::{AtomicU32, Ordering};
813        use std::time::Duration;
814        use tokio::process::Command as TokioCommand;
815        use tokio::time::timeout;
816
817        static N: AtomicU32 = AtomicU32::new(0);
818        let marker = std::env::temp_dir().join(format!(
819            "solti-exec-pgid-test-{}-{}",
820            std::process::id(),
821            N.fetch_add(1, Ordering::SeqCst)
822        ));
823        let marker_str = marker.to_string_lossy().to_string();
824
825        let script = format!(
826            r#"
827            (sleep 60 & echo $! > {marker}) &
828            wait
829            "#,
830            marker = marker_str
831        );
832
833        let mut cmd = TokioCommand::new("bash");
834        cmd.args(["-c", &script])
835            .stdout(Stdio::null())
836            .stderr(Stdio::null());
837        cmd.process_group(0);
838        cmd.kill_on_drop(true);
839
840        let mut child = cmd.spawn().expect("bash must spawn");
841
842        let grandchild_pid: i32 = {
843            let mut attempts = 0;
844            loop {
845                if let Ok(s) = std::fs::read_to_string(&marker) {
846                    if let Some(line) = s.trim().lines().next() {
847                        if let Ok(pid) = line.parse::<i32>() {
848                            break pid;
849                        }
850                    }
851                }
852                attempts += 1;
853                if attempts > 50 {
854                    panic!("grandchild never reported its pid via marker");
855                }
856                tokio::time::sleep(Duration::from_millis(20)).await;
857            }
858        };
859
860        let alive = unsafe { libc::kill(grandchild_pid, 0) };
861        assert_eq!(alive, 0, "grandchild must be alive before cancel");
862
863        kill_process_group(&mut child, "test").await;
864        let _ = timeout(Duration::from_secs(2), child.wait()).await;
865
866        let mut caught = false;
867        for _ in 0..50 {
868            let rc = unsafe { libc::kill(grandchild_pid, 0) };
869            if rc != 0 && std::io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH) {
870                caught = true;
871                break;
872            }
873            tokio::time::sleep(Duration::from_millis(20)).await;
874        }
875
876        let _ = std::fs::remove_file(&marker);
877
878        if !caught {
879            unsafe { libc::kill(grandchild_pid, libc::SIGKILL) };
880            panic!(
881                "grandchild PID {} survived cancel — process-group kill did not reach it",
882                grandchild_pid
883            );
884        }
885    }
886
887    #[test]
888    fn resolve_mode_invalid_base64() {
889        let mode = solti_model::SubprocessMode::Script {
890            runtime: solti_model::Runtime::Bash,
891            body: "not-valid!!!".into(),
892            args: vec![],
893        };
894        let err = SubprocessRunner::resolve_mode(&mode).unwrap_err();
895        assert!(matches!(err, RunnerError::InvalidSpec(_)));
896    }
897
898    #[tokio::test]
899    async fn subprocess_streams_stdout_into_output_registry() {
900        use solti_model::{OutputEvent, TaskId};
901        use solti_runner::OutputRegistry;
902        use std::sync::Arc;
903        use std::time::Duration;
904        use tokio_util::sync::CancellationToken;
905
906        let registry = Arc::new(OutputRegistry::new(64));
907        let ctx = BuildContext::default().with_output_registry(registry.clone());
908
909        let runner = SubprocessRunner::new("test-runner");
910        let spec = mk_subprocess_spec_with_args("echo-slot", "echo", &["hello-stream"]);
911        let task_ref = runner.build_task(&spec, &ctx).unwrap();
912        let task_id = TaskId::from(task_ref.name());
913
914        let mut rx = registry
915            .subscribe(&task_id)
916            .expect("registry must have channel after build_task");
917
918        let cancel = CancellationToken::new();
919        task_ref.spawn(cancel).await.expect("echo must succeed");
920
921        let mut found_line = None;
922        for _ in 0..100 {
923            if let Ok(OutputEvent::Chunk(c)) = rx.try_recv() {
924                let line_text = std::str::from_utf8(&c.line).unwrap_or_default();
925                if line_text.contains("hello-stream") {
926                    found_line = Some(c);
927                    break;
928                }
929            } else {
930                tokio::time::sleep(Duration::from_millis(10)).await;
931            }
932        }
933
934        let chunk = found_line.expect("expected to receive 'hello-stream' line");
935        assert_eq!(chunk.attempt, 1);
936        assert_eq!(chunk.stream, solti_model::StreamKind::Stdout);
937    }
938
939    #[tokio::test]
940    async fn subprocess_attempt_counter_increments_on_each_spawn() {
941        use solti_model::{OutputEvent, TaskId};
942        use solti_runner::OutputRegistry;
943        use std::sync::Arc;
944        use std::time::Duration;
945        use tokio_util::sync::CancellationToken;
946
947        let registry = Arc::new(OutputRegistry::new(64));
948        let ctx = BuildContext::default().with_output_registry(registry.clone());
949        let runner = SubprocessRunner::new("test-runner");
950        let spec = mk_subprocess_spec_with_args("attempts-slot", "echo", &["x"]);
951        let task_ref = runner.build_task(&spec, &ctx).unwrap();
952        let task_id = TaskId::from(task_ref.name());
953        let mut rx = registry.subscribe(&task_id).unwrap();
954
955        task_ref.spawn(CancellationToken::new()).await.unwrap();
956        task_ref.spawn(CancellationToken::new()).await.unwrap();
957
958        let mut attempts = std::collections::BTreeSet::new();
959        for _ in 0..200 {
960            match rx.try_recv() {
961                Ok(OutputEvent::Chunk(c)) => {
962                    attempts.insert(c.attempt);
963                }
964                Ok(_) => {}
965                Err(_) => tokio::time::sleep(Duration::from_millis(10)).await,
966            }
967        }
968        assert!(attempts.contains(&1), "attempt 1 missing: {attempts:?}");
969        assert!(attempts.contains(&2), "attempt 2 missing: {attempts:?}");
970    }
971
972    #[test]
973    fn resolve_mode_script_accepts_large_body() {
974        use base64::Engine;
975        use base64::engine::general_purpose::STANDARD as BASE64;
976
977        let payload: Vec<u8> = b"# "
978            .iter()
979            .copied()
980            .chain(std::iter::repeat_n(b'x', 200 * 1024))
981            .collect();
982        let mode = solti_model::SubprocessMode::Script {
983            runtime: solti_model::Runtime::Bash,
984            body: BASE64.encode(&payload),
985            args: vec![],
986        };
987        let r = SubprocessRunner::resolve_mode(&mode)
988            .expect("200 KiB script must resolve via tempfile");
989        assert_eq!(r.command, "bash");
990        assert_eq!(r.args.len(), 1);
991        let tmp = r
992            .script_tempfile
993            .expect("large Script must allocate a tempfile");
994        let written = std::fs::read(tmp.path()).unwrap();
995        assert_eq!(written.len(), payload.len());
996    }
997}