Skip to main content

mii_http/
exec.rs

1//! Exec runtime: build argv from a parsed pipeline and execute it as a chain
2//! of child processes. **No shell is ever invoked.**
3//!
4//! The pipeline AST itself is produced by [`crate::parse::exec`]; this module
5//! is concerned only with executing it safely.
6//!
7//! Semantics:
8//!
9//! - A [`ValueRef`] is resolved against an [`ExecContext`] (query/path/header/
10//!   var maps and a [`BodyValue`]).
11//! - A `Text` token is always emitted as one argv element. Missing
12//!   interpolations render as the empty string.
13//! - A `[..]` `Group` token is emitted only if every required interpolation
14//!   resolves; otherwise the whole group is omitted from argv (this is how
15//!   optional flags work).
16//! - Pipeline stages are wired stdin → stdout. A bare `Source` stage
17//!   (`$ | cmd`) feeds a value as stdin to the next command.
18
19use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
20use bytes::Bytes;
21use std::collections::BTreeMap;
22use std::process::Stdio;
23use tokio::io::AsyncWriteExt;
24use tokio::process::Command;
25
26// ---------- Context ----------
27
28#[derive(Clone, Debug)]
29pub enum BodyValue {
30    None,
31    Text(String),
32    Json(serde_json::Value),
33    Form(BTreeMap<String, String>),
34    Binary(Bytes),
35}
36
37impl Default for BodyValue {
38    fn default() -> Self {
39        BodyValue::None
40    }
41}
42
43#[derive(Clone, Debug, Default)]
44pub struct ExecContext {
45    pub query: BTreeMap<String, String>,
46    pub path: BTreeMap<String, String>,
47    pub headers: BTreeMap<String, String>,
48    pub vars: BTreeMap<String, String>,
49    pub body: BodyValue,
50}
51
52impl ExecContext {
53    fn resolve_text(&self, r: &ValueRef) -> Option<String> {
54        match r {
55            ValueRef::Query(n) => self.query.get(n).cloned(),
56            ValueRef::Path(n) => self.path.get(n).cloned(),
57            ValueRef::Header(n) => self.headers.get(n).cloned(),
58            ValueRef::Var(n) => self.vars.get(n).cloned(),
59            ValueRef::Body { path } => match &self.body {
60                BodyValue::None => None,
61                BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
62                BodyValue::Text(_) => None,
63                BodyValue::Json(v) => {
64                    let mut cur = v;
65                    for p in path {
66                        cur = cur.get(p)?;
67                    }
68                    Some(json_to_text(cur))
69                }
70                BodyValue::Form(m) => {
71                    if path.is_empty() {
72                        Some(form_to_text(m))
73                    } else if path.len() == 1 {
74                        m.get(&path[0]).cloned()
75                    } else {
76                        None
77                    }
78                }
79                BodyValue::Binary(_) => None,
80            },
81        }
82    }
83
84    fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
85        if let ValueRef::Body { path } = r {
86            if path.is_empty() {
87                if let BodyValue::Binary(b) = &self.body {
88                    return Some(b.to_vec());
89                }
90            }
91        }
92        self.resolve_text(r).map(|s| s.into_bytes())
93    }
94}
95
96fn json_to_text(v: &serde_json::Value) -> String {
97    match v {
98        serde_json::Value::String(s) => s.clone(),
99        other => other.to_string(),
100    }
101}
102
103fn form_to_text(m: &BTreeMap<String, String>) -> String {
104    let pairs: Vec<String> = m.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
105    pairs.join("&")
106}
107
108// ---------- argv assembly ----------
109
110fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
111    let mut out = String::new();
112    let mut all_present = true;
113    for p in parts {
114        match p {
115            TextPart::Literal(s) => out.push_str(s),
116            TextPart::Interp(r) => match ctx.resolve_text(r) {
117                Some(s) => out.push_str(&s),
118                None => all_present = false,
119            },
120        }
121    }
122    (out, all_present)
123}
124
125pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
126    tracing::debug!(tokens = tokens.len(), "exec::build_argv");
127    let mut argv = Vec::new();
128    for t in tokens {
129        match t {
130            ExecToken::Text { parts, .. } => {
131                let (s, _) = render_text_parts(parts, ctx);
132                argv.push(s);
133            }
134            ExecToken::Group { pieces, .. } => {
135                let mut piece_strs = Vec::with_capacity(pieces.len());
136                let mut all_present = true;
137                for piece in pieces {
138                    let (s, present) = render_text_parts(&piece.parts, ctx);
139                    if !present {
140                        all_present = false;
141                        break;
142                    }
143                    piece_strs.push(s);
144                }
145                if all_present {
146                    argv.extend(piece_strs);
147                }
148            }
149        }
150    }
151    argv
152}
153
154// ---------- pipeline execution ----------
155
156#[derive(Debug)]
157pub struct ExecOutput {
158    pub status: i32,
159    pub stdout: Vec<u8>,
160    pub stderr: Vec<u8>,
161}
162
163/// Render a human-readable, non-executing preview of `pipeline` against `ctx`.
164/// Used by `--dry-run` to log the commands that would have run.
165pub fn preview_pipeline(pipeline: &[ExecStage], ctx: &ExecContext) -> Vec<String> {
166    tracing::debug!(stages = pipeline.len(), "exec::preview_pipeline");
167    let mut out = Vec::with_capacity(pipeline.len());
168    for stage in pipeline {
169        match stage {
170            ExecStage::Source { reference, .. } => {
171                let resolved = ctx
172                    .resolve_text(reference)
173                    .map(|s| {
174                        if s.len() > 200 {
175                            format!("{}…", &s[..200])
176                        } else {
177                            s
178                        }
179                    })
180                    .unwrap_or_else(|| "<unresolved>".into());
181                out.push(format!("stdin <- {} = {:?}", reference.describe(), resolved));
182            }
183            ExecStage::Command { tokens, .. } => {
184                let argv = build_argv(tokens, ctx);
185                out.push(format!("argv: {:?}", argv));
186            }
187        }
188    }
189    out
190}
191
192pub async fn run_pipeline(
193    pipeline: &[ExecStage],
194    ctx: &ExecContext,
195    timeout: Option<std::time::Duration>,
196) -> Result<ExecOutput, String> {
197    tracing::debug!(stages = pipeline.len(), ?timeout, "exec::run_pipeline");
198    if pipeline.is_empty() {
199        return Err("empty exec pipeline".into());
200    }
201    let fut = run_pipeline_inner(pipeline, ctx);
202    if let Some(t) = timeout {
203        match tokio::time::timeout(t, fut).await {
204            Ok(r) => r,
205            Err(_) => Err("execution timed out".into()),
206        }
207    } else {
208        fut.await
209    }
210}
211
212async fn run_pipeline_inner(
213    pipeline: &[ExecStage],
214    ctx: &ExecContext,
215) -> Result<ExecOutput, String> {
216    let mut pending_stdin: Option<Vec<u8>> = None;
217    let mut prev_child: Option<tokio::process::Child> = None;
218
219    let mut i = 0;
220    while i < pipeline.len() {
221        match &pipeline[i] {
222            ExecStage::Source { reference, .. } => {
223                if prev_child.is_some() {
224                    return Err(
225                        "value-reference source after a command stage is not supported".into(),
226                    );
227                }
228                let bytes = ctx
229                    .resolve_bytes(reference)
230                    .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
231                pending_stdin = Some(bytes);
232                i += 1;
233            }
234            ExecStage::Command { tokens, .. } => {
235                let argv = build_argv(tokens, ctx);
236                if argv.is_empty() {
237                    return Err("command stage produced empty argv".into());
238                }
239                let program = argv[0].clone();
240                let args = &argv[1..];
241                tracing::debug!(program = %program, args = ?args, "exec::run_pipeline: spawning");
242                let mut cmd = Command::new(&program);
243                cmd.args(args);
244                cmd.stdout(Stdio::piped());
245                cmd.stderr(Stdio::piped());
246
247                let stdin_data: Option<Vec<u8>>;
248                if let Some(mut prev) = prev_child.take() {
249                    if let Some(out) = prev.stdout.take() {
250                        let std_out: std::process::Stdio = out
251                            .try_into()
252                            .map_err(|e: std::io::Error| e.to_string())?;
253                        cmd.stdin(std_out);
254                    } else {
255                        cmd.stdin(Stdio::null());
256                    }
257                    tokio::spawn(async move {
258                        let _ = prev.wait().await;
259                    });
260                    stdin_data = None;
261                } else if let Some(d) = pending_stdin.take() {
262                    cmd.stdin(Stdio::piped());
263                    stdin_data = Some(d);
264                } else {
265                    cmd.stdin(Stdio::null());
266                    stdin_data = None;
267                }
268
269                let mut child = cmd
270                    .spawn()
271                    .map_err(|e| format!("failed to spawn `{}`: {}", program, e))?;
272                if let Some(d) = stdin_data {
273                    if let Some(mut sin) = child.stdin.take() {
274                        sin.write_all(&d).await.map_err(|e| e.to_string())?;
275                        drop(sin);
276                    }
277                }
278                prev_child = Some(child);
279                i += 1;
280            }
281        }
282    }
283
284    let mut child = prev_child.ok_or_else(|| "pipeline ended without a command".to_string())?;
285    let stdout = child.stdout.take();
286    let stderr = child.stderr.take();
287    let stdout_handle = tokio::spawn(async move {
288        let mut buf = Vec::new();
289        if let Some(mut s) = stdout {
290            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
291                .await
292                .ok();
293        }
294        buf
295    });
296    let stderr_handle = tokio::spawn(async move {
297        let mut buf = Vec::new();
298        if let Some(mut s) = stderr {
299            tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
300                .await
301                .ok();
302        }
303        buf
304    });
305    let status = child.wait().await.map_err(|e| e.to_string())?;
306    let stdout = stdout_handle.await.unwrap_or_default();
307    let stderr = stderr_handle.await.unwrap_or_default();
308    Ok(ExecOutput {
309        status: status.code().unwrap_or(-1),
310        stdout,
311        stderr,
312    })
313}