plexus_substrate/activations/bash/executor/
mod.rs1use super::types::BashOutput;
2use async_stream::stream;
3use futures::Stream;
4use std::pin::Pin;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
/// Stateless handle for running shell commands through `bash -c`.
///
/// The type carries no data, so cloning it is free and every instance is
/// interchangeable with any other.
#[derive(Debug, Clone)]
pub struct BashExecutor;
11
12impl BashExecutor {
13 pub fn new() -> Self {
14 Self
15 }
16
17 pub async fn execute(
22 &self,
23 command: &str,
24 ) -> Pin<Box<dyn Stream<Item = BashOutput> + Send + 'static>> {
25 let command = command.to_string();
26
27 Box::pin(stream! {
28 let mut child = match Command::new("bash")
30 .arg("-c")
31 .arg(&command)
32 .stdout(std::process::Stdio::piped())
33 .stderr(std::process::Stdio::piped())
34 .spawn()
35 {
36 Ok(child) => child,
37 Err(e) => {
38 eprintln!("Failed to spawn bash: {}", e);
42 return;
43 }
44 };
45
46 let stdout = child.stdout.take().expect("Failed to capture stdout");
48 let stderr = child.stderr.take().expect("Failed to capture stderr");
49
50 let mut stdout_reader = BufReader::new(stdout).lines();
52 let mut stderr_reader = BufReader::new(stderr).lines();
53
54 while let Ok(Some(line)) = stdout_reader.next_line().await {
58 yield BashOutput::Stdout { line };
59 }
60
61 while let Ok(Some(line)) = stderr_reader.next_line().await {
62 yield BashOutput::Stderr { line };
63 }
64
65 match child.wait().await {
67 Ok(status) => {
68 let code = status.code().unwrap_or(-1);
69 yield BashOutput::Exit { code };
70 }
71 Err(e) => {
72 eprintln!("Failed to wait for child: {}", e);
73 yield BashOutput::Exit { code: -1 };
74 }
75 }
76 })
77 }
78
79 pub async fn execute_collect(&self, command: &str) -> Vec<BashOutput> {
81 use futures::StreamExt;
82
83 let mut results = Vec::new();
84 let mut stream = self.execute(command).await;
85
86 while let Some(output) = stream.next().await {
87 results.push(output);
88 }
89
90 results
91 }
92}
93
94impl Default for BashExecutor {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `echo` produces its text on stdout first and a zero exit last.
    #[tokio::test]
    async fn test_execute_simple_command() {
        let outputs = BashExecutor::new()
            .execute_collect("echo 'hello world'")
            .await;

        // At least one output line plus the exit item.
        assert!(outputs.len() >= 2);

        if let BashOutput::Stdout { line } = &outputs[0] {
            assert_eq!(line, "hello world");
        } else {
            panic!("Expected stdout");
        }

        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 0);
        } else {
            panic!("Expected exit");
        }
    }

    /// Text redirected to file descriptor 2 shows up as a stderr item.
    #[tokio::test]
    async fn test_execute_stderr() {
        let runner = BashExecutor::new();
        let outputs = runner.execute_collect("echo 'error' >&2").await;

        // At least one output line plus the exit item.
        assert!(outputs.len() >= 2);

        let stderr_items = outputs
            .iter()
            .filter(|item| matches!(item, BashOutput::Stderr { .. }))
            .count();
        assert!(stderr_items > 0);
    }

    /// The shell's exit status is reported by the final item.
    #[tokio::test]
    async fn test_execute_exit_code() {
        let outputs = BashExecutor::new().execute_collect("exit 42").await;

        if let BashOutput::Exit { code } = outputs.last().unwrap() {
            assert_eq!(*code, 42);
        } else {
            panic!("Expected exit");
        }
    }
}