earl_protocol_bash/
executor.rs1use std::process::Stdio;
2use std::time::Duration;
3
4use anyhow::{Context, Result, bail};
5use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
6
7use earl_core::{ExecutionContext, RawExecutionResult, StreamChunk, StreamMeta};
8
9use crate::PreparedBashScript;
10use crate::sandbox::{build_sandboxed_command, sandbox_available, sandbox_tool_name};
11
12pub async fn execute_bash_once(
14 data: &PreparedBashScript,
15 ctx: &ExecutionContext,
16) -> Result<RawExecutionResult> {
17 if !sandbox_available() {
18 bail!(
19 "bash sandbox tool ({}) is not available on this system; \
20 install it or disable the bash feature",
21 sandbox_tool_name()
22 );
23 }
24
25 let mut command =
26 build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
27
28 command.stdout(Stdio::piped());
29 command.stderr(Stdio::piped());
30
31 if data.stdin.is_some() {
32 command.stdin(Stdio::piped());
33 } else {
34 command.stdin(Stdio::null());
35 }
36
37 unsafe {
40 command.pre_exec(|| {
41 libc::setsid();
42 Ok(())
43 });
44 }
45
46 let mut child = command
47 .spawn()
48 .context("failed spawning sandboxed bash command")?;
49
50 let pid = child
51 .id()
52 .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
53
54 let stdout = child
55 .stdout
56 .take()
57 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
58 let stderr = child
59 .stderr
60 .take()
61 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
62
63 let max_bytes = data
65 .sandbox
66 .max_output_bytes
67 .unwrap_or(ctx.transport.max_response_bytes);
68
69 let stdout_reader =
70 tokio::spawn(async move { read_stream_limited(stdout, max_bytes, "stdout").await });
71 let stderr_reader =
72 tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
73
74 if let Some(input) = &data.stdin
76 && let Some(mut stdin_handle) = child.stdin.take()
77 {
78 stdin_handle
79 .write_all(input.as_bytes())
80 .await
81 .context("failed writing stdin to bash process")?;
82 }
83
84 let timeout = data
86 .sandbox
87 .max_time_ms
88 .map(Duration::from_millis)
89 .unwrap_or(ctx.transport.timeout);
90
91 let status = match tokio::time::timeout(timeout, child.wait()).await {
92 Ok(wait_result) => wait_result.context("failed waiting for bash process")?,
93 Err(_) => {
94 if let Ok(pgid) = i32::try_from(pid) {
96 unsafe { libc::killpg(pgid, libc::SIGKILL) };
97 }
98 let _ = child.kill().await;
99 let _ = child.wait().await;
100 bail!("bash script timed out after {timeout:?}");
101 }
102 };
103
104 let stdout_bytes = stdout_reader
105 .await
106 .context("failed joining stdout reader task")??;
107 let stderr_bytes = stderr_reader
108 .await
109 .context("failed joining stderr reader task")??;
110
111 let exit_code = status
112 .code()
113 .map(|c| c.clamp(0, u16::MAX as i32) as u16)
114 .unwrap_or(1);
115
116 let output_bytes = if stdout_bytes.is_empty() {
117 &stderr_bytes
118 } else {
119 &stdout_bytes
120 };
121
122 Ok(RawExecutionResult {
123 status: exit_code,
124 url: "bash://script".into(),
125 body: output_bytes.to_vec(),
126 content_type: None,
127 })
128}
129
130use earl_core::{ProtocolExecutor, StreamingProtocolExecutor};
131use tokio::sync::mpsc;
132
/// Stateless protocol executor whose `ProtocolExecutor` implementation runs a
/// prepared bash script to completion through [`execute_bash_once`].
pub struct BashExecutor;
135
136impl ProtocolExecutor for BashExecutor {
137 type PreparedData = PreparedBashScript;
138
139 async fn execute(
140 &mut self,
141 data: &PreparedBashScript,
142 ctx: &ExecutionContext,
143 ) -> anyhow::Result<RawExecutionResult> {
144 execute_bash_once(data, ctx).await
145 }
146}
147
/// Stateless streaming executor whose `StreamingProtocolExecutor`
/// implementation forwards a sandboxed bash script's stdout, one
/// newline-delimited chunk at a time, over an `mpsc` channel.
pub struct BashStreamExecutor;
154
155impl StreamingProtocolExecutor for BashStreamExecutor {
156 type PreparedData = PreparedBashScript;
157
158 async fn execute_stream(
159 &mut self,
160 data: &PreparedBashScript,
161 ctx: &ExecutionContext,
162 sender: mpsc::Sender<StreamChunk>,
163 ) -> anyhow::Result<StreamMeta> {
164 if !sandbox_available() {
165 bail!(
166 "bash sandbox tool ({}) is not available on this system; \
167 install it or disable the bash feature",
168 sandbox_tool_name()
169 );
170 }
171
172 let mut command =
173 build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
174
175 command.stdout(Stdio::piped());
176 command.stderr(Stdio::piped());
177
178 if data.stdin.is_some() {
179 command.stdin(Stdio::piped());
180 } else {
181 command.stdin(Stdio::null());
182 }
183
184 unsafe {
187 command.pre_exec(|| {
188 libc::setsid();
189 Ok(())
190 });
191 }
192
193 let mut child = command
194 .spawn()
195 .context("failed spawning sandboxed bash command")?;
196
197 let pid = child
198 .id()
199 .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
200
201 let stdout = child
202 .stdout
203 .take()
204 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
205 let stderr = child
206 .stderr
207 .take()
208 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
209
210 let max_bytes = data
212 .sandbox
213 .max_output_bytes
214 .unwrap_or(ctx.transport.max_response_bytes);
215
216 let stderr_task =
218 tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
219
220 if let Some(input) = &data.stdin
222 && let Some(mut stdin_handle) = child.stdin.take()
223 {
224 stdin_handle
225 .write_all(input.as_bytes())
226 .await
227 .context("failed writing stdin to bash process")?;
228 }
229
230 let timeout = data
232 .sandbox
233 .max_time_ms
234 .map(Duration::from_millis)
235 .unwrap_or(ctx.transport.timeout);
236
237 let kill_process = |pid: u32| {
239 if let Ok(pgid) = i32::try_from(pid) {
240 unsafe { libc::killpg(pgid, libc::SIGKILL) };
241 }
242 };
243
244 let stream_result = tokio::time::timeout(timeout, async {
247 let mut reader = BufReader::new(stdout);
248 let mut buf = Vec::new();
249 let mut total_bytes: usize = 0;
250
251 loop {
252 buf.clear();
253 let bytes_read = reader
254 .read_until(b'\n', &mut buf)
255 .await
256 .context("failed reading bash stdout")?;
257 if bytes_read == 0 {
258 break;
259 }
260
261 total_bytes = total_bytes.saturating_add(bytes_read);
262 if total_bytes > max_bytes {
263 kill_process(pid);
265 let _ = child.wait().await;
266 bail!("bash stdout exceeded configured max output bytes ({max_bytes} bytes)");
267 }
268
269 let line = String::from_utf8_lossy(&buf).into_owned();
270 let chunk = StreamChunk {
271 data: line.into_bytes(),
272 content_type: None,
273 };
274 if sender.send(chunk).await.is_err() {
275 kill_process(pid);
277 break;
278 }
279 }
280
281 drop(sender);
283
284 let status = child
286 .wait()
287 .await
288 .context("failed waiting for bash process")?;
289 Ok::<_, anyhow::Error>(status)
290 })
291 .await;
292
293 stderr_task.abort();
295
296 match stream_result {
297 Ok(Ok(status)) => {
298 let exit_code = status
299 .code()
300 .map(|c| c.clamp(0, u16::MAX as i32) as u16)
301 .unwrap_or(1);
302 Ok(StreamMeta {
303 status: exit_code,
304 url: "bash://script".into(),
305 })
306 }
307 Ok(Err(e)) => Err(e),
308 Err(_) => {
309 kill_process(pid);
311 let _ = child.kill().await;
312 let _ = child.wait().await;
313 bail!("bash script timed out after {timeout:?}");
314 }
315 }
316 }
317}
318
319async fn read_stream_limited<R>(mut reader: R, limit: usize, label: &str) -> Result<Vec<u8>>
320where
321 R: AsyncRead + Unpin,
322{
323 let mut out = Vec::new();
324 let mut buf = [0_u8; 8192];
325
326 loop {
327 let bytes_read = reader
328 .read(&mut buf)
329 .await
330 .with_context(|| format!("failed reading bash {label}"))?;
331 if bytes_read == 0 {
332 break;
333 }
334 if out.len().saturating_add(bytes_read) > limit {
335 bail!("bash {label} exceeded configured max_response_bytes ({limit} bytes)");
336 }
337 out.extend_from_slice(&buf[..bytes_read]);
338 }
339
340 Ok(out)
341}