Skip to main content

mii_http/
exec.rs

1//! Exec runtime: render the parsed Exec mini-language into shell text and run
2//! it through `/bin/sh`.
3//!
4//! The pipeline AST itself is produced by [`crate::parse::exec`]; this module
5//! is concerned with turning typed request values into shell-safe words.
6//!
7//! Semantics:
8//!
9//! - A [`ValueRef`] is resolved against an [`ExecContext`] (query/path/header/
10//!   var maps and a [`BodyValue`]).
11//! - A `Text` token is always emitted as one argv element. Missing
12//!   interpolations render as the empty string.
13//! - A `[..]` `Group` token is used for shell words that contain interpolation.
14//!   It is emitted only if every interpolation resolves; otherwise the whole
15//!   group is omitted.
16//! - Request values interpolated into shell text are single-quoted. Literal
17//!   shell syntax written by the spec author remains literal shell syntax.
18//! - A binary body or binary form field used outside stdin is written to a
19//!   temp file and the path is interpolated as a quoted shell word.
20//! - Pipeline stages are wired stdin → stdout. A bare `Source` stage
21//!   (`$ | cmd`) feeds a value as stdin to the next command. Multiple
22//!   statements (multi-line Exec) are joined into a single shell script
23//!   separated by newlines.
24
25use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
26use bytes::Bytes;
27use std::collections::BTreeMap;
28use std::io::Write;
29use std::process::Stdio;
30use tempfile::NamedTempFile;
31use tokio::io::AsyncWriteExt;
32use tokio::process::Command;
33
34// ---------- Context ----------
35
36#[derive(Clone, Debug)]
37pub enum FormFieldValue {
38    Text(String),
39    Binary(Bytes),
40}
41
42impl FormFieldValue {
43    pub fn as_text(&self) -> Option<&str> {
44        if let FormFieldValue::Text(s) = self {
45            Some(s)
46        } else {
47            None
48        }
49    }
50    pub fn as_bytes(&self) -> &[u8] {
51        match self {
52            FormFieldValue::Text(s) => s.as_bytes(),
53            FormFieldValue::Binary(b) => b.as_ref(),
54        }
55    }
56}
57
58#[derive(Clone, Debug, Default)]
59pub enum BodyValue {
60    #[default]
61    None,
62    Text(String),
63    Json(serde_json::Value),
64    Form(BTreeMap<String, FormFieldValue>),
65    Binary(Bytes),
66}
67
68#[derive(Clone, Debug, Default)]
69pub struct ExecContext {
70    pub query: BTreeMap<String, String>,
71    pub path: BTreeMap<String, String>,
72    pub headers: BTreeMap<String, String>,
73    pub vars: BTreeMap<String, String>,
74    pub body: BodyValue,
75}
76
77impl ExecContext {
78    fn resolve_text(&self, r: &ValueRef) -> Option<String> {
79        match r {
80            ValueRef::Query(n) => self.query.get(n).cloned(),
81            ValueRef::Path(n) => self.path.get(n).cloned(),
82            ValueRef::Header(n) => self.headers.get(n).cloned(),
83            ValueRef::Var(n) => self.vars.get(n).cloned(),
84            ValueRef::Body { path } => match &self.body {
85                BodyValue::None => None,
86                BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
87                BodyValue::Text(_) => None,
88                BodyValue::Json(v) => {
89                    let mut cur = v;
90                    for p in path {
91                        cur = cur.get(p)?;
92                    }
93                    Some(json_to_text(cur))
94                }
95                BodyValue::Form(m) => {
96                    if path.is_empty() {
97                        Some(form_to_text(m))
98                    } else if path.len() == 1 {
99                        m.get(&path[0]).and_then(|v| v.as_text().map(str::to_string))
100                    } else {
101                        None
102                    }
103                }
104                BodyValue::Binary(_) => None,
105            },
106        }
107    }
108
109    fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
110        if let ValueRef::Body { path } = r {
111            if path.is_empty() {
112                if let BodyValue::Binary(b) = &self.body {
113                    return Some(b.to_vec());
114                }
115            } else if path.len() == 1
116                && let BodyValue::Form(m) = &self.body
117                && let Some(field) = m.get(&path[0])
118            {
119                return Some(field.as_bytes().to_vec());
120            }
121        }
122        self.resolve_text(r).map(|s| s.into_bytes())
123    }
124}
125
126fn json_to_text(v: &serde_json::Value) -> String {
127    match v {
128        serde_json::Value::String(s) => s.clone(),
129        other => other.to_string(),
130    }
131}
132
133fn form_to_text(m: &BTreeMap<String, FormFieldValue>) -> String {
134    let pairs: Vec<String> = m
135        .iter()
136        .filter_map(|(k, v)| match v {
137            FormFieldValue::Text(s) => Some(format!("{}={}", k, s)),
138            FormFieldValue::Binary(_) => None,
139        })
140        .collect();
141    pairs.join("&")
142}
143
144// ---------- argv assembly ----------
145
146fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
147    let mut out = String::new();
148    let mut all_present = true;
149    for p in parts {
150        match p {
151            TextPart::Literal(s) => out.push_str(s),
152            TextPart::Interp(r) => match ctx.resolve_text(r) {
153                Some(s) => out.push_str(&s),
154                None => all_present = false,
155            },
156        }
157    }
158    (out, all_present)
159}
160
161pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
162    tracing::debug!(tokens = tokens.len(), "exec::build_argv");
163    let mut argv = Vec::new();
164    for t in tokens {
165        match t {
166            ExecToken::Text { parts, .. } => {
167                let (s, _) = render_text_parts(parts, ctx);
168                argv.push(s);
169            }
170            ExecToken::Group { pieces, .. } => {
171                let mut piece_strs = Vec::with_capacity(pieces.len());
172                let mut all_present = true;
173                for piece in pieces {
174                    let (s, present) = render_text_parts(&piece.parts, ctx);
175                    if !present {
176                        all_present = false;
177                        break;
178                    }
179                    piece_strs.push(s);
180                }
181                if all_present {
182                    argv.extend(piece_strs);
183                }
184            }
185        }
186    }
187    argv
188}
189
190// ---------- pipeline execution ----------
191
/// Result of running an Exec script to completion.
#[derive(Debug)]
pub struct ExecOutput {
    /// Shell exit code, or -1 when none is available (e.g. killed by a signal).
    pub status: i32,
    /// Everything the script wrote to stdout.
    pub stdout: Vec<u8>,
    /// Everything the script wrote to stderr.
    pub stderr: Vec<u8>,
}
198
199/// Render a human-readable, non-executing preview of `statements` against
200/// `ctx`. Used by `--dry-run` to log the commands that would have run.
201pub fn preview_pipeline(statements: &[Vec<ExecStage>], ctx: &ExecContext) -> Vec<String> {
202    tracing::debug!(statements = statements.len(), "exec::preview_pipeline");
203    match render_shell_with_mode(statements, ctx, false) {
204        Ok(rendered) => vec![format!("shell: {}", rendered.script)],
205        Err(err) => vec![format!("shell: <unresolved: {}>", err)],
206    }
207}
208
209pub async fn run_pipeline(
210    statements: &[Vec<ExecStage>],
211    ctx: &ExecContext,
212    timeout: Option<std::time::Duration>,
213) -> Result<ExecOutput, String> {
214    tracing::debug!(statements = statements.len(), ?timeout, "exec::run_pipeline");
215    if statements.is_empty() {
216        return Err("empty exec pipeline".into());
217    }
218    let rendered = render_shell(statements, ctx)?;
219    run_shell(rendered, timeout).await
220}
221
222struct RenderedShell {
223    script: String,
224    stdin: Option<Vec<u8>>,
225    _temp_files: Vec<NamedTempFile>,
226}
227
228struct ShellRenderer<'a> {
229    ctx: &'a ExecContext,
230    temp_files: Vec<NamedTempFile>,
231    materialize_binary: bool,
232}
233
234impl<'a> ShellRenderer<'a> {
235    fn new(ctx: &'a ExecContext, materialize_binary: bool) -> Self {
236        Self {
237            ctx,
238            temp_files: Vec::new(),
239            materialize_binary,
240        }
241    }
242
243    fn resolve_shell_text(&mut self, r: &ValueRef) -> Result<Option<String>, String> {
244        if let ValueRef::Body { path } = r {
245            if path.is_empty()
246                && let BodyValue::Binary(bytes) = &self.ctx.body
247            {
248                if !self.materialize_binary {
249                    return Ok(Some("<binary temp file>".into()));
250                }
251                return Ok(Some(self.materialize_to_temp(bytes)?));
252            }
253            if path.len() == 1
254                && let BodyValue::Form(m) = &self.ctx.body
255                && let Some(FormFieldValue::Binary(bytes)) = m.get(&path[0])
256            {
257                if !self.materialize_binary {
258                    return Ok(Some("<binary temp file>".into()));
259                }
260                return Ok(Some(self.materialize_to_temp(bytes)?));
261            }
262        }
263        Ok(self.ctx.resolve_text(r))
264    }
265
266    fn materialize_to_temp(&mut self, bytes: &[u8]) -> Result<String, String> {
267        let mut file = NamedTempFile::new().map_err(|e| e.to_string())?;
268        file.write_all(bytes).map_err(|e| e.to_string())?;
269        let path = file.path().to_string_lossy().to_string();
270        self.temp_files.push(file);
271        Ok(path)
272    }
273
274    fn render_command(&mut self, tokens: &[ExecToken]) -> Result<String, String> {
275        let mut words = Vec::new();
276        for token in tokens {
277            match token {
278                ExecToken::Text {
279                    parts, force_quote, ..
280                } => {
281                    words.push(self.render_text_word(parts, *force_quote, false)?);
282                }
283                ExecToken::Group { pieces, .. } => {
284                    let mut group_words = Vec::with_capacity(pieces.len());
285                    let mut all_present = true;
286                    for piece in pieces {
287                        match self.render_optional_word(&piece.parts, piece.force_quote)? {
288                            Some(word) => group_words.push(word),
289                            None => {
290                                all_present = false;
291                                break;
292                            }
293                        }
294                    }
295                    if all_present {
296                        words.extend(group_words);
297                    }
298                }
299            }
300        }
301        if words.is_empty() {
302            return Err("command stage produced empty shell command".into());
303        }
304        Ok(words.join(" "))
305    }
306
307    fn render_text_word(
308        &mut self,
309        parts: &[TextPart],
310        force_quote: bool,
311        omit_missing: bool,
312    ) -> Result<String, String> {
313        let mut out = String::new();
314        let mut has_interp = false;
315        for part in parts {
316            match part {
317                TextPart::Literal(s) => out.push_str(s),
318                TextPart::Interp(r) => {
319                    has_interp = true;
320                    match self.resolve_shell_text(r)? {
321                        Some(value) => out.push_str(&value),
322                        None if omit_missing => return Ok(String::new()),
323                        None => {}
324                    }
325                }
326            }
327        }
328        Ok(shell_word(&out, force_quote || has_interp))
329    }
330
331    fn render_optional_word(
332        &mut self,
333        parts: &[TextPart],
334        force_quote: bool,
335    ) -> Result<Option<String>, String> {
336        let mut out = String::new();
337        let mut has_interp = false;
338        for part in parts {
339            match part {
340                TextPart::Literal(s) => out.push_str(s),
341                TextPart::Interp(r) => {
342                    has_interp = true;
343                    let Some(value) = self.resolve_shell_text(r)? else {
344                        return Ok(None);
345                    };
346                    out.push_str(&value);
347                }
348            }
349        }
350        Ok(Some(shell_word(&out, force_quote || has_interp)))
351    }
352}
353
354fn render_shell(statements: &[Vec<ExecStage>], ctx: &ExecContext) -> Result<RenderedShell, String> {
355    render_shell_with_mode(statements, ctx, true)
356}
357
358fn render_shell_with_mode(
359    statements: &[Vec<ExecStage>],
360    ctx: &ExecContext,
361    materialize_binary: bool,
362) -> Result<RenderedShell, String> {
363    let mut renderer = ShellRenderer::new(ctx, materialize_binary);
364    let mut script_lines = Vec::new();
365    let mut script_stdin: Option<Vec<u8>> = None;
366    for (idx, pipeline) in statements.iter().enumerate() {
367        let mut pending_stdin: Option<Vec<u8>> = None;
368        let mut commands = Vec::new();
369        let mut saw_command = false;
370        for stage in pipeline {
371            match stage {
372                ExecStage::Source { reference, .. } => {
373                    if saw_command {
374                        return Err(
375                            "value-reference source after a command stage is not supported".into(),
376                        );
377                    }
378                    let bytes = ctx
379                        .resolve_bytes(reference)
380                        .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
381                    pending_stdin = Some(bytes);
382                }
383                ExecStage::Command { tokens, .. } => {
384                    saw_command = true;
385                    commands.push(renderer.render_command(tokens)?);
386                }
387            }
388        }
389        if commands.is_empty() {
390            return Err("pipeline ended without a command".to_string());
391        }
392        if pending_stdin.is_some() {
393            // Only the first statement may consume request stdin; subsequent
394            // statements would compete for the same stdin pipe.
395            if idx != 0 {
396                return Err(
397                    "only the first statement of a multi-line Exec may consume request stdin"
398                        .into(),
399                );
400            }
401            script_stdin = pending_stdin;
402        }
403        script_lines.push(commands.join(" | "));
404    }
405    if script_lines.is_empty() {
406        return Err("pipeline ended without a command".to_string());
407    }
408    Ok(RenderedShell {
409        script: script_lines.join("\n"),
410        stdin: script_stdin,
411        _temp_files: renderer.temp_files,
412    })
413}
414
415async fn run_shell(
416    rendered: RenderedShell,
417    timeout: Option<std::time::Duration>,
418) -> Result<ExecOutput, String> {
419    tracing::debug!(script = %rendered.script, "exec::run_shell: spawning");
420    let mut cmd = Command::new("/bin/sh");
421    cmd.arg("-c").arg(&rendered.script);
422    cmd.kill_on_drop(true);
423    cmd.process_group(0);
424    cmd.stdout(Stdio::piped());
425    cmd.stderr(Stdio::piped());
426    if rendered.stdin.is_some() {
427        cmd.stdin(Stdio::piped());
428    } else {
429        cmd.stdin(Stdio::null());
430    }
431
432    let mut child = cmd
433        .spawn()
434        .map_err(|e| format!("failed to spawn shell: {}", e))?;
435    let child_id = child.id();
436    if let Some(stdin) = rendered.stdin
437        && let Some(mut sin) = child.stdin.take()
438    {
439        sin.write_all(&stdin).await.map_err(|e| e.to_string())?;
440        drop(sin);
441    }
442
443    let stdout = child.stdout.take();
444    let stderr = child.stderr.take();
445    let stdout_handle = tokio::spawn(async move {
446        let mut buf = Vec::new();
447        if let Some(mut s) = stdout {
448            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
449                .await
450                .ok();
451        }
452        buf
453    });
454    let stderr_handle = tokio::spawn(async move {
455        let mut buf = Vec::new();
456        if let Some(mut s) = stderr {
457            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
458                .await
459                .ok();
460        }
461        buf
462    });
463
464    let status = if let Some(timeout) = timeout {
465        tokio::select! {
466            status = child.wait() => status.map_err(|e| e.to_string())?,
467            _ = tokio::time::sleep(timeout) => {
468                if let Some(pid) = child_id {
469                    kill_process_group(pid);
470                }
471                let _ = child.kill().await;
472                return Err("execution timed out".into());
473            }
474        }
475    } else {
476        child.wait().await.map_err(|e| e.to_string())?
477    };
478    let stdout = stdout_handle.await.unwrap_or_default();
479    let stderr = stderr_handle.await.unwrap_or_default();
480    if let Some(pid) = child_id {
481        kill_process_group(pid);
482    }
483    Ok(ExecOutput {
484        status: status.code().unwrap_or(-1),
485        stdout,
486        stderr,
487    })
488}
489
490/// A handle to a streaming exec: the receiver yields stdout chunks as they
491/// are produced; the join handle resolves with the final exit status (and any\n/// captured stderr) once the process completes.
492pub struct StreamingExec {
493    pub stdout_rx: tokio::sync::mpsc::Receiver<Result<Bytes, String>>,
494    pub completion: tokio::task::JoinHandle<Result<ExecCompletion, String>>,
495}
496
/// Final outcome of a streaming exec, delivered after stdout has been fully
/// forwarded.
#[derive(Debug)]
pub struct ExecCompletion {
    /// Shell exit code, or -1 when none is available (e.g. killed by a signal).
    pub status: i32,
    /// Everything the script wrote to stderr.
    pub stderr: Vec<u8>,
}
502
503pub async fn run_pipeline_streaming(
504    statements: &[Vec<ExecStage>],
505    ctx: &ExecContext,
506    timeout: Option<std::time::Duration>,
507) -> Result<StreamingExec, String> {
508    tracing::debug!(statements = statements.len(), ?timeout, "exec::run_pipeline_streaming");
509    if statements.is_empty() {
510        return Err("empty exec pipeline".into());
511    }
512    let rendered = render_shell(statements, ctx)?;
513    spawn_streaming(rendered, timeout).await
514}
515
516async fn spawn_streaming(
517    rendered: RenderedShell,
518    timeout: Option<std::time::Duration>,
519) -> Result<StreamingExec, String> {
520    let mut cmd = Command::new("/bin/sh");
521    cmd.arg("-c").arg(&rendered.script);
522    cmd.kill_on_drop(true);
523    cmd.process_group(0);
524    cmd.stdout(Stdio::piped());
525    cmd.stderr(Stdio::piped());
526    if rendered.stdin.is_some() {
527        cmd.stdin(Stdio::piped());
528    } else {
529        cmd.stdin(Stdio::null());
530    }
531
532    let mut child = cmd
533        .spawn()
534        .map_err(|e| format!("failed to spawn shell: {}", e))?;
535    let child_id = child.id();
536    if let Some(stdin) = rendered.stdin.clone()
537        && let Some(mut sin) = child.stdin.take()
538    {
539        // Push stdin in the background; large bodies must not block the
540        // caller before the response head can be sent.
541        tokio::spawn(async move {
542            let _ = sin.write_all(&stdin).await;
543        });
544    }
545
546    let temp_files = rendered._temp_files;
547    let mut stdout = child.stdout.take();
548    let stderr = child.stderr.take();
549    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, String>>(8);
550
551    let stderr_handle = tokio::spawn(async move {
552        let mut buf = Vec::new();
553        if let Some(mut s) = stderr {
554            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
555                .await
556                .ok();
557        }
558        buf
559    });
560
561    let stdout_tx = tx.clone();
562    let stdout_pump = tokio::spawn(async move {
563        use tokio::io::AsyncReadExt;
564        if let Some(s) = stdout.as_mut() {
565            let mut buf = vec![0u8; 8 * 1024];
566            loop {
567                match s.read(&mut buf).await {
568                    Ok(0) => break,
569                    Ok(n) => {
570                        if stdout_tx
571                            .send(Ok(Bytes::copy_from_slice(&buf[..n])))
572                            .await
573                            .is_err()
574                        {
575                            break;
576                        }
577                    }
578                    Err(e) => {
579                        let _ = stdout_tx.send(Err(e.to_string())).await;
580                        break;
581                    }
582                }
583            }
584        }
585    });
586
587    let completion = tokio::spawn(async move {
588        let _temp_files = temp_files; // hold temp files alive until done
589        let status = if let Some(timeout) = timeout {
590            tokio::select! {
591                status = child.wait() => status.map_err(|e| e.to_string())?,
592                _ = tokio::time::sleep(timeout) => {
593                    if let Some(pid) = child_id {
594                        kill_process_group(pid);
595                    }
596                    let _ = child.kill().await;
597                    let _ = stdout_pump.await;
598                    return Err("execution timed out".into());
599                }
600            }
601        } else {
602            child.wait().await.map_err(|e| e.to_string())?
603        };
604        let _ = stdout_pump.await;
605        let stderr = stderr_handle.await.unwrap_or_default();
606        if let Some(pid) = child_id {
607            kill_process_group(pid);
608        }
609        Ok(ExecCompletion {
610            status: status.code().unwrap_or(-1),
611            stderr,
612        })
613    });
614
615    Ok(StreamingExec {
616        stdout_rx: rx,
617        completion,
618    })
619}
620
621#[cfg(unix)]
622fn kill_process_group(pid: u32) {
623    let pgid = -(pid as libc::pid_t);
624    // The shell may have already exited; ESRCH is fine here. This is a best
625    // effort cleanup for shell-spawned descendants after normal completion.
626    unsafe {
627        libc::kill(pgid, libc::SIGKILL);
628    }
629}
630
/// Process groups are a Unix concept, so there is no group to signal here.
#[cfg(not(unix))]
fn kill_process_group(_pid: u32) {
    // Intentionally a no-op.
}
633
634fn shell_word(value: &str, force_quote: bool) -> String {
635    if force_quote || value.is_empty() || value.chars().any(char::is_whitespace) {
636        shell_quote(value)
637    } else {
638        value.to_string()
639    }
640}
641
/// Wrap `value` in POSIX single quotes, making it fully literal to the shell.
///
/// Single quotes cannot appear inside a single-quoted string. Each embedded
/// `'` is therefore emitted as `'\''`: close the quote, add an escaped
/// quote, reopen.
fn shell_quote(value: &str) -> String {
    format!("'{}'", value.replace('\'', r"'\''"))
}