Skip to main content

plexus_substrate/activations/bash/executor/
mod.rs

1use super::types::BashOutput;
2use async_stream::stream;
3use futures::Stream;
4use std::pin::Pin;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8/// Core bash executor - can be used programmatically without RPC
9#[derive(Clone)]
10pub struct BashExecutor;
11
12impl BashExecutor {
13    pub fn new() -> Self {
14        Self
15    }
16
17    /// Execute a bash command and stream the output
18    ///
19    /// This is the core business logic - completely independent of RPC.
20    /// Returns a stream of BashOutput items.
21    pub async fn execute(
22        &self,
23        command: &str,
24    ) -> Pin<Box<dyn Stream<Item = BashOutput> + Send + 'static>> {
25        let command = command.to_string();
26
27        Box::pin(stream! {
28            // Spawn the bash process
29            let mut child = match Command::new("bash")
30                .arg("-c")
31                .arg(&command)
32                .stdout(std::process::Stdio::piped())
33                .stderr(std::process::Stdio::piped())
34                .spawn()
35            {
36                Ok(child) => child,
37                Err(e) => {
38                    // If we can't spawn, yield an error through BashOutput
39                    // Note: We can't yield BashError here since the stream type is BashOutput
40                    // In a real system, we might want to make this an enum or handle differently
41                    eprintln!("Failed to spawn bash: {}", e);
42                    return;
43                }
44            };
45
46            // Get stdout and stderr handles
47            let stdout = child.stdout.take().expect("Failed to capture stdout");
48            let stderr = child.stderr.take().expect("Failed to capture stderr");
49
50            // Create buffered readers
51            let mut stdout_reader = BufReader::new(stdout).lines();
52            let mut stderr_reader = BufReader::new(stderr).lines();
53
54            // Read lines from both stdout and stderr
55            // Note: This is a simple implementation that reads stdout first, then stderr
56            // A more sophisticated version would interleave them as they arrive
57            while let Ok(Some(line)) = stdout_reader.next_line().await {
58                yield BashOutput::Stdout { line };
59            }
60
61            while let Ok(Some(line)) = stderr_reader.next_line().await {
62                yield BashOutput::Stderr { line };
63            }
64
65            // Wait for process to complete and get exit code
66            match child.wait().await {
67                Ok(status) => {
68                    let code = status.code().unwrap_or(-1);
69                    yield BashOutput::Exit { code };
70                }
71                Err(e) => {
72                    eprintln!("Failed to wait for child: {}", e);
73                    yield BashOutput::Exit { code: -1 };
74                }
75            }
76        })
77    }
78
79    /// Execute a command and collect all output (convenience method for testing)
80    pub async fn execute_collect(&self, command: &str) -> Vec<BashOutput> {
81        use futures::StreamExt;
82
83        let mut results = Vec::new();
84        let mut stream = self.execute(command).await;
85
86        while let Some(output) = stream.next().await {
87            results.push(output);
88        }
89
90        results
91    }
92}
93
94impl Default for BashExecutor {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103
104    #[tokio::test]
105    async fn test_execute_simple_command() {
106        let executor = BashExecutor::new();
107        let outputs = executor.execute_collect("echo 'hello world'").await;
108
109        // Should have stdout line + exit code
110        assert!(outputs.len() >= 2);
111
112        // Check for stdout
113        match &outputs[0] {
114            BashOutput::Stdout { line } => assert_eq!(line, "hello world"),
115            _ => panic!("Expected stdout"),
116        }
117
118        // Check for successful exit
119        match outputs.last().unwrap() {
120            BashOutput::Exit { code } => assert_eq!(*code, 0),
121            _ => panic!("Expected exit"),
122        }
123    }
124
125    #[tokio::test]
126    async fn test_execute_stderr() {
127        let executor = BashExecutor::new();
128        let outputs = executor
129            .execute_collect("echo 'error' >&2")
130            .await;
131
132        // Should have stderr line + exit code
133        assert!(outputs.len() >= 2);
134
135        // Check for stderr
136        let has_stderr = outputs.iter().any(|o| matches!(o, BashOutput::Stderr { .. }));
137        assert!(has_stderr);
138    }
139
140    #[tokio::test]
141    async fn test_execute_exit_code() {
142        let executor = BashExecutor::new();
143        let outputs = executor.execute_collect("exit 42").await;
144
145        // Check for exit code 42
146        match outputs.last().unwrap() {
147            BashOutput::Exit { code } => assert_eq!(*code, 42),
148            _ => panic!("Expected exit"),
149        }
150    }
151}