1use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
24use bytes::Bytes;
25use std::collections::BTreeMap;
26use std::io::Write;
27use std::process::Stdio;
28use tempfile::NamedTempFile;
29use tokio::io::AsyncWriteExt;
30use tokio::process::Command;
31
32#[derive(Clone, Debug, Default)]
35pub enum BodyValue {
36 #[default]
37 None,
38 Text(String),
39 Json(serde_json::Value),
40 Form(BTreeMap<String, String>),
41 Binary(Bytes),
42}
43
44#[derive(Clone, Debug, Default)]
45pub struct ExecContext {
46 pub query: BTreeMap<String, String>,
47 pub path: BTreeMap<String, String>,
48 pub headers: BTreeMap<String, String>,
49 pub vars: BTreeMap<String, String>,
50 pub body: BodyValue,
51}
52
53impl ExecContext {
54 fn resolve_text(&self, r: &ValueRef) -> Option<String> {
55 match r {
56 ValueRef::Query(n) => self.query.get(n).cloned(),
57 ValueRef::Path(n) => self.path.get(n).cloned(),
58 ValueRef::Header(n) => self.headers.get(n).cloned(),
59 ValueRef::Var(n) => self.vars.get(n).cloned(),
60 ValueRef::Body { path } => match &self.body {
61 BodyValue::None => None,
62 BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
63 BodyValue::Text(_) => None,
64 BodyValue::Json(v) => {
65 let mut cur = v;
66 for p in path {
67 cur = cur.get(p)?;
68 }
69 Some(json_to_text(cur))
70 }
71 BodyValue::Form(m) => {
72 if path.is_empty() {
73 Some(form_to_text(m))
74 } else if path.len() == 1 {
75 m.get(&path[0]).cloned()
76 } else {
77 None
78 }
79 }
80 BodyValue::Binary(_) => None,
81 },
82 }
83 }
84
85 fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
86 if let ValueRef::Body { path } = r
87 && path.is_empty()
88 && let BodyValue::Binary(b) = &self.body
89 {
90 return Some(b.to_vec());
91 }
92 self.resolve_text(r).map(|s| s.into_bytes())
93 }
94}
95
96fn json_to_text(v: &serde_json::Value) -> String {
97 match v {
98 serde_json::Value::String(s) => s.clone(),
99 other => other.to_string(),
100 }
101}
102
/// Joins form fields as `key=value` pairs separated by `&`, in map order.
///
/// Keys and values are emitted verbatim; no escaping is applied.
fn form_to_text(m: &BTreeMap<String, String>) -> String {
    let mut joined = String::new();
    for (index, (key, value)) in m.iter().enumerate() {
        if index > 0 {
            joined.push('&');
        }
        joined.push_str(key);
        joined.push('=');
        joined.push_str(value);
    }
    joined
}
107
108fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
111 let mut out = String::new();
112 let mut all_present = true;
113 for p in parts {
114 match p {
115 TextPart::Literal(s) => out.push_str(s),
116 TextPart::Interp(r) => match ctx.resolve_text(r) {
117 Some(s) => out.push_str(&s),
118 None => all_present = false,
119 },
120 }
121 }
122 (out, all_present)
123}
124
125pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
126 tracing::debug!(tokens = tokens.len(), "exec::build_argv");
127 let mut argv = Vec::new();
128 for t in tokens {
129 match t {
130 ExecToken::Text { parts, .. } => {
131 let (s, _) = render_text_parts(parts, ctx);
132 argv.push(s);
133 }
134 ExecToken::Group { pieces, .. } => {
135 let mut piece_strs = Vec::with_capacity(pieces.len());
136 let mut all_present = true;
137 for piece in pieces {
138 let (s, present) = render_text_parts(&piece.parts, ctx);
139 if !present {
140 all_present = false;
141 break;
142 }
143 piece_strs.push(s);
144 }
145 if all_present {
146 argv.extend(piece_strs);
147 }
148 }
149 }
150 }
151 argv
152}
153
/// Result of running a rendered pipeline to completion.
#[derive(Debug)]
pub struct ExecOutput {
    /// Exit code of the shell, or `-1` when no exit code is available.
    pub status: i32,
    /// Everything the pipeline wrote to standard output.
    pub stdout: Vec<u8>,
    /// Everything the pipeline wrote to standard error.
    pub stderr: Vec<u8>,
}
162
163pub fn preview_pipeline(pipeline: &[ExecStage], ctx: &ExecContext) -> Vec<String> {
166 tracing::debug!(stages = pipeline.len(), "exec::preview_pipeline");
167 match render_shell_with_mode(pipeline, ctx, false) {
168 Ok(rendered) => vec![format!("shell: {}", rendered.script)],
169 Err(err) => vec![format!("shell: <unresolved: {}>", err)],
170 }
171}
172
173pub async fn run_pipeline(
174 pipeline: &[ExecStage],
175 ctx: &ExecContext,
176 timeout: Option<std::time::Duration>,
177) -> Result<ExecOutput, String> {
178 tracing::debug!(stages = pipeline.len(), ?timeout, "exec::run_pipeline");
179 if pipeline.is_empty() {
180 return Err("empty exec pipeline".into());
181 }
182 let rendered = render_shell(pipeline, ctx)?;
183 run_shell(rendered, timeout).await
184}
185
186struct RenderedShell {
187 script: String,
188 stdin: Option<Vec<u8>>,
189 _temp_files: Vec<NamedTempFile>,
190}
191
192struct ShellRenderer<'a> {
193 ctx: &'a ExecContext,
194 temp_files: Vec<NamedTempFile>,
195 materialize_binary: bool,
196}
197
198impl<'a> ShellRenderer<'a> {
199 fn new(ctx: &'a ExecContext, materialize_binary: bool) -> Self {
200 Self {
201 ctx,
202 temp_files: Vec::new(),
203 materialize_binary,
204 }
205 }
206
207 fn resolve_shell_text(&mut self, r: &ValueRef) -> Result<Option<String>, String> {
208 if let ValueRef::Body { path } = r
209 && path.is_empty()
210 && let BodyValue::Binary(bytes) = &self.ctx.body
211 {
212 if !self.materialize_binary {
213 return Ok(Some("<binary temp file>".into()));
214 }
215 let mut file = NamedTempFile::new().map_err(|e| e.to_string())?;
216 file.write_all(bytes).map_err(|e| e.to_string())?;
217 let path = file.path().to_string_lossy().to_string();
218 self.temp_files.push(file);
219 return Ok(Some(path));
220 }
221 Ok(self.ctx.resolve_text(r))
222 }
223
224 fn render_command(&mut self, tokens: &[ExecToken]) -> Result<String, String> {
225 let mut words = Vec::new();
226 for token in tokens {
227 match token {
228 ExecToken::Text {
229 parts, force_quote, ..
230 } => {
231 words.push(self.render_text_word(parts, *force_quote, false)?);
232 }
233 ExecToken::Group { pieces, .. } => {
234 let mut group_words = Vec::with_capacity(pieces.len());
235 let mut all_present = true;
236 for piece in pieces {
237 match self.render_optional_word(&piece.parts, piece.force_quote)? {
238 Some(word) => group_words.push(word),
239 None => {
240 all_present = false;
241 break;
242 }
243 }
244 }
245 if all_present {
246 words.extend(group_words);
247 }
248 }
249 }
250 }
251 if words.is_empty() {
252 return Err("command stage produced empty shell command".into());
253 }
254 Ok(words.join(" "))
255 }
256
257 fn render_text_word(
258 &mut self,
259 parts: &[TextPart],
260 force_quote: bool,
261 omit_missing: bool,
262 ) -> Result<String, String> {
263 let mut out = String::new();
264 let mut has_interp = false;
265 for part in parts {
266 match part {
267 TextPart::Literal(s) => out.push_str(s),
268 TextPart::Interp(r) => {
269 has_interp = true;
270 match self.resolve_shell_text(r)? {
271 Some(value) => out.push_str(&value),
272 None if omit_missing => return Ok(String::new()),
273 None => {}
274 }
275 }
276 }
277 }
278 Ok(shell_word(&out, force_quote || has_interp))
279 }
280
281 fn render_optional_word(
282 &mut self,
283 parts: &[TextPart],
284 force_quote: bool,
285 ) -> Result<Option<String>, String> {
286 let mut out = String::new();
287 let mut has_interp = false;
288 for part in parts {
289 match part {
290 TextPart::Literal(s) => out.push_str(s),
291 TextPart::Interp(r) => {
292 has_interp = true;
293 let Some(value) = self.resolve_shell_text(r)? else {
294 return Ok(None);
295 };
296 out.push_str(&value);
297 }
298 }
299 }
300 Ok(Some(shell_word(&out, force_quote || has_interp)))
301 }
302}
303
304fn render_shell(pipeline: &[ExecStage], ctx: &ExecContext) -> Result<RenderedShell, String> {
305 render_shell_with_mode(pipeline, ctx, true)
306}
307
308fn render_shell_with_mode(
309 pipeline: &[ExecStage],
310 ctx: &ExecContext,
311 materialize_binary: bool,
312) -> Result<RenderedShell, String> {
313 let mut pending_stdin: Option<Vec<u8>> = None;
314 let mut commands = Vec::new();
315 let mut saw_command = false;
316 let mut renderer = ShellRenderer::new(ctx, materialize_binary);
317
318 for stage in pipeline {
319 match stage {
320 ExecStage::Source { reference, .. } => {
321 if saw_command {
322 return Err(
323 "value-reference source after a command stage is not supported".into(),
324 );
325 }
326 let bytes = ctx
327 .resolve_bytes(reference)
328 .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
329 pending_stdin = Some(bytes);
330 }
331 ExecStage::Command { tokens, .. } => {
332 saw_command = true;
333 commands.push(renderer.render_command(tokens)?);
334 }
335 }
336 }
337 if commands.is_empty() {
338 return Err("pipeline ended without a command".to_string());
339 }
340 Ok(RenderedShell {
341 script: commands.join(" | "),
342 stdin: pending_stdin,
343 _temp_files: renderer.temp_files,
344 })
345}
346
347async fn run_shell(
348 rendered: RenderedShell,
349 timeout: Option<std::time::Duration>,
350) -> Result<ExecOutput, String> {
351 tracing::debug!(script = %rendered.script, "exec::run_shell: spawning");
352 let mut cmd = Command::new("/bin/sh");
353 cmd.arg("-c").arg(&rendered.script);
354 cmd.kill_on_drop(true);
355 cmd.process_group(0);
356 cmd.stdout(Stdio::piped());
357 cmd.stderr(Stdio::piped());
358 if rendered.stdin.is_some() {
359 cmd.stdin(Stdio::piped());
360 } else {
361 cmd.stdin(Stdio::null());
362 }
363
364 let mut child = cmd
365 .spawn()
366 .map_err(|e| format!("failed to spawn shell: {}", e))?;
367 let child_id = child.id();
368 if let Some(stdin) = rendered.stdin
369 && let Some(mut sin) = child.stdin.take()
370 {
371 sin.write_all(&stdin).await.map_err(|e| e.to_string())?;
372 drop(sin);
373 }
374
375 let stdout = child.stdout.take();
376 let stderr = child.stderr.take();
377 let stdout_handle = tokio::spawn(async move {
378 let mut buf = Vec::new();
379 if let Some(mut s) = stdout {
380 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
381 .await
382 .ok();
383 }
384 buf
385 });
386 let stderr_handle = tokio::spawn(async move {
387 let mut buf = Vec::new();
388 if let Some(mut s) = stderr {
389 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
390 .await
391 .ok();
392 }
393 buf
394 });
395
396 let status = if let Some(timeout) = timeout {
397 tokio::select! {
398 status = child.wait() => status.map_err(|e| e.to_string())?,
399 _ = tokio::time::sleep(timeout) => {
400 if let Some(pid) = child_id {
401 kill_process_group(pid);
402 }
403 let _ = child.kill().await;
404 return Err("execution timed out".into());
405 }
406 }
407 } else {
408 child.wait().await.map_err(|e| e.to_string())?
409 };
410 let stdout = stdout_handle.await.unwrap_or_default();
411 let stderr = stderr_handle.await.unwrap_or_default();
412 if let Some(pid) = child_id {
413 kill_process_group(pid);
414 }
415 Ok(ExecOutput {
416 status: status.code().unwrap_or(-1),
417 stdout,
418 stderr,
419 })
420}
421
422#[cfg(unix)]
423fn kill_process_group(pid: u32) {
424 let pgid = -(pid as libc::pid_t);
425 unsafe {
428 libc::kill(pgid, libc::SIGKILL);
429 }
430}
431
432#[cfg(not(unix))]
433fn kill_process_group(_pid: u32) {}
434
435fn shell_word(value: &str, force_quote: bool) -> String {
436 if force_quote || value.is_empty() || value.chars().any(char::is_whitespace) {
437 shell_quote(value)
438 } else {
439 value.to_string()
440 }
441}
442
/// Wraps `value` in single quotes for the shell.
///
/// Each embedded single quote is replaced by `'\''`: close the quoted string,
/// emit an escaped quote, and reopen it.
fn shell_quote(value: &str) -> String {
    let escaped = value.replace('\'', "'\\''");
    format!("'{}'", escaped)
}