Skip to main content

rsigma_runtime/sources/
command.rs

1//! Command source resolver: runs a local command and captures stdout.
2
3use std::time::{Duration, Instant};
4
5use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
6use tokio::io::AsyncReadExt;
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
/// Deadline applied to a command run when the caller passes `None` as the timeout.
12const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
13
14/// Resolve a command source by executing it and parsing stdout.
15pub async fn resolve_command(
16    command: &[String],
17    format: DataFormat,
18    extract_expr: Option<&ExtractExpr>,
19    timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21    resolve_command_with_limit(
22        command,
23        format,
24        extract_expr,
25        timeout,
26        MAX_SOURCE_RESPONSE_BYTES,
27    )
28    .await
29}
30
31/// Same as [`resolve_command`] but with a configurable stdout size limit.
32pub async fn resolve_command_with_limit(
33    command: &[String],
34    format: DataFormat,
35    extract_expr: Option<&ExtractExpr>,
36    timeout: Option<Duration>,
37    max_stdout_bytes: usize,
38) -> Result<ResolvedValue, SourceError> {
39    if command.is_empty() {
40        return Err(SourceError {
41            source_id: String::new(),
42            kind: SourceErrorKind::Fetch("command is empty".into()),
43        });
44    }
45
46    let mut child = tokio::process::Command::new(&command[0])
47        .args(&command[1..])
48        .stdout(std::process::Stdio::piped())
49        .stderr(std::process::Stdio::piped())
50        .spawn()
51        .map_err(|e| SourceError {
52            source_id: String::new(),
53            kind: SourceErrorKind::Fetch(format!("failed to spawn '{}': {e}", command[0])),
54        })?;
55
56    let deadline = timeout.unwrap_or(DEFAULT_COMMAND_TIMEOUT);
57
58    let result = tokio::time::timeout(deadline, async {
59        let mut stdout_buf = Vec::new();
60        let mut stderr_buf = Vec::new();
61
62        if let Some(mut stdout) = child.stdout.take() {
63            let mut tmp = vec![0u8; 8192];
64            loop {
65                let n = stdout.read(&mut tmp).await.map_err(|e| SourceError {
66                    source_id: String::new(),
67                    kind: SourceErrorKind::Fetch(format!("failed to read stdout: {e}")),
68                })?;
69                if n == 0 {
70                    break;
71                }
72                if stdout_buf.len() + n > max_stdout_bytes {
73                    let _ = child.kill().await;
74                    return Err(SourceError {
75                        source_id: String::new(),
76                        kind: SourceErrorKind::ResourceLimit(format!(
77                            "command stdout exceeds {max_stdout_bytes} byte limit"
78                        )),
79                    });
80                }
81                stdout_buf.extend_from_slice(&tmp[..n]);
82            }
83        }
84
85        if let Some(mut stderr) = child.stderr.take() {
86            let cap = 64 * 1024; // 64 KB for error messages
87            let mut tmp = vec![0u8; 4096];
88            loop {
89                let n = stderr.read(&mut tmp).await.unwrap_or(0);
90                if n == 0 {
91                    break;
92                }
93                if stderr_buf.len() + n > cap {
94                    break;
95                }
96                stderr_buf.extend_from_slice(&tmp[..n]);
97            }
98        }
99
100        let status = child.wait().await.map_err(|e| SourceError {
101            source_id: String::new(),
102            kind: SourceErrorKind::Fetch(format!("command execution failed: {e}")),
103        })?;
104
105        Ok((status, stdout_buf, stderr_buf))
106    })
107    .await;
108
109    let (status, stdout_bytes, stderr_bytes) = match result {
110        Ok(inner) => inner?,
111        Err(_) => {
112            let _ = child.kill().await;
113            return Err(SourceError {
114                source_id: String::new(),
115                kind: SourceErrorKind::Timeout,
116            });
117        }
118    };
119
120    if !status.success() {
121        let stderr = String::from_utf8_lossy(&stderr_bytes);
122        return Err(SourceError {
123            source_id: String::new(),
124            kind: SourceErrorKind::Fetch(format!(
125                "command exited with {}: {}",
126                status,
127                stderr.trim()
128            )),
129        });
130    }
131
132    let stdout = String::from_utf8(stdout_bytes).map_err(|e| SourceError {
133        source_id: String::new(),
134        kind: SourceErrorKind::Parse(format!("command output is not valid UTF-8: {e}")),
135    })?;
136
137    let parsed = parse_data(&stdout, format)?;
138
139    let data = if let Some(expr) = extract_expr {
140        apply_extract(&parsed, expr)?
141    } else {
142        parsed
143    };
144
145    Ok(ResolvedValue {
146        data,
147        resolved_at: Instant::now(),
148        from_cache: false,
149    })
150}