1use crate::error::{ConnectorError, Result};
45use std::collections::HashMap;
46use std::path::PathBuf;
47use std::time::Duration;
48use tokio::process::Command;
49
50#[derive(Debug, Clone)]
52pub struct CommandOutput {
53 pub stdout: String,
55 pub stderr: String,
57 pub exit_code: Option<i32>,
59 pub success: bool,
61}
62
63#[derive(Debug, Clone, Default)]
65pub struct CommandOptions {
66 pub timeout: Option<Duration>,
69 pub working_dir: Option<PathBuf>,
71 pub env: Vec<(String, String)>,
73 pub clear_env: bool,
75 pub stdin_data: Option<Vec<u8>>,
77}
78
79pub async fn run_command(program: &str, args: &[&str]) -> Result<CommandOutput> {
89 run_command_opts(program, args, CommandOptions::default()).await
90}
91
92pub async fn run_command_with_timeout(
97 program: &str,
98 args: &[&str],
99 timeout: Duration,
100) -> Result<CommandOutput> {
101 run_command_opts(
102 program,
103 args,
104 CommandOptions {
105 timeout: Some(timeout),
106 ..Default::default()
107 },
108 )
109 .await
110}
111
112pub async fn run_command_opts(
114 program: &str,
115 args: &[&str],
116 options: CommandOptions,
117) -> Result<CommandOutput> {
118 let mut cmd = Command::new(program);
119 cmd.args(args);
120
121 if options.clear_env {
122 cmd.env_clear();
123 }
124
125 for (key, value) in &options.env {
126 cmd.env(key, value);
127 }
128
129 if let Some(ref dir) = options.working_dir {
130 cmd.current_dir(dir);
131 }
132
133 if options.stdin_data.is_some() {
134 cmd.stdin(std::process::Stdio::piped());
135 }
136
137 cmd.stdout(std::process::Stdio::piped());
138 cmd.stderr(std::process::Stdio::piped());
139
140 let mut child = cmd
141 .spawn()
142 .map_err(|e| ConnectorError::Other(format!("Failed to spawn '{program}': {e}")))?;
143
144 if let Some(data) = options.stdin_data {
145 use std::io::ErrorKind;
146 use tokio::io::AsyncWriteExt;
147 if let Some(mut stdin) = child.stdin.take() {
148 if let Err(e) = stdin.write_all(&data).await
149 && e.kind() != ErrorKind::BrokenPipe
150 {
151 return Err(ConnectorError::Other(format!(
152 "Failed to write stdin to '{program}': {e}"
153 )));
154 }
155 drop(stdin);
156 }
157 }
158
159 let output_future = child.wait_with_output();
160
161 let output = if let Some(timeout) = options.timeout {
162 match tokio::time::timeout(timeout, output_future).await {
163 Ok(result) => result.map_err(|e| {
164 ConnectorError::Other(format!("Command '{program}' I/O error: {e}"))
165 })?,
166 Err(_) => {
167 return Err(ConnectorError::Timeout(format!(
168 "Command '{program}' timed out after {timeout:?}"
169 )));
170 }
171 }
172 } else {
173 output_future
174 .await
175 .map_err(|e| ConnectorError::Other(format!("Command '{program}' I/O error: {e}")))?
176 };
177
178 Ok(CommandOutput {
179 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
180 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
181 exit_code: output.status.code(),
182 success: output.status.success(),
183 })
184}
185
186pub async fn run_shell(command: &str) -> Result<CommandOutput> {
196 run_command("sh", &["-c", command]).await
197}
198
199pub async fn run_shell_with_timeout(command: &str, timeout: Duration) -> Result<CommandOutput> {
201 run_command_with_timeout("sh", &["-c", command], timeout).await
202}
203
204pub async fn run_command_stdout(program: &str, args: &[&str]) -> Result<String> {
208 let output = run_command(program, args).await?;
209 if !output.success {
210 return Err(ConnectorError::Other(format!(
211 "Command '{program}' failed (exit {}): {}",
212 output.exit_code.unwrap_or(-1),
213 output.stderr.trim()
214 )));
215 }
216 Ok(output.stdout.trim().to_string())
217}
218
219pub struct CommandBuilder {
236 program: String,
237 args: Vec<String>,
238 options: CommandOptions,
239}
240
241impl CommandBuilder {
242 pub fn new(program: impl Into<String>) -> Self {
243 Self {
244 program: program.into(),
245 args: Vec::new(),
246 options: CommandOptions::default(),
247 }
248 }
249
250 pub fn arg(mut self, arg: impl Into<String>) -> Self {
251 self.args.push(arg.into());
252 self
253 }
254
255 pub fn args(mut self, args: &[&str]) -> Self {
256 self.args.extend(args.iter().map(|s| s.to_string()));
257 self
258 }
259
260 pub fn timeout(mut self, timeout: Duration) -> Self {
261 self.options.timeout = Some(timeout);
262 self
263 }
264
265 pub fn working_dir(mut self, dir: impl Into<PathBuf>) -> Self {
266 self.options.working_dir = Some(dir.into());
267 self
268 }
269
270 pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
271 self.options.env.push((key.into(), value.into()));
272 self
273 }
274
275 pub fn envs(mut self, vars: HashMap<String, String>) -> Self {
276 self.options.env.extend(vars);
277 self
278 }
279
280 pub fn clear_env(mut self) -> Self {
281 self.options.clear_env = true;
282 self
283 }
284
285 pub fn stdin(mut self, data: impl Into<Vec<u8>>) -> Self {
286 self.options.stdin_data = Some(data.into());
287 self
288 }
289
290 pub async fn run(self) -> Result<CommandOutput> {
292 let args: Vec<&str> = self.args.iter().map(|s| s.as_str()).collect();
293 run_command_opts(&self.program, &args, self.options).await
294 }
295
296 pub async fn run_stdout(self) -> Result<String> {
298 let program = self.program.clone();
299 let output = self.run().await?;
300 if !output.success {
301 return Err(ConnectorError::Other(format!(
302 "Command '{program}' failed (exit {}): {}",
303 output.exit_code.unwrap_or(-1),
304 output.stderr.trim()
305 )));
306 }
307 Ok(output.stdout.trim().to_string())
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314
315 #[tokio::test]
316 async fn test_run_command_echo() {
317 let output = run_command("echo", &["hello", "world"]).await.unwrap();
318 assert!(output.success);
319 assert_eq!(output.stdout.trim(), "hello world");
320 assert_eq!(output.exit_code, Some(0));
321 }
322
323 #[tokio::test]
324 async fn test_run_command_nonexistent() {
325 let result = run_command("nonexistent_binary_xyz", &[]).await;
326 assert!(result.is_err());
327 }
328
329 #[tokio::test]
330 async fn test_run_command_exit_code() {
331 let output = run_command("sh", &["-c", "exit 42"]).await.unwrap();
332 assert!(!output.success);
333 assert_eq!(output.exit_code, Some(42));
334 }
335
336 #[tokio::test]
337 async fn test_run_command_stderr() {
338 let output = run_command("sh", &["-c", "echo error >&2"]).await.unwrap();
339 assert!(output.success);
340 assert_eq!(output.stderr.trim(), "error");
341 }
342
343 #[tokio::test]
344 async fn test_run_command_with_timeout_success() {
345 let output = run_command_with_timeout("echo", &["fast"], Duration::from_secs(5))
346 .await
347 .unwrap();
348 assert!(output.success);
349 assert_eq!(output.stdout.trim(), "fast");
350 }
351
352 #[tokio::test]
353 async fn test_run_command_with_timeout_exceeded() {
354 let result = run_command_with_timeout("sleep", &["10"], Duration::from_millis(100)).await;
355 assert!(result.is_err());
356 let err = result.unwrap_err();
357 assert!(err.to_string().contains("timed out"));
358 }
359
360 #[tokio::test]
361 async fn test_run_shell() {
362 let output = run_shell("echo $((2 + 3))").await.unwrap();
363 assert!(output.success);
364 assert_eq!(output.stdout.trim(), "5");
365 }
366
367 #[tokio::test]
368 async fn test_run_command_stdout_helper() {
369 let stdout = run_command_stdout("echo", &["hello"]).await.unwrap();
370 assert_eq!(stdout, "hello");
371 }
372
373 #[tokio::test]
374 async fn test_run_command_stdout_fails_on_error() {
375 let result = run_command_stdout("sh", &["-c", "echo fail >&2; exit 1"]).await;
376 assert!(result.is_err());
377 }
378
379 #[tokio::test]
380 async fn test_command_builder() {
381 let output = CommandBuilder::new("echo")
382 .args(&["hello", "builder"])
383 .timeout(Duration::from_secs(5))
384 .run()
385 .await
386 .unwrap();
387 assert!(output.success);
388 assert_eq!(output.stdout.trim(), "hello builder");
389 }
390
391 #[tokio::test]
392 async fn test_command_builder_with_env() {
393 let output = CommandBuilder::new("sh")
394 .args(&["-c", "echo $MY_VAR"])
395 .env("MY_VAR", "test_value")
396 .run()
397 .await
398 .unwrap();
399 assert!(output.success);
400 assert_eq!(output.stdout.trim(), "test_value");
401 }
402
403 #[tokio::test]
404 async fn test_command_with_stdin() {
405 let output = run_command_opts(
406 "cat",
407 &[],
408 CommandOptions {
409 stdin_data: Some(b"piped input".to_vec()),
410 ..Default::default()
411 },
412 )
413 .await
414 .unwrap();
415 assert!(output.success);
416 assert_eq!(output.stdout, "piped input");
417 }
418
419 #[tokio::test]
420 async fn test_stdin_broken_pipe_tolerated() {
421 let output = run_command_opts(
422 "head",
423 &["-c", "1"],
424 CommandOptions {
425 stdin_data: Some(b"lots of data that head will not fully read".to_vec()),
426 ..Default::default()
427 },
428 )
429 .await
430 .unwrap();
431 assert!(output.success);
432 }
433
434 #[tokio::test]
435 async fn test_concurrent_commands_no_thread_starvation() {
436 use std::time::Instant;
437
438 let start = Instant::now();
439 let mut handles = Vec::new();
440
441 for i in 0..20 {
442 handles.push(tokio::spawn(async move {
443 run_command_with_timeout("sleep", &["1"], Duration::from_secs(5))
444 .await
445 .unwrap();
446 i
447 }));
448 }
449
450 let mut results = Vec::new();
451 for handle in handles {
452 results.push(handle.await.unwrap());
453 }
454
455 let elapsed = start.elapsed();
456
457 assert_eq!(results.len(), 20);
458 assert!(
460 elapsed < Duration::from_secs(5),
461 "20 concurrent sleeps took {:?} — thread starvation detected",
462 elapsed
463 );
464 }
465}