plexus_substrate/activations/bash/executor/
mod.rs1use super::types::{BashOutput, ExecutorError};
2use async_stream::stream;
3use futures::Stream;
4use std::pin::Pin;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, BufReader};
7use tokio::process::Command;
8use tokio::sync::Mutex;
9
/// Stateless handle for running shell commands through `bash -c`.
///
/// The type carries no data, so cloning it is free and every instance is
/// interchangeable with any other.
#[derive(Clone, Debug)]
pub struct BashExecutor;
13
14impl BashExecutor {
15 pub fn new() -> Self {
16 Self
17 }
18
19 pub async fn execute(
24 &self,
25 command: &str,
26 ) -> Pin<Box<dyn Stream<Item = BashOutput> + Send + 'static>> {
27 let command = command.to_string();
28
29 Box::pin(stream! {
30 let mut child = match Command::new("bash")
32 .arg("-c")
33 .arg(&command)
34 .stdout(std::process::Stdio::piped())
35 .stderr(std::process::Stdio::piped())
36 .spawn()
37 {
38 Ok(child) => child,
39 Err(e) => {
40 let err = ExecutorError::SpawnFailed {
41 command: command.clone(),
42 source: e,
43 };
44 tracing::error!(error = %err, "Bash executor error");
45 yield BashOutput::Error { message: err.to_string() };
46 return;
47 }
48 };
49
50 let stdout = match child.stdout.take() {
52 Some(s) => s,
53 None => {
54 let err = ExecutorError::StdioCaptureFailed {
55 stream: "stdout",
56 command: command.clone(),
57 };
58 tracing::error!(error = %err, "Bash executor error");
59 yield BashOutput::Error { message: err.to_string() };
60 return;
61 }
62 };
63 let stderr = match child.stderr.take() {
64 Some(s) => s,
65 None => {
66 let err = ExecutorError::StdioCaptureFailed {
67 stream: "stderr",
68 command: command.clone(),
69 };
70 tracing::error!(error = %err, "Bash executor error");
71 yield BashOutput::Error { message: err.to_string() };
72 return;
73 }
74 };
75
76 let stderr_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
78 let stderr_buf = stderr_buffer.clone();
79 let stderr_task = tokio::spawn(async move {
80 let mut stderr_reader = BufReader::new(stderr).lines();
81 while let Ok(Some(line)) = stderr_reader.next_line().await {
82 let mut buf = stderr_buf.lock().await;
83 if buf.len() < 100 {
84 buf.push(line);
85 }
86 }
87 });
88
89 let mut stdout_reader = BufReader::new(stdout).lines();
91 while let Ok(Some(line)) = stdout_reader.next_line().await {
92 yield BashOutput::Stdout { line };
93 }
94
95 let _ = stderr_task.await;
97
98 let stderr_lines = stderr_buffer.lock().await;
100 if !stderr_lines.is_empty() {
101 tracing::debug!(
102 stderr_line_count = stderr_lines.len(),
103 command = %command,
104 "Bash process produced stderr output"
105 );
106 for line in stderr_lines.iter() {
107 yield BashOutput::Stderr { line: line.clone() };
108 }
109 }
110 drop(stderr_lines);
111
112 match child.wait().await {
114 Ok(status) => {
115 let code = status.code().unwrap_or(-1);
116 tracing::debug!(exit_code = code, command = %command, "Bash process exited");
117 yield BashOutput::Exit { code };
118 }
119 Err(e) => {
120 let err = ExecutorError::WaitFailed {
121 command: command.clone(),
122 source: e,
123 };
124 tracing::error!(error = %err, "Bash executor error");
125 yield BashOutput::Error { message: err.to_string() };
126 yield BashOutput::Exit { code: -1 };
127 }
128 }
129 })
130 }
131
132 pub async fn execute_collect(&self, command: &str) -> Vec<BashOutput> {
134 use futures::StreamExt;
135
136 let mut results = Vec::new();
137 let mut stream = self.execute(command).await;
138
139 while let Some(output) = stream.next().await {
140 results.push(output);
141 }
142
143 results
144 }
145}
146
147impl Default for BashExecutor {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `echo` must produce its text on stdout first and finish with
    /// a zero exit code.
    #[tokio::test]
    async fn test_execute_simple_command() {
        let outputs = BashExecutor::new()
            .execute_collect("echo 'hello world'")
            .await;

        // At minimum: one stdout line plus the exit marker.
        assert!(outputs.len() >= 2);

        if let BashOutput::Stdout { line } = &outputs[0] {
            assert_eq!(line, "hello world");
        } else {
            panic!("Expected stdout");
        }

        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 0);
        } else {
            panic!("Expected exit");
        }
    }

    /// Text written to file descriptor 2 must surface as a stderr item.
    #[tokio::test]
    async fn test_execute_stderr() {
        let outputs = BashExecutor::new()
            .execute_collect("echo 'error' >&2")
            .await;

        // At minimum: one stderr line plus the exit marker.
        assert!(outputs.len() >= 2);

        let has_stderr = outputs
            .iter()
            .any(|item| matches!(item, BashOutput::Stderr { .. }));
        assert!(has_stderr);
    }

    /// The exit code of the shell must be propagated unchanged.
    #[tokio::test]
    async fn test_execute_exit_code() {
        let outputs = BashExecutor::new().execute_collect("exit 42").await;

        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 42);
        } else {
            panic!("Expected exit");
        }
    }
}