earl_protocol_bash/
executor.rs1use std::process::Stdio;
2use std::time::Duration;
3
4use anyhow::{Context, Result, bail};
5use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
6
7use earl_core::{ExecutionContext, RawExecutionResult, StreamChunk, StreamMeta};
8
9use crate::PreparedBashScript;
10use crate::sandbox::{build_sandboxed_command, sandbox_available, sandbox_tool_name};
11
12pub async fn execute_bash_once(
14 data: &PreparedBashScript,
15 ctx: &ExecutionContext,
16) -> Result<RawExecutionResult> {
17 if !sandbox_available() {
18 bail!(
19 "bash sandbox tool ({}) is not available on this system; \
20 install it or disable the bash feature",
21 sandbox_tool_name()
22 );
23 }
24
25 let mut command =
26 build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
27
28 command.stdout(Stdio::piped());
29 command.stderr(Stdio::piped());
30
31 if data.stdin.is_some() {
32 command.stdin(Stdio::piped());
33 } else {
34 command.stdin(Stdio::null());
35 }
36
37 let max_memory = data.sandbox.max_memory_bytes;
38 let max_cpu_secs = data.sandbox.max_cpu_time_ms.map(|ms| ms.div_ceil(1000));
39
40 unsafe {
44 command.pre_exec(move || {
45 libc::setsid();
46 if let Some(bytes) = max_memory {
47 let rlim = libc::rlimit {
48 rlim_cur: bytes as libc::rlim_t,
49 rlim_max: bytes as libc::rlim_t,
50 };
51 if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
52 return Err(std::io::Error::last_os_error());
53 }
54 }
55 if let Some(secs) = max_cpu_secs {
56 let rlim = libc::rlimit {
58 rlim_cur: secs as libc::rlim_t,
59 rlim_max: secs as libc::rlim_t,
60 };
61 if libc::setrlimit(libc::RLIMIT_CPU, &rlim) != 0 {
62 return Err(std::io::Error::last_os_error());
63 }
64 }
65 Ok(())
66 });
67 }
68
69 let mut child = command
70 .spawn()
71 .context("failed spawning sandboxed bash command")?;
72
73 let pid = child
74 .id()
75 .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
76
77 let stdout = child
78 .stdout
79 .take()
80 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
81 let stderr = child
82 .stderr
83 .take()
84 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
85
86 let max_bytes = data
88 .sandbox
89 .max_output_bytes
90 .unwrap_or(ctx.transport.max_response_bytes);
91
92 let stdout_reader =
93 tokio::spawn(async move { read_stream_limited(stdout, max_bytes, "stdout").await });
94 let stderr_reader =
95 tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
96
97 if let Some(input) = &data.stdin
99 && let Some(mut stdin_handle) = child.stdin.take()
100 {
101 stdin_handle
102 .write_all(input.as_bytes())
103 .await
104 .context("failed writing stdin to bash process")?;
105 }
106
107 let timeout = data
109 .sandbox
110 .max_time_ms
111 .map(Duration::from_millis)
112 .unwrap_or(ctx.transport.timeout);
113
114 let status = match tokio::time::timeout(timeout, child.wait()).await {
115 Ok(wait_result) => wait_result.context("failed waiting for bash process")?,
116 Err(_) => {
117 if let Ok(pgid) = i32::try_from(pid) {
119 unsafe { libc::killpg(pgid, libc::SIGKILL) };
120 }
121 let _ = child.kill().await;
122 let _ = child.wait().await;
123 bail!("bash script timed out after {timeout:?}");
124 }
125 };
126
127 let stdout_bytes = stdout_reader
128 .await
129 .context("failed joining stdout reader task")??;
130 let stderr_bytes = stderr_reader
131 .await
132 .context("failed joining stderr reader task")??;
133
134 let exit_code = status
135 .code()
136 .map(|c| c.clamp(0, u16::MAX as i32) as u16)
137 .unwrap_or(1);
138
139 let output_bytes = if stdout_bytes.is_empty() {
140 &stderr_bytes
141 } else {
142 &stdout_bytes
143 };
144
145 Ok(RawExecutionResult {
146 status: exit_code,
147 url: "bash://script".into(),
148 body: output_bytes.to_vec(),
149 content_type: None,
150 })
151}
152
153use earl_core::{ProtocolExecutor, StreamingProtocolExecutor};
154use tokio::sync::mpsc;
155
156pub struct BashExecutor;
158
159impl ProtocolExecutor for BashExecutor {
160 type PreparedData = PreparedBashScript;
161
162 async fn execute(
163 &mut self,
164 data: &PreparedBashScript,
165 ctx: &ExecutionContext,
166 ) -> anyhow::Result<RawExecutionResult> {
167 execute_bash_once(data, ctx).await
168 }
169}
170
171pub struct BashStreamExecutor;
177
178impl StreamingProtocolExecutor for BashStreamExecutor {
179 type PreparedData = PreparedBashScript;
180
181 async fn execute_stream(
182 &mut self,
183 data: &PreparedBashScript,
184 ctx: &ExecutionContext,
185 sender: mpsc::Sender<StreamChunk>,
186 ) -> anyhow::Result<StreamMeta> {
187 if !sandbox_available() {
188 bail!(
189 "bash sandbox tool ({}) is not available on this system; \
190 install it or disable the bash feature",
191 sandbox_tool_name()
192 );
193 }
194
195 let mut command =
196 build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
197
198 command.stdout(Stdio::piped());
199 command.stderr(Stdio::piped());
200
201 if data.stdin.is_some() {
202 command.stdin(Stdio::piped());
203 } else {
204 command.stdin(Stdio::null());
205 }
206
207 let max_memory = data.sandbox.max_memory_bytes;
208 let max_cpu_secs = data.sandbox.max_cpu_time_ms.map(|ms| ms.div_ceil(1000));
209
210 unsafe {
214 command.pre_exec(move || {
215 libc::setsid();
216 if let Some(bytes) = max_memory {
217 let rlim = libc::rlimit {
218 rlim_cur: bytes as libc::rlim_t,
219 rlim_max: bytes as libc::rlim_t,
220 };
221 if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
222 return Err(std::io::Error::last_os_error());
223 }
224 }
225 if let Some(secs) = max_cpu_secs {
226 let rlim = libc::rlimit {
228 rlim_cur: secs as libc::rlim_t,
229 rlim_max: secs as libc::rlim_t,
230 };
231 if libc::setrlimit(libc::RLIMIT_CPU, &rlim) != 0 {
232 return Err(std::io::Error::last_os_error());
233 }
234 }
235 Ok(())
236 });
237 }
238
239 let mut child = command
240 .spawn()
241 .context("failed spawning sandboxed bash command")?;
242
243 let pid = child
244 .id()
245 .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
246
247 let stdout = child
248 .stdout
249 .take()
250 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
251 let stderr = child
252 .stderr
253 .take()
254 .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
255
256 let max_bytes = data
258 .sandbox
259 .max_output_bytes
260 .unwrap_or(ctx.transport.max_response_bytes);
261
262 let stderr_task =
264 tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
265
266 if let Some(input) = &data.stdin
268 && let Some(mut stdin_handle) = child.stdin.take()
269 {
270 stdin_handle
271 .write_all(input.as_bytes())
272 .await
273 .context("failed writing stdin to bash process")?;
274 }
275
276 let timeout = data
278 .sandbox
279 .max_time_ms
280 .map(Duration::from_millis)
281 .unwrap_or(ctx.transport.timeout);
282
283 let kill_process = |pid: u32| {
285 if let Ok(pgid) = i32::try_from(pid) {
286 unsafe { libc::killpg(pgid, libc::SIGKILL) };
287 }
288 };
289
290 let stream_result = tokio::time::timeout(timeout, async {
293 let mut reader = BufReader::new(stdout);
294 let mut buf = Vec::new();
295 let mut total_bytes: usize = 0;
296
297 loop {
298 buf.clear();
299 let bytes_read = reader
300 .read_until(b'\n', &mut buf)
301 .await
302 .context("failed reading bash stdout")?;
303 if bytes_read == 0 {
304 break;
305 }
306
307 total_bytes = total_bytes.saturating_add(bytes_read);
308 if total_bytes > max_bytes {
309 kill_process(pid);
311 let _ = child.wait().await;
312 bail!("bash stdout exceeded configured max output bytes ({max_bytes} bytes)");
313 }
314
315 let line = String::from_utf8_lossy(&buf).into_owned();
316 let chunk = StreamChunk {
317 data: line.into_bytes(),
318 content_type: None,
319 };
320 if sender.send(chunk).await.is_err() {
321 kill_process(pid);
323 break;
324 }
325 }
326
327 drop(sender);
329
330 let status = child
332 .wait()
333 .await
334 .context("failed waiting for bash process")?;
335 Ok::<_, anyhow::Error>(status)
336 })
337 .await;
338
339 stderr_task.abort();
341
342 match stream_result {
343 Ok(Ok(status)) => {
344 let exit_code = status
345 .code()
346 .map(|c| c.clamp(0, u16::MAX as i32) as u16)
347 .unwrap_or(1);
348 Ok(StreamMeta {
349 status: exit_code,
350 url: "bash://script".into(),
351 })
352 }
353 Ok(Err(e)) => Err(e),
354 Err(_) => {
355 kill_process(pid);
357 let _ = child.kill().await;
358 let _ = child.wait().await;
359 bail!("bash script timed out after {timeout:?}");
360 }
361 }
362 }
363}
364
365async fn read_stream_limited<R>(mut reader: R, limit: usize, label: &str) -> Result<Vec<u8>>
366where
367 R: AsyncRead + Unpin,
368{
369 let mut out = Vec::new();
370 let mut buf = [0_u8; 8192];
371
372 loop {
373 let bytes_read = reader
374 .read(&mut buf)
375 .await
376 .with_context(|| format!("failed reading bash {label}"))?;
377 if bytes_read == 0 {
378 break;
379 }
380 if out.len().saturating_add(bytes_read) > limit {
381 bail!("bash {label} exceeded configured max_response_bytes ({limit} bytes)");
382 }
383 out.extend_from_slice(&buf[..bytes_read]);
384 }
385
386 Ok(out)
387}