Skip to main content

strike48_connector/
process.rs

1//! Async process execution utilities for connector implementations.
2//!
3//! Provides non-blocking wrappers around external command execution using
4//! `tokio::process::Command`. These helpers ensure that long-running commands
5//! (nmap, recon tools, scripts) never block tokio worker threads, keeping
6//! heartbeats and metrics flowing even during multi-minute operations.
7//!
8//! # Why This Matters
9//!
10//! Using `std::process::Command` inside an async `execute()` blocks the tokio
11//! worker thread for the entire command duration. With limited worker threads
12//! (typically `num_cpus`), a few concurrent commands can starve the runtime,
13//! causing heartbeat failures and connection drops.
14//!
15//! `tokio::process::Command` waits for child processes via async I/O
16//! (kqueue/epoll), so the tokio task yields while waiting. Thousands of
17//! concurrent commands use zero extra OS threads.
18//!
19//! # Examples
20//!
21//! ```rust,ignore
//! use strike48_connector::process::{
//!     run_command, run_command_opts, run_command_with_timeout, CommandOptions,
//! };
23//! use std::time::Duration;
24//!
25//! // Simple command
26//! let output = run_command("echo", &["hello", "world"]).await?;
27//! println!("stdout: {}", output.stdout);
28//!
29//! // With timeout (recommended for untrusted input)
30//! let output = run_command_with_timeout(
31//!     "nmap", &["-sV", "target.com"],
32//!     Duration::from_secs(300),
33//! ).await?;
34//!
35//! // With environment variables and working directory
36//! let output = run_command_opts("python3", &["script.py"], CommandOptions {
37//!     timeout: Some(Duration::from_secs(60)),
38//!     working_dir: Some("/tmp/workdir".into()),
39//!     env: vec![("API_KEY".into(), "secret".into())],
40//!     ..Default::default()
41//! }).await?;
42//! ```
43
44use crate::error::{ConnectorError, Result};
45use std::collections::HashMap;
46use std::path::PathBuf;
47use std::time::Duration;
48use tokio::process::Command;
49
/// Captured result of a command that ran to completion.
///
/// A non-zero exit status is reported here rather than as an error: check
/// [`success`](Self::success), or use one of the `*_stdout` helpers, which
/// turn a failed exit into an error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Everything the process wrote to stdout, decoded as UTF-8 (lossy
    /// conversion: invalid sequences become U+FFFD).
    pub stdout: String,
    /// Everything the process wrote to stderr, decoded the same way as `stdout`.
    pub stderr: String,
    /// Process exit code. `None` if the process was killed by a signal.
    pub exit_code: Option<i32>,
    /// Whether the command exited successfully (exit code 0).
    pub success: bool,
}
62
/// Knobs for [`run_command_opts`]; every field defaults to "off".
///
/// Usually built with struct-update syntax, e.g.
/// `CommandOptions { timeout: Some(limit), ..Default::default() }`.
#[derive(Clone, Debug, Default)]
pub struct CommandOptions {
    /// Upper bound on how long the command may run before a
    /// `ConnectorError::Timeout` is returned; `None` means no limit.
    pub timeout: Option<Duration>,
    /// Directory to run the command in; `None` keeps the current directory.
    pub working_dir: Option<PathBuf>,
    /// Extra `(name, value)` environment variables, applied on top of the
    /// inherited environment (or on top of nothing when `clear_env` is set).
    pub env: Vec<(String, String)>,
    /// When `true`, the child starts with an empty environment instead of
    /// inheriting this process's variables.
    pub clear_env: bool,
    /// Bytes to feed to the child's stdin; `None` means nothing is piped in.
    pub stdin_data: Option<Vec<u8>>,
}
78
79/// Run an external command asynchronously and capture its output.
80///
81/// Uses `tokio::process::Command` internally, so the calling tokio task
82/// yields while the child process runs. No OS threads are blocked.
83///
84/// # Errors
85///
86/// Returns `ConnectorError::Other` if the command fails to start
87/// (e.g., binary not found).
88pub async fn run_command(program: &str, args: &[&str]) -> Result<CommandOutput> {
89    run_command_opts(program, args, CommandOptions::default()).await
90}
91
92/// Run an external command with a timeout.
93///
94/// If the command doesn't complete within `timeout`, the child process
95/// is killed and a `ConnectorError::Timeout` is returned.
96pub async fn run_command_with_timeout(
97    program: &str,
98    args: &[&str],
99    timeout: Duration,
100) -> Result<CommandOutput> {
101    run_command_opts(
102        program,
103        args,
104        CommandOptions {
105            timeout: Some(timeout),
106            ..Default::default()
107        },
108    )
109    .await
110}
111
112/// Run an external command with full customization.
113pub async fn run_command_opts(
114    program: &str,
115    args: &[&str],
116    options: CommandOptions,
117) -> Result<CommandOutput> {
118    let mut cmd = Command::new(program);
119    cmd.args(args);
120
121    if options.clear_env {
122        cmd.env_clear();
123    }
124
125    for (key, value) in &options.env {
126        cmd.env(key, value);
127    }
128
129    if let Some(ref dir) = options.working_dir {
130        cmd.current_dir(dir);
131    }
132
133    if options.stdin_data.is_some() {
134        cmd.stdin(std::process::Stdio::piped());
135    }
136
137    cmd.stdout(std::process::Stdio::piped());
138    cmd.stderr(std::process::Stdio::piped());
139
140    let mut child = cmd
141        .spawn()
142        .map_err(|e| ConnectorError::Other(format!("Failed to spawn '{program}': {e}")))?;
143
144    if let Some(data) = options.stdin_data {
145        use std::io::ErrorKind;
146        use tokio::io::AsyncWriteExt;
147        if let Some(mut stdin) = child.stdin.take() {
148            if let Err(e) = stdin.write_all(&data).await
149                && e.kind() != ErrorKind::BrokenPipe
150            {
151                return Err(ConnectorError::Other(format!(
152                    "Failed to write stdin to '{program}': {e}"
153                )));
154            }
155            drop(stdin);
156        }
157    }
158
159    let output_future = child.wait_with_output();
160
161    let output = if let Some(timeout) = options.timeout {
162        match tokio::time::timeout(timeout, output_future).await {
163            Ok(result) => result.map_err(|e| {
164                ConnectorError::Other(format!("Command '{program}' I/O error: {e}"))
165            })?,
166            Err(_) => {
167                return Err(ConnectorError::Timeout(format!(
168                    "Command '{program}' timed out after {timeout:?}"
169                )));
170            }
171        }
172    } else {
173        output_future
174            .await
175            .map_err(|e| ConnectorError::Other(format!("Command '{program}' I/O error: {e}")))?
176    };
177
178    Ok(CommandOutput {
179        stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
180        stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
181        exit_code: output.status.code(),
182        success: output.status.success(),
183    })
184}
185
186/// Run a shell command via the system shell (`sh -c` on Unix).
187///
188/// Useful for commands that need shell features like pipes, redirection,
189/// or glob expansion.
190///
191/// # Security Warning
192///
193/// Never pass unsanitized user input directly. Use [`run_command`] with
194/// explicit arguments instead when possible.
195pub async fn run_shell(command: &str) -> Result<CommandOutput> {
196    run_command("sh", &["-c", command]).await
197}
198
199/// Run a shell command with a timeout.
200pub async fn run_shell_with_timeout(command: &str, timeout: Duration) -> Result<CommandOutput> {
201    run_command_with_timeout("sh", &["-c", command], timeout).await
202}
203
204/// Convenience function to run a command and return stdout, failing on non-zero exit.
205///
206/// Returns the trimmed stdout string on success.
207pub async fn run_command_stdout(program: &str, args: &[&str]) -> Result<String> {
208    let output = run_command(program, args).await?;
209    if !output.success {
210        return Err(ConnectorError::Other(format!(
211            "Command '{program}' failed (exit {}): {}",
212            output.exit_code.unwrap_or(-1),
213            output.stderr.trim()
214        )));
215    }
216    Ok(output.stdout.trim().to_string())
217}
218
219/// Build a command with a fluent API for more complex configurations.
220///
221/// # Examples
222///
223/// ```rust,ignore
224/// use strike48_connector::process::CommandBuilder;
225/// use std::time::Duration;
226///
227/// let output = CommandBuilder::new("nmap")
228///     .args(&["-sV", "-p", "1-1000", "target.com"])
229///     .timeout(Duration::from_secs(300))
230///     .working_dir("/tmp")
231///     .env("NMAP_PRIVILEGED", "1")
232///     .run()
233///     .await?;
234/// ```
235pub struct CommandBuilder {
236    program: String,
237    args: Vec<String>,
238    options: CommandOptions,
239}
240
241impl CommandBuilder {
242    pub fn new(program: impl Into<String>) -> Self {
243        Self {
244            program: program.into(),
245            args: Vec::new(),
246            options: CommandOptions::default(),
247        }
248    }
249
250    pub fn arg(mut self, arg: impl Into<String>) -> Self {
251        self.args.push(arg.into());
252        self
253    }
254
255    pub fn args(mut self, args: &[&str]) -> Self {
256        self.args.extend(args.iter().map(|s| s.to_string()));
257        self
258    }
259
260    pub fn timeout(mut self, timeout: Duration) -> Self {
261        self.options.timeout = Some(timeout);
262        self
263    }
264
265    pub fn working_dir(mut self, dir: impl Into<PathBuf>) -> Self {
266        self.options.working_dir = Some(dir.into());
267        self
268    }
269
270    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
271        self.options.env.push((key.into(), value.into()));
272        self
273    }
274
275    pub fn envs(mut self, vars: HashMap<String, String>) -> Self {
276        self.options.env.extend(vars);
277        self
278    }
279
280    pub fn clear_env(mut self) -> Self {
281        self.options.clear_env = true;
282        self
283    }
284
285    pub fn stdin(mut self, data: impl Into<Vec<u8>>) -> Self {
286        self.options.stdin_data = Some(data.into());
287        self
288    }
289
290    /// Run the command and return the output.
291    pub async fn run(self) -> Result<CommandOutput> {
292        let args: Vec<&str> = self.args.iter().map(|s| s.as_str()).collect();
293        run_command_opts(&self.program, &args, self.options).await
294    }
295
296    /// Run the command and return trimmed stdout, failing on non-zero exit.
297    pub async fn run_stdout(self) -> Result<String> {
298        let program = self.program.clone();
299        let output = self.run().await?;
300        if !output.success {
301            return Err(ConnectorError::Other(format!(
302                "Command '{program}' failed (exit {}): {}",
303                output.exit_code.unwrap_or(-1),
304                output.stderr.trim()
305            )));
306        }
307        Ok(output.stdout.trim().to_string())
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_run_command_echo() {
        let output = run_command("echo", &["hello", "world"]).await.unwrap();
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "hello world");
        assert_eq!(output.exit_code, Some(0));
    }

    #[tokio::test]
    async fn test_run_command_nonexistent() {
        let result = run_command("nonexistent_binary_xyz", &[]).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_run_command_exit_code() {
        let output = run_command("sh", &["-c", "exit 42"]).await.unwrap();
        assert!(!output.success);
        assert_eq!(output.exit_code, Some(42));
    }

    #[tokio::test]
    async fn test_run_command_stderr() {
        let output = run_command("sh", &["-c", "echo error >&2"]).await.unwrap();
        assert!(output.success);
        assert_eq!(output.stderr.trim(), "error");
    }

    #[tokio::test]
    async fn test_run_command_with_timeout_success() {
        let output = run_command_with_timeout("echo", &["fast"], Duration::from_secs(5))
            .await
            .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "fast");
    }

    #[tokio::test]
    async fn test_run_command_with_timeout_exceeded() {
        let err = run_command_with_timeout("sleep", &["10"], Duration::from_millis(100))
            .await
            .unwrap_err();
        assert!(
            matches!(err, ConnectorError::Timeout(_)),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("timed out"));
    }

    #[tokio::test]
    async fn test_run_shell() {
        let output = run_shell("echo $((2 + 3))").await.unwrap();
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "5");
    }

    #[tokio::test]
    async fn test_run_command_stdout_helper() {
        let stdout = run_command_stdout("echo", &["hello"]).await.unwrap();
        assert_eq!(stdout, "hello");
    }

    #[tokio::test]
    async fn test_run_command_stdout_fails_on_error() {
        let result = run_command_stdout("sh", &["-c", "echo fail >&2; exit 1"]).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_command_builder() {
        let output = CommandBuilder::new("echo")
            .args(&["hello", "builder"])
            .timeout(Duration::from_secs(5))
            .run()
            .await
            .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "hello builder");
    }

    #[tokio::test]
    async fn test_command_builder_with_env() {
        let output = CommandBuilder::new("sh")
            .args(&["-c", "echo $MY_VAR"])
            .env("MY_VAR", "test_value")
            .run()
            .await
            .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "test_value");
    }

    #[tokio::test]
    async fn test_command_builder_clear_env_keeps_explicit_vars() {
        // Absolute path: with a cleared environment there is no PATH to search.
        let stdout = CommandBuilder::new("/bin/sh")
            .args(&["-c", "echo $MY_VAR"])
            .clear_env()
            .env("MY_VAR", "kept")
            .run_stdout()
            .await
            .unwrap();
        assert_eq!(stdout, "kept");
    }

    #[tokio::test]
    async fn test_command_builder_working_dir() {
        let stdout = CommandBuilder::new("pwd")
            .working_dir("/")
            .run_stdout()
            .await
            .unwrap();
        assert_eq!(stdout, "/");
    }

    #[tokio::test]
    async fn test_command_builder_run_stdout_trims() {
        let stdout = CommandBuilder::new("echo")
            .arg("  padded  ")
            .run_stdout()
            .await
            .unwrap();
        assert_eq!(stdout, "padded");
    }

    #[tokio::test]
    async fn test_command_builder_run_stdout_fails_on_error() {
        let result = CommandBuilder::new("sh")
            .args(&["-c", "echo boom >&2; exit 3"])
            .run_stdout()
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_command_builder_stdin() {
        let output = CommandBuilder::new("cat")
            .stdin("from builder")
            .run()
            .await
            .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout, "from builder");
    }

    #[tokio::test]
    async fn test_command_with_stdin() {
        let output = run_command_opts(
            "cat",
            &[],
            CommandOptions {
                stdin_data: Some(b"piped input".to_vec()),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout, "piped input");
    }

    #[tokio::test]
    async fn test_stdin_broken_pipe_tolerated() {
        // 1 MiB is far larger than a pipe buffer, so `head` exits while the
        // write is still in progress and the writer actually observes EPIPE.
        let output = run_command_opts(
            "head",
            &["-c", "1"],
            CommandOptions {
                stdin_data: Some(vec![b'x'; 1024 * 1024]),
                timeout: Some(Duration::from_secs(10)),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        assert!(output.success);
        assert_eq!(output.stdout, "x");
    }

    #[tokio::test]
    async fn test_concurrent_commands_no_thread_starvation() {
        use std::time::Instant;

        let start = Instant::now();
        let mut handles = Vec::new();

        for i in 0..20 {
            handles.push(tokio::spawn(async move {
                run_command_with_timeout("sleep", &["1"], Duration::from_secs(5))
                    .await
                    .unwrap();
                i
            }));
        }

        let mut results = Vec::new();
        for handle in handles {
            results.push(handle.await.unwrap());
        }

        let elapsed = start.elapsed();

        assert_eq!(results.len(), 20);
        // 20 concurrent `sleep 1` should complete in ~1-2 seconds, not 20 seconds
        assert!(
            elapsed < Duration::from_secs(5),
            "20 concurrent sleeps took {:?} — thread starvation detected",
            elapsed
        );
    }
}