Skip to main content

mii_http/
exec.rs

//! Exec runtime: render the parsed Exec mini-language into shell text and run
//! it through `/bin/sh`.
//!
//! The pipeline AST itself is produced by [`crate::parse::exec`]; this module
//! is concerned with turning typed request values into shell-safe words.
//!
//! Semantics:
//!
//! - A [`ValueRef`] is resolved against an [`ExecContext`] (query/path/header/
//!   var maps and a [`BodyValue`]).
//! - A `Text` token is always emitted as one argv element. Missing
//!   interpolations render as the empty string.
//! - A `[..]` `Group` token is used for shell words that contain interpolation.
//!   It is emitted only if every interpolation resolves; otherwise the whole
//!   group is omitted.
//! - Request values interpolated into shell text are single-quoted. Literal
//!   shell syntax written by the spec author remains literal shell syntax.
//! - A binary body used outside stdin is written to a temp file and the path is
//!   interpolated as a quoted shell word.
//! - Pipeline stages are wired stdin → stdout. A bare `Source` stage
//!   (`$ | cmd`) feeds a value as stdin to the next command.
22
23use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
24use bytes::Bytes;
25use std::collections::BTreeMap;
26use std::io::Write;
27use std::process::Stdio;
28use tempfile::NamedTempFile;
29use tokio::io::AsyncWriteExt;
30use tokio::process::Command;
31
32// ---------- Context ----------
33
34#[derive(Clone, Debug, Default)]
35pub enum BodyValue {
36    #[default]
37    None,
38    Text(String),
39    Json(serde_json::Value),
40    Form(BTreeMap<String, String>),
41    Binary(Bytes),
42}
43
44#[derive(Clone, Debug, Default)]
45pub struct ExecContext {
46    pub query: BTreeMap<String, String>,
47    pub path: BTreeMap<String, String>,
48    pub headers: BTreeMap<String, String>,
49    pub vars: BTreeMap<String, String>,
50    pub body: BodyValue,
51}
52
53impl ExecContext {
54    fn resolve_text(&self, r: &ValueRef) -> Option<String> {
55        match r {
56            ValueRef::Query(n) => self.query.get(n).cloned(),
57            ValueRef::Path(n) => self.path.get(n).cloned(),
58            ValueRef::Header(n) => self.headers.get(n).cloned(),
59            ValueRef::Var(n) => self.vars.get(n).cloned(),
60            ValueRef::Body { path } => match &self.body {
61                BodyValue::None => None,
62                BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
63                BodyValue::Text(_) => None,
64                BodyValue::Json(v) => {
65                    let mut cur = v;
66                    for p in path {
67                        cur = cur.get(p)?;
68                    }
69                    Some(json_to_text(cur))
70                }
71                BodyValue::Form(m) => {
72                    if path.is_empty() {
73                        Some(form_to_text(m))
74                    } else if path.len() == 1 {
75                        m.get(&path[0]).cloned()
76                    } else {
77                        None
78                    }
79                }
80                BodyValue::Binary(_) => None,
81            },
82        }
83    }
84
85    fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
86        if let ValueRef::Body { path } = r
87            && path.is_empty()
88            && let BodyValue::Binary(b) = &self.body
89        {
90            return Some(b.to_vec());
91        }
92        self.resolve_text(r).map(|s| s.into_bytes())
93    }
94}
95
96fn json_to_text(v: &serde_json::Value) -> String {
97    match v {
98        serde_json::Value::String(s) => s.clone(),
99        other => other.to_string(),
100    }
101}
102
/// Flattens form fields into `key=value` pairs joined by `&`, in map (key)
/// order. Keys and values are written verbatim; no escaping is applied.
fn form_to_text(m: &BTreeMap<String, String>) -> String {
    let mut flat = String::new();
    for (index, (key, value)) in m.iter().enumerate() {
        if index > 0 {
            flat.push('&');
        }
        flat.push_str(key);
        flat.push('=');
        flat.push_str(value);
    }
    flat
}
107
108// ---------- argv assembly ----------
109
110fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
111    let mut out = String::new();
112    let mut all_present = true;
113    for p in parts {
114        match p {
115            TextPart::Literal(s) => out.push_str(s),
116            TextPart::Interp(r) => match ctx.resolve_text(r) {
117                Some(s) => out.push_str(&s),
118                None => all_present = false,
119            },
120        }
121    }
122    (out, all_present)
123}
124
125pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
126    tracing::debug!(tokens = tokens.len(), "exec::build_argv");
127    let mut argv = Vec::new();
128    for t in tokens {
129        match t {
130            ExecToken::Text { parts, .. } => {
131                let (s, _) = render_text_parts(parts, ctx);
132                argv.push(s);
133            }
134            ExecToken::Group { pieces, .. } => {
135                let mut piece_strs = Vec::with_capacity(pieces.len());
136                let mut all_present = true;
137                for piece in pieces {
138                    let (s, present) = render_text_parts(&piece.parts, ctx);
139                    if !present {
140                        all_present = false;
141                        break;
142                    }
143                    piece_strs.push(s);
144                }
145                if all_present {
146                    argv.extend(piece_strs);
147                }
148            }
149        }
150    }
151    argv
152}
153
154// ---------- pipeline execution ----------
155
/// Result of running a pipeline to completion.
#[derive(Debug)]
pub struct ExecOutput {
    /// Exit code of the shell, or `-1` when no exit code is available.
    pub status: i32,
    /// Everything captured from the shell's standard output.
    pub stdout: Vec<u8>,
    /// Everything captured from the shell's standard error.
    pub stderr: Vec<u8>,
}
162
163/// Render a human-readable, non-executing preview of `pipeline` against `ctx`.
164/// Used by `--dry-run` to log the commands that would have run.
165pub fn preview_pipeline(pipeline: &[ExecStage], ctx: &ExecContext) -> Vec<String> {
166    tracing::debug!(stages = pipeline.len(), "exec::preview_pipeline");
167    match render_shell_with_mode(pipeline, ctx, false) {
168        Ok(rendered) => vec![format!("shell: {}", rendered.script)],
169        Err(err) => vec![format!("shell: <unresolved: {}>", err)],
170    }
171}
172
173pub async fn run_pipeline(
174    pipeline: &[ExecStage],
175    ctx: &ExecContext,
176    timeout: Option<std::time::Duration>,
177) -> Result<ExecOutput, String> {
178    tracing::debug!(stages = pipeline.len(), ?timeout, "exec::run_pipeline");
179    if pipeline.is_empty() {
180        return Err("empty exec pipeline".into());
181    }
182    let rendered = render_shell(pipeline, ctx)?;
183    run_shell(rendered, timeout).await
184}
185
186struct RenderedShell {
187    script: String,
188    stdin: Option<Vec<u8>>,
189    _temp_files: Vec<NamedTempFile>,
190}
191
192struct ShellRenderer<'a> {
193    ctx: &'a ExecContext,
194    temp_files: Vec<NamedTempFile>,
195    materialize_binary: bool,
196}
197
198impl<'a> ShellRenderer<'a> {
199    fn new(ctx: &'a ExecContext, materialize_binary: bool) -> Self {
200        Self {
201            ctx,
202            temp_files: Vec::new(),
203            materialize_binary,
204        }
205    }
206
207    fn resolve_shell_text(&mut self, r: &ValueRef) -> Result<Option<String>, String> {
208        if let ValueRef::Body { path } = r
209            && path.is_empty()
210            && let BodyValue::Binary(bytes) = &self.ctx.body
211        {
212            if !self.materialize_binary {
213                return Ok(Some("<binary temp file>".into()));
214            }
215            let mut file = NamedTempFile::new().map_err(|e| e.to_string())?;
216            file.write_all(bytes).map_err(|e| e.to_string())?;
217            let path = file.path().to_string_lossy().to_string();
218            self.temp_files.push(file);
219            return Ok(Some(path));
220        }
221        Ok(self.ctx.resolve_text(r))
222    }
223
224    fn render_command(&mut self, tokens: &[ExecToken]) -> Result<String, String> {
225        let mut words = Vec::new();
226        for token in tokens {
227            match token {
228                ExecToken::Text {
229                    parts, force_quote, ..
230                } => {
231                    words.push(self.render_text_word(parts, *force_quote, false)?);
232                }
233                ExecToken::Group { pieces, .. } => {
234                    let mut group_words = Vec::with_capacity(pieces.len());
235                    let mut all_present = true;
236                    for piece in pieces {
237                        match self.render_optional_word(&piece.parts, piece.force_quote)? {
238                            Some(word) => group_words.push(word),
239                            None => {
240                                all_present = false;
241                                break;
242                            }
243                        }
244                    }
245                    if all_present {
246                        words.extend(group_words);
247                    }
248                }
249            }
250        }
251        if words.is_empty() {
252            return Err("command stage produced empty shell command".into());
253        }
254        Ok(words.join(" "))
255    }
256
257    fn render_text_word(
258        &mut self,
259        parts: &[TextPart],
260        force_quote: bool,
261        omit_missing: bool,
262    ) -> Result<String, String> {
263        let mut out = String::new();
264        let mut has_interp = false;
265        for part in parts {
266            match part {
267                TextPart::Literal(s) => out.push_str(s),
268                TextPart::Interp(r) => {
269                    has_interp = true;
270                    match self.resolve_shell_text(r)? {
271                        Some(value) => out.push_str(&value),
272                        None if omit_missing => return Ok(String::new()),
273                        None => {}
274                    }
275                }
276            }
277        }
278        Ok(shell_word(&out, force_quote || has_interp))
279    }
280
281    fn render_optional_word(
282        &mut self,
283        parts: &[TextPart],
284        force_quote: bool,
285    ) -> Result<Option<String>, String> {
286        let mut out = String::new();
287        let mut has_interp = false;
288        for part in parts {
289            match part {
290                TextPart::Literal(s) => out.push_str(s),
291                TextPart::Interp(r) => {
292                    has_interp = true;
293                    let Some(value) = self.resolve_shell_text(r)? else {
294                        return Ok(None);
295                    };
296                    out.push_str(&value);
297                }
298            }
299        }
300        Ok(Some(shell_word(&out, force_quote || has_interp)))
301    }
302}
303
304fn render_shell(pipeline: &[ExecStage], ctx: &ExecContext) -> Result<RenderedShell, String> {
305    render_shell_with_mode(pipeline, ctx, true)
306}
307
308fn render_shell_with_mode(
309    pipeline: &[ExecStage],
310    ctx: &ExecContext,
311    materialize_binary: bool,
312) -> Result<RenderedShell, String> {
313    let mut pending_stdin: Option<Vec<u8>> = None;
314    let mut commands = Vec::new();
315    let mut saw_command = false;
316    let mut renderer = ShellRenderer::new(ctx, materialize_binary);
317
318    for stage in pipeline {
319        match stage {
320            ExecStage::Source { reference, .. } => {
321                if saw_command {
322                    return Err(
323                        "value-reference source after a command stage is not supported".into(),
324                    );
325                }
326                let bytes = ctx
327                    .resolve_bytes(reference)
328                    .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
329                pending_stdin = Some(bytes);
330            }
331            ExecStage::Command { tokens, .. } => {
332                saw_command = true;
333                commands.push(renderer.render_command(tokens)?);
334            }
335        }
336    }
337    if commands.is_empty() {
338        return Err("pipeline ended without a command".to_string());
339    }
340    Ok(RenderedShell {
341        script: commands.join(" | "),
342        stdin: pending_stdin,
343        _temp_files: renderer.temp_files,
344    })
345}
346
347async fn run_shell(
348    rendered: RenderedShell,
349    timeout: Option<std::time::Duration>,
350) -> Result<ExecOutput, String> {
351    tracing::debug!(script = %rendered.script, "exec::run_shell: spawning");
352    let mut cmd = Command::new("/bin/sh");
353    cmd.arg("-c").arg(&rendered.script);
354    cmd.kill_on_drop(true);
355    cmd.process_group(0);
356    cmd.stdout(Stdio::piped());
357    cmd.stderr(Stdio::piped());
358    if rendered.stdin.is_some() {
359        cmd.stdin(Stdio::piped());
360    } else {
361        cmd.stdin(Stdio::null());
362    }
363
364    let mut child = cmd
365        .spawn()
366        .map_err(|e| format!("failed to spawn shell: {}", e))?;
367    let child_id = child.id();
368    if let Some(stdin) = rendered.stdin
369        && let Some(mut sin) = child.stdin.take()
370    {
371        sin.write_all(&stdin).await.map_err(|e| e.to_string())?;
372        drop(sin);
373    }
374
375    let stdout = child.stdout.take();
376    let stderr = child.stderr.take();
377    let stdout_handle = tokio::spawn(async move {
378        let mut buf = Vec::new();
379        if let Some(mut s) = stdout {
380            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
381                .await
382                .ok();
383        }
384        buf
385    });
386    let stderr_handle = tokio::spawn(async move {
387        let mut buf = Vec::new();
388        if let Some(mut s) = stderr {
389            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
390                .await
391                .ok();
392        }
393        buf
394    });
395
396    let status = if let Some(timeout) = timeout {
397        tokio::select! {
398            status = child.wait() => status.map_err(|e| e.to_string())?,
399            _ = tokio::time::sleep(timeout) => {
400                if let Some(pid) = child_id {
401                    kill_process_group(pid);
402                }
403                let _ = child.kill().await;
404                return Err("execution timed out".into());
405            }
406        }
407    } else {
408        child.wait().await.map_err(|e| e.to_string())?
409    };
410    let stdout = stdout_handle.await.unwrap_or_default();
411    let stderr = stderr_handle.await.unwrap_or_default();
412    if let Some(pid) = child_id {
413        kill_process_group(pid);
414    }
415    Ok(ExecOutput {
416        status: status.code().unwrap_or(-1),
417        stdout,
418        stderr,
419    })
420}
421
422#[cfg(unix)]
423fn kill_process_group(pid: u32) {
424    let pgid = -(pid as libc::pid_t);
425    // The shell may have already exited; ESRCH is fine here. This is a best
426    // effort cleanup for shell-spawned descendants after normal completion.
427    unsafe {
428        libc::kill(pgid, libc::SIGKILL);
429    }
430}
431
432#[cfg(not(unix))]
433fn kill_process_group(_pid: u32) {}
434
435fn shell_word(value: &str, force_quote: bool) -> String {
436    if force_quote || value.is_empty() || value.chars().any(char::is_whitespace) {
437        shell_quote(value)
438    } else {
439        value.to_string()
440    }
441}
442
/// Wraps `value` in single quotes for the shell.
///
/// Each embedded single quote is written as `'\''`: close the quoted run,
/// emit an escaped quote, and reopen the quoted run.
fn shell_quote(value: &str) -> String {
    format!("'{}'", value.replace('\'', "'\\''"))
}
454}