rsigma_runtime/sources/
command.rs1use std::time::{Duration, Instant};
4
5use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
6use tokio::io::AsyncReadExt;
7
8use super::extract::apply_extract;
9use super::file::parse_data;
10use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
11
12const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
13
14pub async fn resolve_command(
16 command: &[String],
17 format: DataFormat,
18 extract_expr: Option<&ExtractExpr>,
19 timeout: Option<Duration>,
20) -> Result<ResolvedValue, SourceError> {
21 resolve_command_with_limit(
22 command,
23 format,
24 extract_expr,
25 timeout,
26 MAX_SOURCE_RESPONSE_BYTES,
27 )
28 .await
29}
30
31pub async fn resolve_command_with_limit(
33 command: &[String],
34 format: DataFormat,
35 extract_expr: Option<&ExtractExpr>,
36 timeout: Option<Duration>,
37 max_stdout_bytes: usize,
38) -> Result<ResolvedValue, SourceError> {
39 if command.is_empty() {
40 return Err(SourceError {
41 source_id: String::new(),
42 kind: SourceErrorKind::Fetch("command is empty".into()),
43 });
44 }
45
46 let mut child = tokio::process::Command::new(&command[0])
47 .args(&command[1..])
48 .stdout(std::process::Stdio::piped())
49 .stderr(std::process::Stdio::piped())
50 .spawn()
51 .map_err(|e| SourceError {
52 source_id: String::new(),
53 kind: SourceErrorKind::Fetch(format!("failed to spawn '{}': {e}", command[0])),
54 })?;
55
56 let deadline = timeout.unwrap_or(DEFAULT_COMMAND_TIMEOUT);
57
58 let result = tokio::time::timeout(deadline, async {
59 let mut stdout_buf = Vec::new();
60 let mut stderr_buf = Vec::new();
61
62 if let Some(mut stdout) = child.stdout.take() {
63 let mut tmp = vec![0u8; 8192];
64 loop {
65 let n = stdout.read(&mut tmp).await.map_err(|e| SourceError {
66 source_id: String::new(),
67 kind: SourceErrorKind::Fetch(format!("failed to read stdout: {e}")),
68 })?;
69 if n == 0 {
70 break;
71 }
72 if stdout_buf.len() + n > max_stdout_bytes {
73 let _ = child.kill().await;
74 return Err(SourceError {
75 source_id: String::new(),
76 kind: SourceErrorKind::ResourceLimit(format!(
77 "command stdout exceeds {max_stdout_bytes} byte limit"
78 )),
79 });
80 }
81 stdout_buf.extend_from_slice(&tmp[..n]);
82 }
83 }
84
85 if let Some(mut stderr) = child.stderr.take() {
86 let cap = 64 * 1024; let mut tmp = vec![0u8; 4096];
88 loop {
89 let n = stderr.read(&mut tmp).await.unwrap_or(0);
90 if n == 0 {
91 break;
92 }
93 if stderr_buf.len() + n > cap {
94 break;
95 }
96 stderr_buf.extend_from_slice(&tmp[..n]);
97 }
98 }
99
100 let status = child.wait().await.map_err(|e| SourceError {
101 source_id: String::new(),
102 kind: SourceErrorKind::Fetch(format!("command execution failed: {e}")),
103 })?;
104
105 Ok((status, stdout_buf, stderr_buf))
106 })
107 .await;
108
109 let (status, stdout_bytes, stderr_bytes) = match result {
110 Ok(inner) => inner?,
111 Err(_) => {
112 let _ = child.kill().await;
113 return Err(SourceError {
114 source_id: String::new(),
115 kind: SourceErrorKind::Timeout,
116 });
117 }
118 };
119
120 if !status.success() {
121 let stderr = String::from_utf8_lossy(&stderr_bytes);
122 return Err(SourceError {
123 source_id: String::new(),
124 kind: SourceErrorKind::Fetch(format!(
125 "command exited with {}: {}",
126 status,
127 stderr.trim()
128 )),
129 });
130 }
131
132 let stdout = String::from_utf8(stdout_bytes).map_err(|e| SourceError {
133 source_id: String::new(),
134 kind: SourceErrorKind::Parse(format!("command output is not valid UTF-8: {e}")),
135 })?;
136
137 let parsed = parse_data(&stdout, format)?;
138
139 let data = if let Some(expr) = extract_expr {
140 apply_extract(&parsed, expr)?
141 } else {
142 parsed
143 };
144
145 Ok(ResolvedValue {
146 data,
147 resolved_at: Instant::now(),
148 from_cache: false,
149 })
150}