Skip to main content

plexus_substrate/activations/bash/executor/
mod.rs

1use super::types::{BashOutput, ExecutorError};
2use async_stream::stream;
3use futures::Stream;
4use std::pin::Pin;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, BufReader};
7use tokio::process::Command;
8use tokio::sync::Mutex;
9
/// Core bash executor - can be used programmatically without RPC.
///
/// This is a field-less unit struct: it carries no state of its own, so
/// cloning it is free and every instance is interchangeable.
#[derive(Debug, Clone)]
pub struct BashExecutor;
13
14impl BashExecutor {
15    pub fn new() -> Self {
16        Self
17    }
18
19    /// Execute a bash command and stream the output
20    ///
21    /// This is the core business logic - completely independent of RPC.
22    /// Returns a stream of BashOutput items.
23    pub async fn execute(
24        &self,
25        command: &str,
26    ) -> Pin<Box<dyn Stream<Item = BashOutput> + Send + 'static>> {
27        let command = command.to_string();
28
29        Box::pin(stream! {
30            // Spawn the bash process
31            let mut child = match Command::new("bash")
32                .arg("-c")
33                .arg(&command)
34                .stdout(std::process::Stdio::piped())
35                .stderr(std::process::Stdio::piped())
36                .spawn()
37            {
38                Ok(child) => child,
39                Err(e) => {
40                    let err = ExecutorError::SpawnFailed {
41                        command: command.clone(),
42                        source: e,
43                    };
44                    tracing::error!(error = %err, "Bash executor error");
45                    yield BashOutput::Error { message: err.to_string() };
46                    return;
47                }
48            };
49
50            // Get stdout and stderr handles
51            let stdout = match child.stdout.take() {
52                Some(s) => s,
53                None => {
54                    let err = ExecutorError::StdioCaptureFailed {
55                        stream: "stdout",
56                        command: command.clone(),
57                    };
58                    tracing::error!(error = %err, "Bash executor error");
59                    yield BashOutput::Error { message: err.to_string() };
60                    return;
61                }
62            };
63            let stderr = match child.stderr.take() {
64                Some(s) => s,
65                None => {
66                    let err = ExecutorError::StdioCaptureFailed {
67                        stream: "stderr",
68                        command: command.clone(),
69                    };
70                    tracing::error!(error = %err, "Bash executor error");
71                    yield BashOutput::Error { message: err.to_string() };
72                    return;
73                }
74            };
75
76            // Capture stderr in background task to prevent pipe buffer blocking
77            let stderr_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
78            let stderr_buf = stderr_buffer.clone();
79            let stderr_task = tokio::spawn(async move {
80                let mut stderr_reader = BufReader::new(stderr).lines();
81                while let Ok(Some(line)) = stderr_reader.next_line().await {
82                    let mut buf = stderr_buf.lock().await;
83                    if buf.len() < 100 {
84                        buf.push(line);
85                    }
86                }
87            });
88
89            // Stream stdout lines
90            let mut stdout_reader = BufReader::new(stdout).lines();
91            while let Ok(Some(line)) = stdout_reader.next_line().await {
92                yield BashOutput::Stdout { line };
93            }
94
95            // Wait for stderr task to finish before reading the buffer
96            let _ = stderr_task.await;
97
98            // Yield captured stderr lines
99            let stderr_lines = stderr_buffer.lock().await;
100            if !stderr_lines.is_empty() {
101                tracing::debug!(
102                    stderr_line_count = stderr_lines.len(),
103                    command = %command,
104                    "Bash process produced stderr output"
105                );
106                for line in stderr_lines.iter() {
107                    yield BashOutput::Stderr { line: line.clone() };
108                }
109            }
110            drop(stderr_lines);
111
112            // Wait for process to complete and get exit code
113            match child.wait().await {
114                Ok(status) => {
115                    let code = status.code().unwrap_or(-1);
116                    tracing::debug!(exit_code = code, command = %command, "Bash process exited");
117                    yield BashOutput::Exit { code };
118                }
119                Err(e) => {
120                    let err = ExecutorError::WaitFailed {
121                        command: command.clone(),
122                        source: e,
123                    };
124                    tracing::error!(error = %err, "Bash executor error");
125                    yield BashOutput::Error { message: err.to_string() };
126                    yield BashOutput::Exit { code: -1 };
127                }
128            }
129        })
130    }
131
132    /// Execute a command and collect all output (convenience method for testing)
133    pub async fn execute_collect(&self, command: &str) -> Vec<BashOutput> {
134        use futures::StreamExt;
135
136        let mut results = Vec::new();
137        let mut stream = self.execute(command).await;
138
139        while let Some(output) = stream.next().await {
140            results.push(output);
141        }
142
143        results
144    }
145}
146
147impl Default for BashExecutor {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `echo` yields its text on stdout first and ends with exit code 0.
    #[tokio::test]
    async fn test_execute_simple_command() {
        let outputs = BashExecutor::new()
            .execute_collect("echo 'hello world'")
            .await;

        // At minimum: one stdout line plus the exit event.
        assert!(outputs.len() >= 2);

        // The first item must be the echoed line.
        if let BashOutput::Stdout { line } = &outputs[0] {
            assert_eq!(line, "hello world");
        } else {
            panic!("Expected stdout");
        }

        // The last item must report a successful exit.
        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 0);
        } else {
            panic!("Expected exit");
        }
    }

    /// Text written to file descriptor 2 is surfaced as a `Stderr` item.
    #[tokio::test]
    async fn test_execute_stderr() {
        let outputs = BashExecutor::new()
            .execute_collect("echo 'error' >&2")
            .await;

        // At minimum: one stderr line plus the exit event.
        assert!(outputs.len() >= 2);

        // Some item in the sequence must be a stderr line.
        assert!(outputs
            .iter()
            .any(|item| matches!(item, BashOutput::Stderr { .. })));
    }

    /// A non-zero exit status is reported unchanged in the final `Exit` item.
    #[tokio::test]
    async fn test_execute_exit_code() {
        let outputs = BashExecutor::new().execute_collect("exit 42").await;

        // The last item must carry exit code 42.
        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 42);
        } else {
            panic!("Expected exit");
        }
    }
}