1use crate::spec::{ExecStage, ExecToken, TextPart, ValueRef};
20use bytes::Bytes;
21use std::collections::BTreeMap;
22use std::process::Stdio;
23use tokio::io::AsyncWriteExt;
24use tokio::process::Command;
25
26#[derive(Clone, Debug)]
29pub enum BodyValue {
30 None,
31 Text(String),
32 Json(serde_json::Value),
33 Form(BTreeMap<String, String>),
34 Binary(Bytes),
35}
36
37impl Default for BodyValue {
38 fn default() -> Self {
39 BodyValue::None
40 }
41}
42
43#[derive(Clone, Debug, Default)]
44pub struct ExecContext {
45 pub query: BTreeMap<String, String>,
46 pub path: BTreeMap<String, String>,
47 pub headers: BTreeMap<String, String>,
48 pub vars: BTreeMap<String, String>,
49 pub body: BodyValue,
50}
51
52impl ExecContext {
53 fn resolve_text(&self, r: &ValueRef) -> Option<String> {
54 match r {
55 ValueRef::Query(n) => self.query.get(n).cloned(),
56 ValueRef::Path(n) => self.path.get(n).cloned(),
57 ValueRef::Header(n) => self.headers.get(n).cloned(),
58 ValueRef::Var(n) => self.vars.get(n).cloned(),
59 ValueRef::Body { path } => match &self.body {
60 BodyValue::None => None,
61 BodyValue::Text(s) if path.is_empty() => Some(s.clone()),
62 BodyValue::Text(_) => None,
63 BodyValue::Json(v) => {
64 let mut cur = v;
65 for p in path {
66 cur = cur.get(p)?;
67 }
68 Some(json_to_text(cur))
69 }
70 BodyValue::Form(m) => {
71 if path.is_empty() {
72 Some(form_to_text(m))
73 } else if path.len() == 1 {
74 m.get(&path[0]).cloned()
75 } else {
76 None
77 }
78 }
79 BodyValue::Binary(_) => None,
80 },
81 }
82 }
83
84 fn resolve_bytes(&self, r: &ValueRef) -> Option<Vec<u8>> {
85 if let ValueRef::Body { path } = r {
86 if path.is_empty() {
87 if let BodyValue::Binary(b) = &self.body {
88 return Some(b.to_vec());
89 }
90 }
91 }
92 self.resolve_text(r).map(|s| s.into_bytes())
93 }
94}
95
96fn json_to_text(v: &serde_json::Value) -> String {
97 match v {
98 serde_json::Value::String(s) => s.clone(),
99 other => other.to_string(),
100 }
101}
102
103fn form_to_text(m: &BTreeMap<String, String>) -> String {
104 let pairs: Vec<String> = m.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
105 pairs.join("&")
106}
107
108fn render_text_parts(parts: &[TextPart], ctx: &ExecContext) -> (String, bool) {
111 let mut out = String::new();
112 let mut all_present = true;
113 for p in parts {
114 match p {
115 TextPart::Literal(s) => out.push_str(s),
116 TextPart::Interp(r) => match ctx.resolve_text(r) {
117 Some(s) => out.push_str(&s),
118 None => all_present = false,
119 },
120 }
121 }
122 (out, all_present)
123}
124
125pub fn build_argv(tokens: &[ExecToken], ctx: &ExecContext) -> Vec<String> {
126 tracing::debug!(tokens = tokens.len(), "exec::build_argv");
127 let mut argv = Vec::new();
128 for t in tokens {
129 match t {
130 ExecToken::Text { parts, .. } => {
131 let (s, _) = render_text_parts(parts, ctx);
132 argv.push(s);
133 }
134 ExecToken::Group { pieces, .. } => {
135 let mut piece_strs = Vec::with_capacity(pieces.len());
136 let mut all_present = true;
137 for piece in pieces {
138 let (s, present) = render_text_parts(&piece.parts, ctx);
139 if !present {
140 all_present = false;
141 break;
142 }
143 piece_strs.push(s);
144 }
145 if all_present {
146 argv.extend(piece_strs);
147 }
148 }
149 }
150 }
151 argv
152}
153
154#[derive(Debug)]
157pub struct ExecOutput {
158 pub status: i32,
159 pub stdout: Vec<u8>,
160 pub stderr: Vec<u8>,
161}
162
163pub fn preview_pipeline(pipeline: &[ExecStage], ctx: &ExecContext) -> Vec<String> {
166 tracing::debug!(stages = pipeline.len(), "exec::preview_pipeline");
167 let mut out = Vec::with_capacity(pipeline.len());
168 for stage in pipeline {
169 match stage {
170 ExecStage::Source { reference, .. } => {
171 let resolved = ctx
172 .resolve_text(reference)
173 .map(|s| {
174 if s.len() > 200 {
175 format!("{}…", &s[..200])
176 } else {
177 s
178 }
179 })
180 .unwrap_or_else(|| "<unresolved>".into());
181 out.push(format!("stdin <- {} = {:?}", reference.describe(), resolved));
182 }
183 ExecStage::Command { tokens, .. } => {
184 let argv = build_argv(tokens, ctx);
185 out.push(format!("argv: {:?}", argv));
186 }
187 }
188 }
189 out
190}
191
192pub async fn run_pipeline(
193 pipeline: &[ExecStage],
194 ctx: &ExecContext,
195 timeout: Option<std::time::Duration>,
196) -> Result<ExecOutput, String> {
197 tracing::debug!(stages = pipeline.len(), ?timeout, "exec::run_pipeline");
198 if pipeline.is_empty() {
199 return Err("empty exec pipeline".into());
200 }
201 let fut = run_pipeline_inner(pipeline, ctx);
202 if let Some(t) = timeout {
203 match tokio::time::timeout(t, fut).await {
204 Ok(r) => r,
205 Err(_) => Err("execution timed out".into()),
206 }
207 } else {
208 fut.await
209 }
210}
211
212async fn run_pipeline_inner(
213 pipeline: &[ExecStage],
214 ctx: &ExecContext,
215) -> Result<ExecOutput, String> {
216 let mut pending_stdin: Option<Vec<u8>> = None;
217 let mut prev_child: Option<tokio::process::Child> = None;
218
219 let mut i = 0;
220 while i < pipeline.len() {
221 match &pipeline[i] {
222 ExecStage::Source { reference, .. } => {
223 if prev_child.is_some() {
224 return Err(
225 "value-reference source after a command stage is not supported".into(),
226 );
227 }
228 let bytes = ctx
229 .resolve_bytes(reference)
230 .ok_or_else(|| format!("unresolved {}", reference.describe()))?;
231 pending_stdin = Some(bytes);
232 i += 1;
233 }
234 ExecStage::Command { tokens, .. } => {
235 let argv = build_argv(tokens, ctx);
236 if argv.is_empty() {
237 return Err("command stage produced empty argv".into());
238 }
239 let program = argv[0].clone();
240 let args = &argv[1..];
241 tracing::debug!(program = %program, args = ?args, "exec::run_pipeline: spawning");
242 let mut cmd = Command::new(&program);
243 cmd.args(args);
244 cmd.stdout(Stdio::piped());
245 cmd.stderr(Stdio::piped());
246
247 let stdin_data: Option<Vec<u8>>;
248 if let Some(mut prev) = prev_child.take() {
249 if let Some(out) = prev.stdout.take() {
250 let std_out: std::process::Stdio = out
251 .try_into()
252 .map_err(|e: std::io::Error| e.to_string())?;
253 cmd.stdin(std_out);
254 } else {
255 cmd.stdin(Stdio::null());
256 }
257 tokio::spawn(async move {
258 let _ = prev.wait().await;
259 });
260 stdin_data = None;
261 } else if let Some(d) = pending_stdin.take() {
262 cmd.stdin(Stdio::piped());
263 stdin_data = Some(d);
264 } else {
265 cmd.stdin(Stdio::null());
266 stdin_data = None;
267 }
268
269 let mut child = cmd
270 .spawn()
271 .map_err(|e| format!("failed to spawn `{}`: {}", program, e))?;
272 if let Some(d) = stdin_data {
273 if let Some(mut sin) = child.stdin.take() {
274 sin.write_all(&d).await.map_err(|e| e.to_string())?;
275 drop(sin);
276 }
277 }
278 prev_child = Some(child);
279 i += 1;
280 }
281 }
282 }
283
284 let mut child = prev_child.ok_or_else(|| "pipeline ended without a command".to_string())?;
285 let stdout = child.stdout.take();
286 let stderr = child.stderr.take();
287 let stdout_handle = tokio::spawn(async move {
288 let mut buf = Vec::new();
289 if let Some(mut s) = stdout {
290 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
291 .await
292 .ok();
293 }
294 buf
295 });
296 let stderr_handle = tokio::spawn(async move {
297 let mut buf = Vec::new();
298 if let Some(mut s) = stderr {
299 tokio::io::AsyncReadExt::read_to_end(&mut s, &mut buf)
300 .await
301 .ok();
302 }
303 buf
304 });
305 let status = child.wait().await.map_err(|e| e.to_string())?;
306 let stdout = stdout_handle.await.unwrap_or_default();
307 let stderr = stderr_handle.await.unwrap_or_default();
308 Ok(ExecOutput {
309 status: status.code().unwrap_or(-1),
310 stdout,
311 stderr,
312 })
313}