rsigma_runtime/sources/
command.rs1use std::time::{Duration, Instant};
4
5use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
6use tokio::io::AsyncReadExt;
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
/// Deadline applied to a command when the caller passes no explicit timeout.
const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
13
14pub async fn resolve_command(
16 command: &[String],
17 format: DataFormat,
18 extract_expr: Option<&ExtractExpr>,
19 timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21 resolve_command_with_limit(
22 command,
23 format,
24 extract_expr,
25 timeout,
26 MAX_SOURCE_RESPONSE_BYTES,
27 )
28 .await
29}
30
31pub async fn resolve_command_with_limit(
33 command: &[String],
34 format: DataFormat,
35 extract_expr: Option<&ExtractExpr>,
36 timeout: Option<Duration>,
37 max_stdout_bytes: usize,
38) -> Result<ResolvedValue, SourceError> {
39 if command.is_empty() {
40 return Err(SourceError {
41 source_id: String::new(),
42 kind: SourceErrorKind::Fetch("command is empty".into()),
43 });
44 }
45
46 let mut child = tokio::process::Command::new(&command[0])
47 .args(&command[1..])
48 .stdout(std::process::Stdio::piped())
49 .stderr(std::process::Stdio::piped())
50 .spawn()
51 .map_err(|e| SourceError {
52 source_id: String::new(),
53 kind: SourceErrorKind::Fetch(format!("failed to spawn '{}': {e}", command[0])),
54 })?;
55
56 let deadline = timeout.unwrap_or(DEFAULT_COMMAND_TIMEOUT);
57
58 let result = tokio::time::timeout(deadline, async {
59 let mut stdout_buf = Vec::new();
60 let mut stderr_buf = Vec::new();
61
62 if let Some(mut stdout) = child.stdout.take() {
63 let mut tmp = vec![0u8; 8192];
64 loop {
65 let n = stdout.read(&mut tmp).await.map_err(|e| SourceError {
66 source_id: String::new(),
67 kind: SourceErrorKind::Fetch(format!("failed to read stdout: {e}")),
68 })?;
69 if n == 0 {
70 break;
71 }
72 if stdout_buf.len() + n > max_stdout_bytes {
73 let _ = child.kill().await;
74 return Err(SourceError {
75 source_id: String::new(),
76 kind: SourceErrorKind::ResourceLimit(format!(
77 "command stdout exceeds {} byte limit",
78 max_stdout_bytes
79 )),
80 });
81 }
82 stdout_buf.extend_from_slice(&tmp[..n]);
83 }
84 }
85
86 if let Some(mut stderr) = child.stderr.take() {
87 let cap = 64 * 1024; let mut tmp = vec![0u8; 4096];
89 loop {
90 let n = stderr.read(&mut tmp).await.unwrap_or(0);
91 if n == 0 {
92 break;
93 }
94 if stderr_buf.len() + n > cap {
95 break;
96 }
97 stderr_buf.extend_from_slice(&tmp[..n]);
98 }
99 }
100
101 let status = child.wait().await.map_err(|e| SourceError {
102 source_id: String::new(),
103 kind: SourceErrorKind::Fetch(format!("command execution failed: {e}")),
104 })?;
105
106 Ok((status, stdout_buf, stderr_buf))
107 })
108 .await;
109
110 let (status, stdout_bytes, stderr_bytes) = match result {
111 Ok(inner) => inner?,
112 Err(_) => {
113 let _ = child.kill().await;
114 return Err(SourceError {
115 source_id: String::new(),
116 kind: SourceErrorKind::Timeout,
117 });
118 }
119 };
120
121 if !status.success() {
122 let stderr = String::from_utf8_lossy(&stderr_bytes);
123 return Err(SourceError {
124 source_id: String::new(),
125 kind: SourceErrorKind::Fetch(format!(
126 "command exited with {}: {}",
127 status,
128 stderr.trim()
129 )),
130 });
131 }
132
133 let stdout = String::from_utf8(stdout_bytes).map_err(|e| SourceError {
134 source_id: String::new(),
135 kind: SourceErrorKind::Parse(format!("command output is not valid UTF-8: {e}")),
136 })?;
137
138 let parsed = parse_data(&stdout, format)?;
139
140 let data = if let Some(expr) = extract_expr {
141 apply_extract(&parsed, expr)?
142 } else {
143 parsed
144 };
145
146 Ok(ResolvedValue {
147 data,
148 resolved_at: Instant::now(),
149 from_cache: false,
150 })
151}