Skip to main content

artur/
process.rs

1use crate::{
2    config::{TaskConfig, TaskMode, TaskOutputFormat, TaskStdin},
3    error::{ArturError, Result},
4};
5use bytes::Bytes;
6use serde::{Deserialize, Serialize};
7use serde_json::{Map, Value};
8use std::{
9    collections::{BTreeMap, HashMap},
10    process::Stdio,
11    sync::Arc,
12    time::{Duration, Instant},
13};
14use tokio::{io::AsyncWriteExt, process::Command, sync::RwLock, time::timeout};
15use uuid::Uuid;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct RequestContext {
19    pub method: String,
20    pub uri: String,
21    pub path: String,
22    pub params: BTreeMap<String, String>,
23    pub query: BTreeMap<String, String>,
24    pub headers: BTreeMap<String, String>,
25    pub body: String,
26    pub body_json: Option<Value>,
27    pub steps: BTreeMap<String, Value>,
28}
29
30impl RequestContext {
31    pub fn from_parts(
32        method: String,
33        uri: String,
34        path: String,
35        params: BTreeMap<String, String>,
36        query: BTreeMap<String, String>,
37        headers: BTreeMap<String, String>,
38        body: Bytes,
39    ) -> Self {
40        let body = String::from_utf8_lossy(&body).to_string();
41        let body_json = serde_json::from_str(&body).ok();
42        Self {
43            method,
44            uri,
45            path,
46            params,
47            query,
48            headers,
49            body,
50            body_json,
51            steps: BTreeMap::new(),
52        }
53    }
54
55    pub fn request_json(&self) -> Value {
56        serde_json::json!({
57            "method": self.method.clone(),
58            "uri": self.uri.clone(),
59            "path": self.path.clone(),
60            "params": self.params.clone(),
61            "query": self.query.clone(),
62            "headers": self.headers.clone(),
63            "body": self.body.clone(),
64            "body_json": self.body_json.clone(),
65            "steps": self.steps.clone(),
66        })
67    }
68
69    pub fn with_steps(&self, steps: BTreeMap<String, Value>) -> Self {
70        let mut cloned = self.clone();
71        cloned.steps = steps;
72        cloned
73    }
74}
75
76#[derive(Debug, Clone, Default)]
77pub struct JobStore {
78    jobs: Arc<RwLock<HashMap<String, JobRecord>>>,
79}
80
81#[derive(Debug, Clone, Serialize)]
82pub struct JobRecord {
83    pub id: String,
84    pub status: JobStatus,
85    pub task: String,
86    pub result: Option<TaskOutput>,
87}
88
89#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
90#[serde(rename_all = "snake_case")]
91pub enum JobStatus {
92    Running,
93    Completed,
94    Failed,
95}
96
97#[derive(Debug, Clone, Serialize)]
98pub struct TaskOutput {
99    pub ok: bool,
100    pub task: String,
101    pub status_code: Option<i32>,
102    pub stdout: String,
103    pub stderr: String,
104    pub timed_out: bool,
105    pub duration_ms: u128,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub json_parse_error: Option<String>,
108    #[serde(default, skip_serializing_if = "is_false")]
109    pub stdout_truncated: bool,
110    #[serde(default, skip_serializing_if = "is_false")]
111    pub stderr_truncated: bool,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub json: Option<Value>,
114}
115
116#[derive(Debug, Clone, Serialize)]
117#[serde(untagged)]
118pub enum TaskRunResponse {
119    Immediate(TaskOutput),
120    Accepted { job_id: String, status: JobStatus },
121}
122
123impl JobStore {
124    pub async fn get(&self, id: &str) -> Option<JobRecord> {
125        self.jobs.read().await.get(id).cloned()
126    }
127
128    async fn insert_running(&self, id: String, task: String) {
129        self.jobs.write().await.insert(
130            id.clone(),
131            JobRecord {
132                id,
133                status: JobStatus::Running,
134                task,
135                result: None,
136            },
137        );
138    }
139
140    async fn finish(&self, id: &str, result: Result<TaskOutput>) {
141        let mut jobs = self.jobs.write().await;
142        if let Some(record) = jobs.get_mut(id) {
143            match result {
144                Ok(output) => {
145                    record.status = if output.ok {
146                        JobStatus::Completed
147                    } else {
148                        JobStatus::Failed
149                    };
150                    record.result = Some(output);
151                }
152                Err(err) => {
153                    record.status = JobStatus::Failed;
154                    record.result = Some(TaskOutput {
155                        ok: false,
156                        task: record.task.clone(),
157                        status_code: None,
158                        stdout: String::new(),
159                        stderr: err.to_string(),
160                        timed_out: false,
161                        duration_ms: 0,
162                        json_parse_error: None,
163                        stdout_truncated: false,
164                        stderr_truncated: false,
165                        json: None,
166                    });
167                }
168            }
169        }
170    }
171}
172
173pub async fn run_or_enqueue(
174    cfg: TaskConfig,
175    request: RequestContext,
176    jobs: JobStore,
177) -> Result<TaskRunResponse> {
178    match cfg.mode {
179        TaskMode::Sync => Ok(TaskRunResponse::Immediate(run_task(&cfg, &request).await?)),
180        TaskMode::Async => {
181            let job_id = Uuid::new_v4().to_string();
182            jobs.insert_running(job_id.clone(), cfg.name.clone()).await;
183            let jobs_for_task = jobs.clone();
184            let cfg_for_task = cfg.clone();
185            let request_for_task = request.clone();
186            let job_id_for_task = job_id.clone();
187            tokio::spawn(async move {
188                let result = run_task(&cfg_for_task, &request_for_task).await;
189                jobs_for_task.finish(&job_id_for_task, result).await;
190            });
191            Ok(TaskRunResponse::Accepted {
192                job_id,
193                status: JobStatus::Running,
194            })
195        }
196    }
197}
198
199pub async fn run_task(cfg: &TaskConfig, request: &RequestContext) -> Result<TaskOutput> {
200    let started = Instant::now();
201    let args: Vec<String> = cfg
202        .args
203        .iter()
204        .map(|arg| render_template(arg, request))
205        .collect::<Result<Vec<_>>>()?;
206
207    let mut command = Command::new(&cfg.command);
208    command.kill_on_drop(true);
209    if !cfg.inherit_env {
210        command.env_clear();
211    }
212    command.args(args);
213    command.stdout(Stdio::piped()).stderr(Stdio::piped());
214
215    if let Some(working_dir) = &cfg.working_dir {
216        command.current_dir(render_template(working_dir, request)?);
217    }
218    for (key, value) in &cfg.env {
219        command.env(key, render_template(value, request)?);
220    }
221
222    let stdin_payload = render_stdin(&cfg.stdin, request)?;
223    let output_result = if let Some(stdin_payload) = stdin_payload {
224        command.stdin(Stdio::piped());
225        let mut child = command
226            .spawn()
227            .map_err(|err| ArturError::Process(format!("failed to spawn {}: {err}", cfg.name)))?;
228        if let Some(mut stdin) = child.stdin.take() {
229            stdin.write_all(stdin_payload.as_bytes()).await?;
230        }
231        timeout(
232            Duration::from_millis(cfg.timeout_ms),
233            child.wait_with_output(),
234        )
235        .await
236    } else {
237        timeout(Duration::from_millis(cfg.timeout_ms), command.output()).await
238    };
239
240    match output_result {
241        Ok(Ok(output)) => {
242            let (stdout, stdout_truncated) =
243                bytes_to_limited_string(&output.stdout, cfg.max_stdout_bytes);
244            let (stderr, stderr_truncated) =
245                bytes_to_limited_string(&output.stderr, cfg.max_stderr_bytes);
246            let mut json_parse_error = None;
247            let json = match cfg.stdout_format {
248                TaskOutputFormat::Text => None,
249                TaskOutputFormat::Json => match serde_json::from_str(&stdout) {
250                    Ok(value) => Some(value),
251                    Err(err) => {
252                        json_parse_error = Some(err.to_string());
253                        None
254                    }
255                },
256            };
257            let status_code = output.status.code();
258            let exit_ok = status_code
259                .map(|code| cfg.success_exit_codes.contains(&code))
260                .unwrap_or(false);
261            let json_ok = cfg.stdout_format != TaskOutputFormat::Json || json_parse_error.is_none();
262            Ok(TaskOutput {
263                ok: exit_ok && json_ok,
264                task: cfg.name.clone(),
265                status_code,
266                stdout,
267                stderr,
268                timed_out: false,
269                duration_ms: started.elapsed().as_millis(),
270                json_parse_error,
271                stdout_truncated,
272                stderr_truncated,
273                json,
274            })
275        }
276        Ok(Err(err)) => Err(ArturError::Process(format!(
277            "failed to run process {}: {err}",
278            cfg.name
279        ))),
280        Err(_) => Ok(TaskOutput {
281            ok: false,
282            task: cfg.name.clone(),
283            status_code: None,
284            stdout: String::new(),
285            stderr: format!("process timed out after {} ms", cfg.timeout_ms),
286            timed_out: true,
287            duration_ms: started.elapsed().as_millis(),
288            json_parse_error: None,
289            stdout_truncated: false,
290            stderr_truncated: false,
291            json: None,
292        }),
293    }
294}
295
/// Decodes at most `limit` bytes of `bytes` as text and reports whether the
/// input had to be cut.
///
/// Decoding is lossy: invalid UTF-8 is replaced with U+FFFD. When the cut at
/// `limit` would land in the middle of a multi-byte character, the cut is
/// moved back to the start of that character, so truncation by itself never
/// adds a replacement character at the end. The result may therefore be up to
/// three bytes shorter than `limit`.
///
/// Returns `(text, truncated)`, where `truncated` is true iff
/// `bytes.len() > limit`.
fn bytes_to_limited_string(bytes: &[u8], limit: usize) -> (String, bool) {
    if bytes.len() <= limit {
        return (String::from_utf8_lossy(bytes).into_owned(), false);
    }

    // UTF-8 continuation bytes are 0b10xx_xxxx; lead bytes of multi-byte
    // sequences are 0b11xx_xxxx.
    let is_continuation = |byte: u8| (byte & 0xC0) == 0x80;
    let is_multibyte_lead = |byte: u8| byte >= 0xC0;

    // `bytes[limit]` is the first byte that would be dropped. If it is a
    // continuation byte the cut splits a character: walk back (a character
    // has at most three continuation bytes) looking for its lead byte.
    let mut end = limit;
    let mut steps = 0;
    while end > 0 && steps < 3 && is_continuation(bytes[end]) {
        end -= 1;
        steps += 1;
    }
    // No lead byte where one should be means the data is not valid UTF-8 at
    // this point anyway; keep the plain byte cut and let lossy decoding cope.
    if !is_multibyte_lead(bytes[end]) {
        end = limit;
    }

    (String::from_utf8_lossy(&bytes[..end]).into_owned(), true)
}
301
/// Serde `skip_serializing_if` predicate: true exactly when the flag is unset.
///
/// Takes the flag by reference because that is the shape serde calls it with.
fn is_false(value: &bool) -> bool {
    !value
}
305
306fn render_stdin(cfg: &TaskStdin, request: &RequestContext) -> Result<Option<String>> {
307    match cfg {
308        TaskStdin::None => Ok(None),
309        TaskStdin::Body => Ok(Some(request.body.clone())),
310        TaskStdin::RequestJson => Ok(Some(serde_json::to_string(&request.request_json())?)),
311        TaskStdin::Template { template } => Ok(Some(render_template(template, request)?)),
312    }
313}
314
315pub fn render_template(template: &str, request: &RequestContext) -> Result<String> {
316    let mut rendered = String::with_capacity(template.len());
317    let mut rest = template;
318    while let Some(start) = rest.find("{{") {
319        rendered.push_str(&rest[..start]);
320        let after_start = &rest[start + 2..];
321        let Some(end) = after_start.find("}}") else {
322            return Err(ArturError::Config(format!(
323                "unclosed template expression in {template:?}"
324            )));
325        };
326        let key = after_start[..end].trim();
327        rendered.push_str(&lookup_template_value(key, request));
328        rest = &after_start[end + 2..];
329    }
330    rendered.push_str(rest);
331    Ok(rendered)
332}
333
334pub fn lookup_template_value(key: &str, request: &RequestContext) -> String {
335    lookup_template_json_value(key, request)
336        .map(|value| json_scalar_to_string(&value))
337        .unwrap_or_default()
338}
339
340pub fn lookup_template_json_value(key: &str, request: &RequestContext) -> Option<Value> {
341    match key {
342        "method" => Some(Value::String(request.method.clone())),
343        "uri" => Some(Value::String(request.uri.clone())),
344        "path" => Some(Value::String(request.path.clone())),
345        "body" => Some(Value::String(request.body.clone())),
346        "request" | "request_json" => Some(request.request_json()),
347        "body_json" => request.body_json.clone(),
348        "steps" => Some(Value::Object(
349            request
350                .steps
351                .iter()
352                .map(|(key, value)| (key.clone(), value.clone()))
353                .collect::<Map<String, Value>>(),
354        )),
355        _ if key.starts_with("param.") => {
356            Some(Value::String(lookup_map(&request.params, &key[6..])))
357        }
358        _ if key.starts_with("query.") => {
359            Some(Value::String(lookup_map(&request.query, &key[6..])))
360        }
361        _ if key.starts_with("header.") => Some(Value::String(lookup_map(
362            &request.headers,
363            &key[7..].to_ascii_lowercase(),
364        ))),
365        _ if key.starts_with("env.") => std::env::var(&key[4..]).ok().map(Value::String),
366        _ if key.starts_with("body_json.") => {
367            lookup_json_path_value(request.body_json.as_ref(), &key[10..])
368        }
369        _ if key.starts_with("steps.") => {
370            let steps = Value::Object(
371                request
372                    .steps
373                    .iter()
374                    .map(|(key, value)| (key.clone(), value.clone()))
375                    .collect::<Map<String, Value>>(),
376            );
377            lookup_json_path_value(Some(&steps), &key[6..])
378        }
379        _ if key.starts_with("step.") => {
380            let steps = Value::Object(
381                request
382                    .steps
383                    .iter()
384                    .map(|(key, value)| (key.clone(), value.clone()))
385                    .collect::<Map<String, Value>>(),
386            );
387            lookup_json_path_value(Some(&steps), &key[5..])
388        }
389        _ => None,
390    }
391}
392
/// Returns a copy of the value stored under `key`, or an empty string when
/// the map has no such entry.
fn lookup_map(map: &BTreeMap<String, String>, key: &str) -> String {
    match map.get(key) {
        Some(found) => found.clone(),
        None => String::new(),
    }
}
396
397fn lookup_json_path_value(value: Option<&Value>, path: &str) -> Option<Value> {
398    let mut value = value?;
399    if path.trim().is_empty() {
400        return Some(value.clone());
401    }
402    for part in path.split('.') {
403        match value {
404            Value::Object(map) => {
405                value = map.get(part)?;
406            }
407            Value::Array(items) => {
408                value = items.get(part.parse::<usize>().ok()?)?;
409            }
410            _ => return None,
411        }
412    }
413    Some(value.clone())
414}
415
416pub fn render_json_value(value: &Value, request: &RequestContext) -> Result<Value> {
417    match value {
418        Value::String(template) => {
419            if let Some(key) = whole_template_key(template)
420                && let Some(value) = lookup_template_json_value(key, request)
421            {
422                return Ok(value);
423            }
424            Ok(Value::String(render_template(template, request)?))
425        }
426        Value::Array(items) => items
427            .iter()
428            .map(|item| render_json_value(item, request))
429            .collect::<Result<Vec<_>>>()
430            .map(Value::Array),
431        Value::Object(map) => map
432            .iter()
433            .map(|(key, value)| Ok((key.clone(), render_json_value(value, request)?)))
434            .collect::<Result<Map<String, Value>>>()
435            .map(Value::Object),
436        Value::Null | Value::Bool(_) | Value::Number(_) => Ok(value.clone()),
437    }
438}
439
/// If `template` is nothing but a single `{{ key }}` expression (surrounding
/// whitespace allowed), returns the trimmed key; otherwise `None`.
///
/// Strings holding more than one expression, such as `{{a}} {{b}}`, are
/// rejected because what sits between the outer braces contains braces itself.
fn whole_template_key(template: &str) -> Option<&str> {
    let inner = template
        .trim()
        .strip_prefix("{{")?
        .strip_suffix("}}")?
        .trim();
    let has_nested_braces = inner.contains("{{") || inner.contains("}}");
    if has_nested_braces {
        None
    } else {
        Some(inner)
    }
}
450
451fn json_scalar_to_string(value: &Value) -> String {
452    match value {
453        Value::Null => String::new(),
454        Value::Bool(v) => v.to_string(),
455        Value::Number(v) => v.to_string(),
456        Value::String(v) => v.clone(),
457        Value::Array(_) | Value::Object(_) => value.to_string(),
458    }
459}
460
461pub fn header_map_to_btree(headers: &axum::http::HeaderMap) -> BTreeMap<String, String> {
462    let mut out = BTreeMap::new();
463    for (name, value) in headers {
464        if let Ok(value) = value.to_str() {
465            out.insert(name.as_str().to_ascii_lowercase(), value.to_string());
466        }
467    }
468    out
469}
470
/// Moves all entries of a hash map into a map ordered by key.
pub fn hashmap_to_btree(input: HashMap<String, String>) -> BTreeMap<String, String> {
    let mut ordered = BTreeMap::new();
    ordered.extend(input);
    ordered
}
474
475pub fn btree_to_json_object(input: &BTreeMap<String, String>) -> Value {
476    Value::Object(
477        input
478            .iter()
479            .map(|(key, value)| (key.clone(), Value::String(value.clone())))
480            .collect::<Map<String, Value>>(),
481    )
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a one-entry string map.
    fn single(key: &str, value: &str) -> BTreeMap<String, String> {
        let mut map = BTreeMap::new();
        map.insert(key.to_string(), value.to_string());
        map
    }

    /// Fixture: a POST to `/run/abc?x=1` with a JSON body and one recorded step.
    fn context() -> RequestContext {
        let mut steps = BTreeMap::new();
        steps.insert(
            "sid".to_string(),
            serde_json::json!({ "json": { "sid": "123" } }),
        );

        RequestContext {
            method: "POST".to_string(),
            uri: "/run/abc?x=1".to_string(),
            path: "/run/abc".to_string(),
            params: single("id", "abc"),
            query: single("x", "1"),
            headers: single("authorization", "Bearer token"),
            body: r#"{"name":"Ada","items":["a","b"]}"#.to_string(),
            body_json: Some(serde_json::json!({ "name": "Ada", "items": ["a", "b"] })),
            steps,
        }
    }

    #[test]
    fn renders_templates_from_request_context() {
        let template = "{{method}} {{param.id}} {{query.x}} {{header.authorization}} {{body_json.name}} {{body_json.items.1}}";
        let rendered = render_template(template, &context()).unwrap();
        assert_eq!(rendered, "POST abc 1 Bearer token Ada b");
    }

    #[test]
    fn leaves_unknown_template_as_empty() {
        let rendered = render_template("x{{missing}}y", &context()).unwrap();
        assert_eq!(rendered, "xy");
    }

    #[test]
    fn renders_json_values_without_stringifying_whole_templates() {
        let input = serde_json::json!({
            "sid": "{{steps.sid.json.sid}}",
            "request": "{{request_json}}"
        });
        let rendered = render_json_value(&input, &context()).unwrap();
        assert_eq!(rendered["sid"], "123");
        // A whole-string template keeps its JSON type, so the request stays an object.
        assert_eq!(rendered["request"]["method"], "POST");
    }
}