Skip to main content

rsigma_runtime/enrichment/
command.rs

//! `CommandEnricher`: per-result local-process execution.
//!
//! Runs a local command via [`tokio::process::Command`] with a
//! template-expanded argv and an optional template-expanded environment.
//! Stdout is captured (capped at 10 MB to mirror the existing
//! dynamic-source command resolver) and parsed as either a JSON value or
//! a raw string (the [`OutputFormat`] knob).
//!
//! Non-zero exit codes map to [`EnrichErrorKind::Fetch`]. Stderr is read
//! into the error message (truncated at 4 KB) so operators can see what
//! went wrong without grepping the daemon's tracing output.
12
13use std::collections::HashMap;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use rsigma_eval::EvaluationResult;
18use tokio::process::Command;
19
20use super::{
21    EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope, inject_enrichment,
22    template::render_template,
23};
24
/// Upper bound, in bytes, on the stdout accepted from one command
/// invocation (10 MiB). Kept equal to
/// [`crate::sources::MAX_SOURCE_RESPONSE_BYTES`] so both surfaces enforce
/// the same hard limit.
const MAX_COMMAND_STDOUT: usize = 10 << 20;
/// Upper bound, in bytes, on the stderr text copied into an error
/// message (4 KiB).
const MAX_COMMAND_STDERR_IN_ERROR: usize = 4 << 10;
31
/// Selects how the bytes captured from the command's stdout are turned
/// into the injected enrichment value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum OutputFormat {
    /// Stdout must be a JSON document; anything else is reported as a
    /// `Parse` error. This is the default, matching the dynamic-pipelines
    /// command source behaviour.
    #[default]
    Json,
    /// Stdout is injected verbatim as a `serde_json::Value::String`,
    /// minus any trailing `\r` / `\n` characters. Other trailing
    /// whitespace is preserved.
    Raw,
}
43
44/// One command enricher instance.
45pub struct CommandEnricher {
46    id: String,
47    kind: EnricherKind,
48    inject_field: String,
49    argv: Vec<String>,
50    env: HashMap<String, String>,
51    timeout: Duration,
52    on_error: OnError,
53    scope: Scope,
54    output: OutputFormat,
55}
56
57impl CommandEnricher {
58    #[allow(clippy::too_many_arguments)]
59    pub fn new(
60        id: String,
61        kind: EnricherKind,
62        inject_field: String,
63        argv: Vec<String>,
64        env: HashMap<String, String>,
65        timeout: Duration,
66        on_error: OnError,
67        scope: Scope,
68        output: OutputFormat,
69    ) -> Self {
70        Self {
71            id,
72            kind,
73            inject_field,
74            argv,
75            env,
76            timeout,
77            on_error,
78            scope,
79            output,
80        }
81    }
82}
83
84#[async_trait]
85impl Enricher for CommandEnricher {
86    fn kind(&self) -> EnricherKind {
87        self.kind
88    }
89    fn id(&self) -> &str {
90        &self.id
91    }
92    fn inject_field(&self) -> &str {
93        &self.inject_field
94    }
95    fn timeout(&self) -> Duration {
96        self.timeout
97    }
98    fn scope(&self) -> &Scope {
99        &self.scope
100    }
101    fn on_error(&self) -> OnError {
102        self.on_error
103    }
104
105    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
106        if self.argv.is_empty() {
107            return Err(EnrichError {
108                enricher_id: self.id.clone(),
109                kind: EnrichErrorKind::Fetch("empty argv".to_string()),
110            });
111        }
112
113        let rendered: Vec<String> = self
114            .argv
115            .iter()
116            .map(|a| render_template(a, result))
117            .collect();
118
119        let mut cmd = Command::new(&rendered[0]);
120        cmd.args(&rendered[1..]);
121        // Replace the inherited env wholesale with the configured env
122        // when the operator supplies any entries; otherwise inherit the
123        // daemon's env (so e.g. `PATH` resolves binary names normally).
124        if !self.env.is_empty() {
125            for (k, v) in &self.env {
126                cmd.env(k, render_template(v, result));
127            }
128        }
129        cmd.stdout(std::process::Stdio::piped());
130        cmd.stderr(std::process::Stdio::piped());
131        cmd.kill_on_drop(true);
132
133        let output = cmd.output().await.map_err(|e| EnrichError {
134            enricher_id: self.id.clone(),
135            kind: EnrichErrorKind::Fetch(format!("spawn: {e}")),
136        })?;
137
138        if !output.status.success() {
139            let stderr = String::from_utf8_lossy(&output.stderr);
140            let mut snippet = stderr
141                .chars()
142                .take(MAX_COMMAND_STDERR_IN_ERROR)
143                .collect::<String>();
144            if stderr.len() > MAX_COMMAND_STDERR_IN_ERROR {
145                snippet.push_str("…[truncated]");
146            }
147            return Err(EnrichError {
148                enricher_id: self.id.clone(),
149                kind: EnrichErrorKind::Fetch(format!(
150                    "exit {:?}: {}",
151                    output.status.code(),
152                    snippet.trim()
153                )),
154            });
155        }
156
157        if output.stdout.len() > MAX_COMMAND_STDOUT {
158            return Err(EnrichError {
159                enricher_id: self.id.clone(),
160                kind: EnrichErrorKind::Fetch(format!("stdout exceeded {MAX_COMMAND_STDOUT} bytes")),
161            });
162        }
163
164        let value = match self.output {
165            OutputFormat::Json => serde_json::from_slice::<serde_json::Value>(&output.stdout)
166                .map_err(|e| EnrichError {
167                    enricher_id: self.id.clone(),
168                    kind: EnrichErrorKind::Parse(format!("JSON: {e}")),
169                })?,
170            OutputFormat::Raw => {
171                // Strip trailing CR and LF in any combination so that the
172                // Windows CRLF line ending from `cmd /C echo ...` produces
173                // the same captured value as the Unix LF from `sh -c
174                // echo ...`. We do not call `.trim_end()` because that
175                // would also strip trailing spaces, which a `Raw` capture
176                // is otherwise expected to preserve verbatim.
177                let s = String::from_utf8_lossy(&output.stdout);
178                serde_json::Value::String(s.trim_end_matches(['\r', '\n']).to_string())
179            }
180        };
181        inject_enrichment(result, &self.inject_field, value);
182        Ok(())
183    }
184}