1use crate::{
2 config::{TaskConfig, TaskMode, TaskOutputFormat, TaskStdin},
3 error::{ArturError, Result},
4};
5use bytes::Bytes;
6use serde::{Deserialize, Serialize};
7use serde_json::{Map, Value};
8use std::{
9 collections::{BTreeMap, HashMap},
10 process::Stdio,
11 sync::Arc,
12 time::{Duration, Instant},
13};
14use tokio::{io::AsyncWriteExt, process::Command, sync::RwLock, time::timeout};
15use uuid::Uuid;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct RequestContext {
19 pub method: String,
20 pub uri: String,
21 pub path: String,
22 pub params: BTreeMap<String, String>,
23 pub query: BTreeMap<String, String>,
24 pub headers: BTreeMap<String, String>,
25 pub body: String,
26 pub body_json: Option<Value>,
27 pub steps: BTreeMap<String, Value>,
28}
29
30impl RequestContext {
31 pub fn from_parts(
32 method: String,
33 uri: String,
34 path: String,
35 params: BTreeMap<String, String>,
36 query: BTreeMap<String, String>,
37 headers: BTreeMap<String, String>,
38 body: Bytes,
39 ) -> Self {
40 let body = String::from_utf8_lossy(&body).to_string();
41 let body_json = serde_json::from_str(&body).ok();
42 Self {
43 method,
44 uri,
45 path,
46 params,
47 query,
48 headers,
49 body,
50 body_json,
51 steps: BTreeMap::new(),
52 }
53 }
54
55 pub fn request_json(&self) -> Value {
56 serde_json::json!({
57 "method": self.method.clone(),
58 "uri": self.uri.clone(),
59 "path": self.path.clone(),
60 "params": self.params.clone(),
61 "query": self.query.clone(),
62 "headers": self.headers.clone(),
63 "body": self.body.clone(),
64 "body_json": self.body_json.clone(),
65 "steps": self.steps.clone(),
66 })
67 }
68
69 pub fn with_steps(&self, steps: BTreeMap<String, Value>) -> Self {
70 let mut cloned = self.clone();
71 cloned.steps = steps;
72 cloned
73 }
74}
75
76#[derive(Debug, Clone, Default)]
77pub struct JobStore {
78 jobs: Arc<RwLock<HashMap<String, JobRecord>>>,
79}
80
81#[derive(Debug, Clone, Serialize)]
82pub struct JobRecord {
83 pub id: String,
84 pub status: JobStatus,
85 pub task: String,
86 pub result: Option<TaskOutput>,
87}
88
89#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
90#[serde(rename_all = "snake_case")]
91pub enum JobStatus {
92 Running,
93 Completed,
94 Failed,
95}
96
97#[derive(Debug, Clone, Serialize)]
98pub struct TaskOutput {
99 pub ok: bool,
100 pub task: String,
101 pub status_code: Option<i32>,
102 pub stdout: String,
103 pub stderr: String,
104 pub timed_out: bool,
105 pub duration_ms: u128,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub json_parse_error: Option<String>,
108 #[serde(default, skip_serializing_if = "is_false")]
109 pub stdout_truncated: bool,
110 #[serde(default, skip_serializing_if = "is_false")]
111 pub stderr_truncated: bool,
112 #[serde(skip_serializing_if = "Option::is_none")]
113 pub json: Option<Value>,
114}
115
116#[derive(Debug, Clone, Serialize)]
117#[serde(untagged)]
118pub enum TaskRunResponse {
119 Immediate(TaskOutput),
120 Accepted { job_id: String, status: JobStatus },
121}
122
123impl JobStore {
124 pub async fn get(&self, id: &str) -> Option<JobRecord> {
125 self.jobs.read().await.get(id).cloned()
126 }
127
128 async fn insert_running(&self, id: String, task: String) {
129 self.jobs.write().await.insert(
130 id.clone(),
131 JobRecord {
132 id,
133 status: JobStatus::Running,
134 task,
135 result: None,
136 },
137 );
138 }
139
140 async fn finish(&self, id: &str, result: Result<TaskOutput>) {
141 let mut jobs = self.jobs.write().await;
142 if let Some(record) = jobs.get_mut(id) {
143 match result {
144 Ok(output) => {
145 record.status = if output.ok {
146 JobStatus::Completed
147 } else {
148 JobStatus::Failed
149 };
150 record.result = Some(output);
151 }
152 Err(err) => {
153 record.status = JobStatus::Failed;
154 record.result = Some(TaskOutput {
155 ok: false,
156 task: record.task.clone(),
157 status_code: None,
158 stdout: String::new(),
159 stderr: err.to_string(),
160 timed_out: false,
161 duration_ms: 0,
162 json_parse_error: None,
163 stdout_truncated: false,
164 stderr_truncated: false,
165 json: None,
166 });
167 }
168 }
169 }
170 }
171}
172
173pub async fn run_or_enqueue(
174 cfg: TaskConfig,
175 request: RequestContext,
176 jobs: JobStore,
177) -> Result<TaskRunResponse> {
178 match cfg.mode {
179 TaskMode::Sync => Ok(TaskRunResponse::Immediate(run_task(&cfg, &request).await?)),
180 TaskMode::Async => {
181 let job_id = Uuid::new_v4().to_string();
182 jobs.insert_running(job_id.clone(), cfg.name.clone()).await;
183 let jobs_for_task = jobs.clone();
184 let cfg_for_task = cfg.clone();
185 let request_for_task = request.clone();
186 let job_id_for_task = job_id.clone();
187 tokio::spawn(async move {
188 let result = run_task(&cfg_for_task, &request_for_task).await;
189 jobs_for_task.finish(&job_id_for_task, result).await;
190 });
191 Ok(TaskRunResponse::Accepted {
192 job_id,
193 status: JobStatus::Running,
194 })
195 }
196 }
197}
198
199pub async fn run_task(cfg: &TaskConfig, request: &RequestContext) -> Result<TaskOutput> {
200 let started = Instant::now();
201 let args: Vec<String> = cfg
202 .args
203 .iter()
204 .map(|arg| render_template(arg, request))
205 .collect::<Result<Vec<_>>>()?;
206
207 let mut command = Command::new(&cfg.command);
208 command.kill_on_drop(true);
209 if !cfg.inherit_env {
210 command.env_clear();
211 }
212 command.args(args);
213 command.stdout(Stdio::piped()).stderr(Stdio::piped());
214
215 if let Some(working_dir) = &cfg.working_dir {
216 command.current_dir(render_template(working_dir, request)?);
217 }
218 for (key, value) in &cfg.env {
219 command.env(key, render_template(value, request)?);
220 }
221
222 let stdin_payload = render_stdin(&cfg.stdin, request)?;
223 let output_result = if let Some(stdin_payload) = stdin_payload {
224 command.stdin(Stdio::piped());
225 let mut child = command
226 .spawn()
227 .map_err(|err| ArturError::Process(format!("failed to spawn {}: {err}", cfg.name)))?;
228 if let Some(mut stdin) = child.stdin.take() {
229 stdin.write_all(stdin_payload.as_bytes()).await?;
230 }
231 timeout(
232 Duration::from_millis(cfg.timeout_ms),
233 child.wait_with_output(),
234 )
235 .await
236 } else {
237 timeout(Duration::from_millis(cfg.timeout_ms), command.output()).await
238 };
239
240 match output_result {
241 Ok(Ok(output)) => {
242 let (stdout, stdout_truncated) =
243 bytes_to_limited_string(&output.stdout, cfg.max_stdout_bytes);
244 let (stderr, stderr_truncated) =
245 bytes_to_limited_string(&output.stderr, cfg.max_stderr_bytes);
246 let mut json_parse_error = None;
247 let json = match cfg.stdout_format {
248 TaskOutputFormat::Text => None,
249 TaskOutputFormat::Json => match serde_json::from_str(&stdout) {
250 Ok(value) => Some(value),
251 Err(err) => {
252 json_parse_error = Some(err.to_string());
253 None
254 }
255 },
256 };
257 let status_code = output.status.code();
258 let exit_ok = status_code
259 .map(|code| cfg.success_exit_codes.contains(&code))
260 .unwrap_or(false);
261 let json_ok = cfg.stdout_format != TaskOutputFormat::Json || json_parse_error.is_none();
262 Ok(TaskOutput {
263 ok: exit_ok && json_ok,
264 task: cfg.name.clone(),
265 status_code,
266 stdout,
267 stderr,
268 timed_out: false,
269 duration_ms: started.elapsed().as_millis(),
270 json_parse_error,
271 stdout_truncated,
272 stderr_truncated,
273 json,
274 })
275 }
276 Ok(Err(err)) => Err(ArturError::Process(format!(
277 "failed to run process {}: {err}",
278 cfg.name
279 ))),
280 Err(_) => Ok(TaskOutput {
281 ok: false,
282 task: cfg.name.clone(),
283 status_code: None,
284 stdout: String::new(),
285 stderr: format!("process timed out after {} ms", cfg.timeout_ms),
286 timed_out: true,
287 duration_ms: started.elapsed().as_millis(),
288 json_parse_error: None,
289 stdout_truncated: false,
290 stderr_truncated: false,
291 json: None,
292 }),
293 }
294}
295
296fn bytes_to_limited_string(bytes: &[u8], limit: usize) -> (String, bool) {
297 let truncated = bytes.len() > limit;
298 let visible = if truncated { &bytes[..limit] } else { bytes };
299 (String::from_utf8_lossy(visible).to_string(), truncated)
300}
301
302fn is_false(value: &bool) -> bool {
303 !*value
304}
305
306fn render_stdin(cfg: &TaskStdin, request: &RequestContext) -> Result<Option<String>> {
307 match cfg {
308 TaskStdin::None => Ok(None),
309 TaskStdin::Body => Ok(Some(request.body.clone())),
310 TaskStdin::RequestJson => Ok(Some(serde_json::to_string(&request.request_json())?)),
311 TaskStdin::Template { template } => Ok(Some(render_template(template, request)?)),
312 }
313}
314
315pub fn render_template(template: &str, request: &RequestContext) -> Result<String> {
316 let mut rendered = String::with_capacity(template.len());
317 let mut rest = template;
318 while let Some(start) = rest.find("{{") {
319 rendered.push_str(&rest[..start]);
320 let after_start = &rest[start + 2..];
321 let Some(end) = after_start.find("}}") else {
322 return Err(ArturError::Config(format!(
323 "unclosed template expression in {template:?}"
324 )));
325 };
326 let key = after_start[..end].trim();
327 rendered.push_str(&lookup_template_value(key, request));
328 rest = &after_start[end + 2..];
329 }
330 rendered.push_str(rest);
331 Ok(rendered)
332}
333
334pub fn lookup_template_value(key: &str, request: &RequestContext) -> String {
335 lookup_template_json_value(key, request)
336 .map(|value| json_scalar_to_string(&value))
337 .unwrap_or_default()
338}
339
340pub fn lookup_template_json_value(key: &str, request: &RequestContext) -> Option<Value> {
341 match key {
342 "method" => Some(Value::String(request.method.clone())),
343 "uri" => Some(Value::String(request.uri.clone())),
344 "path" => Some(Value::String(request.path.clone())),
345 "body" => Some(Value::String(request.body.clone())),
346 "request" | "request_json" => Some(request.request_json()),
347 "body_json" => request.body_json.clone(),
348 "steps" => Some(Value::Object(
349 request
350 .steps
351 .iter()
352 .map(|(key, value)| (key.clone(), value.clone()))
353 .collect::<Map<String, Value>>(),
354 )),
355 _ if key.starts_with("param.") => {
356 Some(Value::String(lookup_map(&request.params, &key[6..])))
357 }
358 _ if key.starts_with("query.") => {
359 Some(Value::String(lookup_map(&request.query, &key[6..])))
360 }
361 _ if key.starts_with("header.") => Some(Value::String(lookup_map(
362 &request.headers,
363 &key[7..].to_ascii_lowercase(),
364 ))),
365 _ if key.starts_with("env.") => std::env::var(&key[4..]).ok().map(Value::String),
366 _ if key.starts_with("body_json.") => {
367 lookup_json_path_value(request.body_json.as_ref(), &key[10..])
368 }
369 _ if key.starts_with("steps.") => {
370 let steps = Value::Object(
371 request
372 .steps
373 .iter()
374 .map(|(key, value)| (key.clone(), value.clone()))
375 .collect::<Map<String, Value>>(),
376 );
377 lookup_json_path_value(Some(&steps), &key[6..])
378 }
379 _ if key.starts_with("step.") => {
380 let steps = Value::Object(
381 request
382 .steps
383 .iter()
384 .map(|(key, value)| (key.clone(), value.clone()))
385 .collect::<Map<String, Value>>(),
386 );
387 lookup_json_path_value(Some(&steps), &key[5..])
388 }
389 _ => None,
390 }
391}
392
393fn lookup_map(map: &BTreeMap<String, String>, key: &str) -> String {
394 map.get(key).cloned().unwrap_or_default()
395}
396
397fn lookup_json_path_value(value: Option<&Value>, path: &str) -> Option<Value> {
398 let mut value = value?;
399 if path.trim().is_empty() {
400 return Some(value.clone());
401 }
402 for part in path.split('.') {
403 match value {
404 Value::Object(map) => {
405 value = map.get(part)?;
406 }
407 Value::Array(items) => {
408 value = items.get(part.parse::<usize>().ok()?)?;
409 }
410 _ => return None,
411 }
412 }
413 Some(value.clone())
414}
415
416pub fn render_json_value(value: &Value, request: &RequestContext) -> Result<Value> {
417 match value {
418 Value::String(template) => {
419 if let Some(key) = whole_template_key(template)
420 && let Some(value) = lookup_template_json_value(key, request)
421 {
422 return Ok(value);
423 }
424 Ok(Value::String(render_template(template, request)?))
425 }
426 Value::Array(items) => items
427 .iter()
428 .map(|item| render_json_value(item, request))
429 .collect::<Result<Vec<_>>>()
430 .map(Value::Array),
431 Value::Object(map) => map
432 .iter()
433 .map(|(key, value)| Ok((key.clone(), render_json_value(value, request)?)))
434 .collect::<Result<Map<String, Value>>>()
435 .map(Value::Object),
436 Value::Null | Value::Bool(_) | Value::Number(_) => Ok(value.clone()),
437 }
438}
439
440fn whole_template_key(template: &str) -> Option<&str> {
441 let trimmed = template.trim();
442 if trimmed.starts_with("{{") && trimmed.ends_with("}}") {
443 let key = trimmed[2..trimmed.len() - 2].trim();
444 if !key.contains("{{") && !key.contains("}}") {
445 return Some(key);
446 }
447 }
448 None
449}
450
451fn json_scalar_to_string(value: &Value) -> String {
452 match value {
453 Value::Null => String::new(),
454 Value::Bool(v) => v.to_string(),
455 Value::Number(v) => v.to_string(),
456 Value::String(v) => v.clone(),
457 Value::Array(_) | Value::Object(_) => value.to_string(),
458 }
459}
460
461pub fn header_map_to_btree(headers: &axum::http::HeaderMap) -> BTreeMap<String, String> {
462 let mut out = BTreeMap::new();
463 for (name, value) in headers {
464 if let Ok(value) = value.to_str() {
465 out.insert(name.as_str().to_ascii_lowercase(), value.to_string());
466 }
467 }
468 out
469}
470
471pub fn hashmap_to_btree(input: HashMap<String, String>) -> BTreeMap<String, String> {
472 input.into_iter().collect()
473}
474
475pub fn btree_to_json_object(input: &BTreeMap<String, String>) -> Value {
476 Value::Object(
477 input
478 .iter()
479 .map(|(key, value)| (key.clone(), Value::String(value.clone())))
480 .collect::<Map<String, Value>>(),
481 )
482}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487
488 fn context() -> RequestContext {
489 RequestContext {
490 method: "POST".to_string(),
491 uri: "/run/abc?x=1".to_string(),
492 path: "/run/abc".to_string(),
493 params: BTreeMap::from([("id".to_string(), "abc".to_string())]),
494 query: BTreeMap::from([("x".to_string(), "1".to_string())]),
495 headers: BTreeMap::from([("authorization".to_string(), "Bearer token".to_string())]),
496 body: r#"{"name":"Ada","items":["a","b"]}"#.to_string(),
497 body_json: Some(serde_json::json!({ "name": "Ada", "items": ["a", "b"] })),
498 steps: BTreeMap::from([(
499 "sid".to_string(),
500 serde_json::json!({ "json": { "sid": "123" } }),
501 )]),
502 }
503 }
504
505 #[test]
506 fn renders_templates_from_request_context() {
507 let rendered = render_template(
508 "{{method}} {{param.id}} {{query.x}} {{header.authorization}} {{body_json.name}} {{body_json.items.1}}",
509 &context(),
510 )
511 .unwrap();
512 assert_eq!(rendered, "POST abc 1 Bearer token Ada b");
513 }
514
515 #[test]
516 fn leaves_unknown_template_as_empty() {
517 assert_eq!(render_template("x{{missing}}y", &context()).unwrap(), "xy");
518 }
519
520 #[test]
521 fn renders_json_values_without_stringifying_whole_templates() {
522 let rendered = render_json_value(
523 &serde_json::json!({
524 "sid": "{{steps.sid.json.sid}}",
525 "request": "{{request_json}}"
526 }),
527 &context(),
528 )
529 .unwrap();
530 assert_eq!(rendered["sid"], "123");
531 assert_eq!(rendered["request"]["method"], "POST");
532 }
533}