Skip to main content

rsigma_runtime/sources/
command.rs

1//! Command source resolver: runs a local command and captures stdout.
2
3use std::time::{Duration, Instant};
4
5use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
6use tokio::io::AsyncReadExt;
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
/// Timeout that [`resolve_command_with_limit`] falls back to when the caller passes `None`.
12const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
13
14/// Resolve a command source by executing it and parsing stdout.
15pub async fn resolve_command(
16    command: &[String],
17    format: DataFormat,
18    extract_expr: Option<&ExtractExpr>,
19    timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21    resolve_command_with_limit(
22        command,
23        format,
24        extract_expr,
25        timeout,
26        MAX_SOURCE_RESPONSE_BYTES,
27    )
28    .await
29}
30
31/// Same as [`resolve_command`] but with a configurable stdout size limit.
32pub async fn resolve_command_with_limit(
33    command: &[String],
34    format: DataFormat,
35    extract_expr: Option<&ExtractExpr>,
36    timeout: Option<Duration>,
37    max_stdout_bytes: usize,
38) -> Result<ResolvedValue, SourceError> {
39    if command.is_empty() {
40        return Err(SourceError {
41            source_id: String::new(),
42            kind: SourceErrorKind::Fetch("command is empty".into()),
43        });
44    }
45
46    let mut child = tokio::process::Command::new(&command[0])
47        .args(&command[1..])
48        .stdout(std::process::Stdio::piped())
49        .stderr(std::process::Stdio::piped())
50        .spawn()
51        .map_err(|e| SourceError {
52            source_id: String::new(),
53            kind: SourceErrorKind::Fetch(format!("failed to spawn '{}': {e}", command[0])),
54        })?;
55
56    let deadline = timeout.unwrap_or(DEFAULT_COMMAND_TIMEOUT);
57
58    let result = tokio::time::timeout(deadline, async {
59        let mut stdout_buf = Vec::new();
60        let mut stderr_buf = Vec::new();
61
62        if let Some(mut stdout) = child.stdout.take() {
63            let mut tmp = vec![0u8; 8192];
64            loop {
65                let n = stdout.read(&mut tmp).await.map_err(|e| SourceError {
66                    source_id: String::new(),
67                    kind: SourceErrorKind::Fetch(format!("failed to read stdout: {e}")),
68                })?;
69                if n == 0 {
70                    break;
71                }
72                if stdout_buf.len() + n > max_stdout_bytes {
73                    let _ = child.kill().await;
74                    return Err(SourceError {
75                        source_id: String::new(),
76                        kind: SourceErrorKind::ResourceLimit(format!(
77                            "command stdout exceeds {} byte limit",
78                            max_stdout_bytes
79                        )),
80                    });
81                }
82                stdout_buf.extend_from_slice(&tmp[..n]);
83            }
84        }
85
86        if let Some(mut stderr) = child.stderr.take() {
87            let cap = 64 * 1024; // 64 KB for error messages
88            let mut tmp = vec![0u8; 4096];
89            loop {
90                let n = stderr.read(&mut tmp).await.unwrap_or(0);
91                if n == 0 {
92                    break;
93                }
94                if stderr_buf.len() + n > cap {
95                    break;
96                }
97                stderr_buf.extend_from_slice(&tmp[..n]);
98            }
99        }
100
101        let status = child.wait().await.map_err(|e| SourceError {
102            source_id: String::new(),
103            kind: SourceErrorKind::Fetch(format!("command execution failed: {e}")),
104        })?;
105
106        Ok((status, stdout_buf, stderr_buf))
107    })
108    .await;
109
110    let (status, stdout_bytes, stderr_bytes) = match result {
111        Ok(inner) => inner?,
112        Err(_) => {
113            let _ = child.kill().await;
114            return Err(SourceError {
115                source_id: String::new(),
116                kind: SourceErrorKind::Timeout,
117            });
118        }
119    };
120
121    if !status.success() {
122        let stderr = String::from_utf8_lossy(&stderr_bytes);
123        return Err(SourceError {
124            source_id: String::new(),
125            kind: SourceErrorKind::Fetch(format!(
126                "command exited with {}: {}",
127                status,
128                stderr.trim()
129            )),
130        });
131    }
132
133    let stdout = String::from_utf8(stdout_bytes).map_err(|e| SourceError {
134        source_id: String::new(),
135        kind: SourceErrorKind::Parse(format!("command output is not valid UTF-8: {e}")),
136    })?;
137
138    let parsed = parse_data(&stdout, format)?;
139
140    let data = if let Some(expr) = extract_expr {
141        apply_extract(&parsed, expr)?
142    } else {
143        parsed
144    };
145
146    Ok(ResolvedValue {
147        data,
148        resolved_at: Instant::now(),
149        from_cache: false,
150    })
151}