1use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
26use bytes::Bytes;
27use std::collections::BTreeMap;
28use std::io::Write;
29use std::process::Stdio;
30use tempfile::NamedTempFile;
31use tokio::io::AsyncWriteExt;
32use tokio::process::Command;
33
34#[derive(Clone, Debug)]
37pub enum FormFieldValue {
38 Text(String),
39 Binary(Bytes),
40}
41
42impl FormFieldValue {
43 pub fn as_text(&self) -> Option<&str> {
44 if let FormFieldValue::Text(s) = self {
45 Some(s)
46 } else {
47 None
48 }
49 }
50 pub fn as_bytes(&self) -> &[u8] {
51 match self {
52 FormFieldValue::Text(s) => s.as_bytes(),
53 FormFieldValue::Binary(b) => b.as_ref(),
54 }
55 }
56}
57
58#[derive(Clone, Debug, Default)]
59pub enum BodyValue {
60 #[default]
61 None,
62 Text(String),
63 Json(serde_json::Value),
64 Form(BTreeMap<String, FormFieldValue>),
65 Binary(Bytes),
66}
67
68#[derive(Clone, Debug, Default)]
69pub struct ExecContext {
70 pub query: BTreeMap<String, String>,
71 pub path: BTreeMap<String, String>,
72 pub headers: BTreeMap<String, String>,
73 pub vars: BTreeMap<String, String>,
74 pub body: BodyValue,
75}
76
77impl ExecContext {
78 fn resolve_text(&self, r: &ValueRef) -> Option<String> {
79 match r {
80 ValueRef::Query(n) => self.query.get(n).cloned(),
81 ValueRef::Path(n) => self.path.get(n).cloned(),
82 ValueRef::Header(n) => self.headers.get(n).cloned(),
83 ValueRef::Var(n) => self.vars.get(n).cloned(),
84 ValueRef::Body { path } => match &self.body {
85 BodyValue::None => None,
86 BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
87 BodyValue::Text(_) => None,
88 BodyValue::Json(v) => {
89 let mut cur = v;
90 for p in path {
91 cur = cur.get(p)?;
92 }
93 Some(json_to_text(cur))
94 }
95 BodyValue::Form(m) => {
96 if path.is_empty() {
97 Some(form_to_text(m))
98 } else if path.len() == 1 {
99 m.get(&path[0]).and_then(|v| v.as_text().map(str::to_string))
100 } else {
101 None
102 }
103 }
104 BodyValue::Binary(_) => None,
105 },
106 }
107 }
108
109 fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
110 if let ValueRef::Body { path } = r {
111 if path.is_empty() {
112 if let BodyValue::Binary(b) = &self.body {
113 return Some(b.to_vec());
114 }
115 } else if path.len() == 1
116 && let BodyValue::Form(m) = &self.body
117 && let Some(field) = m.get(&path[0])
118 {
119 return Some(field.as_bytes().to_vec());
120 }
121 }
122 self.resolve_text(r).map(|s| s.into_bytes())
123 }
124}
125
126fn json_to_text(v: &serde_json::Value) -> String {
127 match v {
128 serde_json::Value::String(s) => s.clone(),
129 other => other.to_string(),
130 }
131}
132
133fn form_to_text(m: &BTreeMap<String, FormFieldValue>) -> String {
134 let pairs: Vec<String> = m
135 .iter()
136 .filter_map(|(k, v)| match v {
137 FormFieldValue::Text(s) => Some(format!("{}={}", k, s)),
138 FormFieldValue::Binary(_) => None,
139 })
140 .collect();
141 pairs.join("&")
142}
143
144fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
147 let mut out = String::new();
148 let mut all_present = true;
149 for p in parts {
150 match p {
151 TextPart::Literal(s) => out.push_str(s),
152 TextPart::Interp(r) => match ctx.resolve_text(r) {
153 Some(s) => out.push_str(&s),
154 None => all_present = false,
155 },
156 }
157 }
158 (out, all_present)
159}
160
161pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
162 tracing::debug!(tokens = tokens.len(), "exec::build_argv");
163 let mut argv = Vec::new();
164 for t in tokens {
165 match t {
166 ExecToken::Text { parts, .. } => {
167 let (s, _) = render_text_parts(parts, ctx);
168 argv.push(s);
169 }
170 ExecToken::Group { pieces, .. } => {
171 let mut piece_strs = Vec::with_capacity(pieces.len());
172 let mut all_present = true;
173 for piece in pieces {
174 let (s, present) = render_text_parts(&piece.parts, ctx);
175 if !present {
176 all_present = false;
177 break;
178 }
179 piece_strs.push(s);
180 }
181 if all_present {
182 argv.extend(piece_strs);
183 }
184 }
185 }
186 }
187 argv
188}
189
/// Captured result of a finished, non-streaming execution.
#[derive(Debug)]
pub struct ExecOutput {
    /// Exit code of the shell, or `-1` when no exit code is available.
    pub status: i32,
    /// Everything the script wrote to standard output.
    pub stdout: Vec<u8>,
    /// Everything the script wrote to standard error.
    pub stderr: Vec<u8>,
}
198
199pub fn preview_pipeline(statements: &[Vec<ExecStage>], ctx: &ExecContext) -> Vec<String> {
202 tracing::debug!(statements = statements.len(), "exec::preview_pipeline");
203 match render_shell_with_mode(statements, ctx, false) {
204 Ok(rendered) => vec![format!("shell: {}", rendered.script)],
205 Err(err) => vec![format!("shell: <unresolved: {}>", err)],
206 }
207}
208
209pub async fn run_pipeline(
210 statements: &[Vec<ExecStage>],
211 ctx: &ExecContext,
212 timeout: Option<std::time::Duration>,
213) -> Result<ExecOutput, String> {
214 tracing::debug!(statements = statements.len(), ?timeout, "exec::run_pipeline");
215 if statements.is_empty() {
216 return Err("empty exec pipeline".into());
217 }
218 let rendered = render_shell(statements, ctx)?;
219 run_shell(rendered, timeout).await
220}
221
222struct RenderedShell {
223 script: String,
224 stdin: Option<Vec<u8>>,
225 _temp_files: Vec<NamedTempFile>,
226}
227
228struct ShellRenderer<'a> {
229 ctx: &'a ExecContext,
230 temp_files: Vec<NamedTempFile>,
231 materialize_binary: bool,
232}
233
234impl<'a> ShellRenderer<'a> {
235 fn new(ctx: &'a ExecContext, materialize_binary: bool) -> Self {
236 Self {
237 ctx,
238 temp_files: Vec::new(),
239 materialize_binary,
240 }
241 }
242
243 fn resolve_shell_text(&mut self, r: &ValueRef) -> Result<Option<String>, String> {
244 if let ValueRef::Body { path } = r {
245 if path.is_empty()
246 && let BodyValue::Binary(bytes) = &self.ctx.body
247 {
248 if !self.materialize_binary {
249 return Ok(Some("<binary temp file>".into()));
250 }
251 return Ok(Some(self.materialize_to_temp(bytes)?));
252 }
253 if path.len() == 1
254 && let BodyValue::Form(m) = &self.ctx.body
255 && let Some(FormFieldValue::Binary(bytes)) = m.get(&path[0])
256 {
257 if !self.materialize_binary {
258 return Ok(Some("<binary temp file>".into()));
259 }
260 return Ok(Some(self.materialize_to_temp(bytes)?));
261 }
262 }
263 Ok(self.ctx.resolve_text(r))
264 }
265
266 fn materialize_to_temp(&mut self, bytes: &[u8]) -> Result<String, String> {
267 let mut file = NamedTempFile::new().map_err(|e| e.to_string())?;
268 file.write_all(bytes).map_err(|e| e.to_string())?;
269 let path = file.path().to_string_lossy().to_string();
270 self.temp_files.push(file);
271 Ok(path)
272 }
273
274 fn render_command(&mut self, tokens: &[ExecToken]) -> Result<String, String> {
275 let mut words = Vec::new();
276 for token in tokens {
277 match token {
278 ExecToken::Text {
279 parts, force_quote, ..
280 } => {
281 words.push(self.render_text_word(parts, *force_quote, false)?);
282 }
283 ExecToken::Group { pieces, .. } => {
284 let mut group_words = Vec::with_capacity(pieces.len());
285 let mut all_present = true;
286 for piece in pieces {
287 match self.render_optional_word(&piece.parts, piece.force_quote)? {
288 Some(word) => group_words.push(word),
289 None => {
290 all_present = false;
291 break;
292 }
293 }
294 }
295 if all_present {
296 words.extend(group_words);
297 }
298 }
299 }
300 }
301 if words.is_empty() {
302 return Err("command stage produced empty shell command".into());
303 }
304 Ok(words.join(" "))
305 }
306
307 fn render_text_word(
308 &mut self,
309 parts: &[TextPart],
310 force_quote: bool,
311 omit_missing: bool,
312 ) -> Result<String, String> {
313 let mut out = String::new();
314 let mut has_interp = false;
315 for part in parts {
316 match part {
317 TextPart::Literal(s) => out.push_str(s),
318 TextPart::Interp(r) => {
319 has_interp = true;
320 match self.resolve_shell_text(r)? {
321 Some(value) => out.push_str(&value),
322 None if omit_missing => return Ok(String::new()),
323 None => {}
324 }
325 }
326 }
327 }
328 Ok(shell_word(&out, force_quote || has_interp))
329 }
330
331 fn render_optional_word(
332 &mut self,
333 parts: &[TextPart],
334 force_quote: bool,
335 ) -> Result<Option<String>, String> {
336 let mut out = String::new();
337 let mut has_interp = false;
338 for part in parts {
339 match part {
340 TextPart::Literal(s) => out.push_str(s),
341 TextPart::Interp(r) => {
342 has_interp = true;
343 let Some(value) = self.resolve_shell_text(r)? else {
344 return Ok(None);
345 };
346 out.push_str(&value);
347 }
348 }
349 }
350 Ok(Some(shell_word(&out, force_quote || has_interp)))
351 }
352}
353
354fn render_shell(statements: &[Vec<ExecStage>], ctx: &ExecContext) -> Result<RenderedShell, String> {
355 render_shell_with_mode(statements, ctx, true)
356}
357
358fn render_shell_with_mode(
359 statements: &[Vec<ExecStage>],
360 ctx: &ExecContext,
361 materialize_binary: bool,
362) -> Result<RenderedShell, String> {
363 let mut renderer = ShellRenderer::new(ctx, materialize_binary);
364 let mut script_lines = Vec::new();
365 let mut script_stdin: Option<Vec<u8>> = None;
366 for (idx, pipeline) in statements.iter().enumerate() {
367 let mut pending_stdin: Option<Vec<u8>> = None;
368 let mut commands = Vec::new();
369 let mut saw_command = false;
370 for stage in pipeline {
371 match stage {
372 ExecStage::Source { reference, .. } => {
373 if saw_command {
374 return Err(
375 "value-reference source after a command stage is not supported".into(),
376 );
377 }
378 let bytes = ctx
379 .resolve_bytes(reference)
380 .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
381 pending_stdin = Some(bytes);
382 }
383 ExecStage::Command { tokens, .. } => {
384 saw_command = true;
385 commands.push(renderer.render_command(tokens)?);
386 }
387 }
388 }
389 if commands.is_empty() {
390 return Err("pipeline ended without a command".to_string());
391 }
392 if pending_stdin.is_some() {
393 if idx != 0 {
396 return Err(
397 "only the first statement of a multi-line Exec may consume request stdin"
398 .into(),
399 );
400 }
401 script_stdin = pending_stdin;
402 }
403 script_lines.push(commands.join(" | "));
404 }
405 if script_lines.is_empty() {
406 return Err("pipeline ended without a command".to_string());
407 }
408 Ok(RenderedShell {
409 script: script_lines.join("\n"),
410 stdin: script_stdin,
411 _temp_files: renderer.temp_files,
412 })
413}
414
415async fn run_shell(
416 rendered: RenderedShell,
417 timeout: Option<std::time::Duration>,
418) -> Result<ExecOutput, String> {
419 tracing::debug!(script = %rendered.script, "exec::run_shell: spawning");
420 let mut cmd = Command::new("/bin/sh");
421 cmd.arg("-c").arg(&rendered.script);
422 cmd.kill_on_drop(true);
423 cmd.process_group(0);
424 cmd.stdout(Stdio::piped());
425 cmd.stderr(Stdio::piped());
426 if rendered.stdin.is_some() {
427 cmd.stdin(Stdio::piped());
428 } else {
429 cmd.stdin(Stdio::null());
430 }
431
432 let mut child = cmd
433 .spawn()
434 .map_err(|e| format!("failed to spawn shell: {}", e))?;
435 let child_id = child.id();
436 if let Some(stdin) = rendered.stdin
437 && let Some(mut sin) = child.stdin.take()
438 {
439 sin.write_all(&stdin).await.map_err(|e| e.to_string())?;
440 drop(sin);
441 }
442
443 let stdout = child.stdout.take();
444 let stderr = child.stderr.take();
445 let stdout_handle = tokio::spawn(async move {
446 let mut buf = Vec::new();
447 if let Some(mut s) = stdout {
448 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
449 .await
450 .ok();
451 }
452 buf
453 });
454 let stderr_handle = tokio::spawn(async move {
455 let mut buf = Vec::new();
456 if let Some(mut s) = stderr {
457 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
458 .await
459 .ok();
460 }
461 buf
462 });
463
464 let status = if let Some(timeout) = timeout {
465 tokio::select! {
466 status = child.wait() => status.map_err(|e| e.to_string())?,
467 _ = tokio::time::sleep(timeout) => {
468 if let Some(pid) = child_id {
469 kill_process_group(pid);
470 }
471 let _ = child.kill().await;
472 return Err("execution timed out".into());
473 }
474 }
475 } else {
476 child.wait().await.map_err(|e| e.to_string())?
477 };
478 let stdout = stdout_handle.await.unwrap_or_default();
479 let stderr = stderr_handle.await.unwrap_or_default();
480 if let Some(pid) = child_id {
481 kill_process_group(pid);
482 }
483 Ok(ExecOutput {
484 status: status.code().unwrap_or(-1),
485 stdout,
486 stderr,
487 })
488}
489
490pub struct StreamingExec {
493 pub stdout_rx: tokio::sync::mpsc::Receiver<Result<Bytes, String>>,
494 pub completion: tokio::task::JoinHandle<Result<ExecCompletion, String>>,
495}
496
/// Final result of a streamed execution.
#[derive(Debug)]
pub struct ExecCompletion {
    /// Exit code of the shell, or `-1` when no exit code is available.
    pub status: i32,
    /// Everything the script wrote to standard error.
    pub stderr: Vec<u8>,
}
502
503pub async fn run_pipeline_streaming(
504 statements: &[Vec<ExecStage>],
505 ctx: &ExecContext,
506 timeout: Option<std::time::Duration>,
507) -> Result<StreamingExec, String> {
508 tracing::debug!(statements = statements.len(), ?timeout, "exec::run_pipeline_streaming");
509 if statements.is_empty() {
510 return Err("empty exec pipeline".into());
511 }
512 let rendered = render_shell(statements, ctx)?;
513 spawn_streaming(rendered, timeout).await
514}
515
516async fn spawn_streaming(
517 rendered: RenderedShell,
518 timeout: Option<std::time::Duration>,
519) -> Result<StreamingExec, String> {
520 let mut cmd = Command::new("/bin/sh");
521 cmd.arg("-c").arg(&rendered.script);
522 cmd.kill_on_drop(true);
523 cmd.process_group(0);
524 cmd.stdout(Stdio::piped());
525 cmd.stderr(Stdio::piped());
526 if rendered.stdin.is_some() {
527 cmd.stdin(Stdio::piped());
528 } else {
529 cmd.stdin(Stdio::null());
530 }
531
532 let mut child = cmd
533 .spawn()
534 .map_err(|e| format!("failed to spawn shell: {}", e))?;
535 let child_id = child.id();
536 if let Some(stdin) = rendered.stdin.clone()
537 && let Some(mut sin) = child.stdin.take()
538 {
539 tokio::spawn(async move {
542 let _ = sin.write_all(&stdin).await;
543 });
544 }
545
546 let temp_files = rendered._temp_files;
547 let mut stdout = child.stdout.take();
548 let stderr = child.stderr.take();
549 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, String>>(8);
550
551 let stderr_handle = tokio::spawn(async move {
552 let mut buf = Vec::new();
553 if let Some(mut s) = stderr {
554 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
555 .await
556 .ok();
557 }
558 buf
559 });
560
561 let stdout_tx = tx.clone();
562 let stdout_pump = tokio::spawn(async move {
563 use tokio::io::AsyncReadExt;
564 if let Some(s) = stdout.as_mut() {
565 let mut buf = vec![0u8; 8 * 1024];
566 loop {
567 match s.read(&mut buf).await {
568 Ok(0) => break,
569 Ok(n) => {
570 if stdout_tx
571 .send(Ok(Bytes::copy_from_slice(&buf[..n])))
572 .await
573 .is_err()
574 {
575 break;
576 }
577 }
578 Err(e) => {
579 let _ = stdout_tx.send(Err(e.to_string())).await;
580 break;
581 }
582 }
583 }
584 }
585 });
586
587 let completion = tokio::spawn(async move {
588 let _temp_files = temp_files; let status = if let Some(timeout) = timeout {
590 tokio::select! {
591 status = child.wait() => status.map_err(|e| e.to_string())?,
592 _ = tokio::time::sleep(timeout) => {
593 if let Some(pid) = child_id {
594 kill_process_group(pid);
595 }
596 let _ = child.kill().await;
597 let _ = stdout_pump.await;
598 return Err("execution timed out".into());
599 }
600 }
601 } else {
602 child.wait().await.map_err(|e| e.to_string())?
603 };
604 let _ = stdout_pump.await;
605 let stderr = stderr_handle.await.unwrap_or_default();
606 if let Some(pid) = child_id {
607 kill_process_group(pid);
608 }
609 Ok(ExecCompletion {
610 status: status.code().unwrap_or(-1),
611 stderr,
612 })
613 });
614
615 Ok(StreamingExec {
616 stdout_rx: rx,
617 completion,
618 })
619}
620
621#[cfg(unix)]
622fn kill_process_group(pid: u32) {
623 let pgid = -(pid as libc::pid_t);
624 unsafe {
627 libc::kill(pgid, libc::SIGKILL);
628 }
629}
630
/// Non-Unix stand-in for the process-group kill: does nothing.
#[cfg(not(unix))]
fn kill_process_group(_pid: u32) {}
633
634fn shell_word(value: &str, force_quote: bool) -> String {
635 if force_quote || value.is_empty() || value.chars().any(char::is_whitespace) {
636 shell_quote(value)
637 } else {
638 value.to_string()
639 }
640}
641
/// Wraps `value` in single quotes for a POSIX shell.
///
/// Each embedded single quote is emitted as `'\''`: close the quoted string,
/// add an escaped quote, reopen the quoted string.
fn shell_quote(value: &str) -> String {
    format!("'{}'", value.replace('\'', "'\\''"))
}