Skip to main content

earl_protocol_bash/
executor.rs

1use std::process::Stdio;
2use std::time::Duration;
3
4use anyhow::{Context, Result, bail};
5use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
6
7use earl_core::{ExecutionContext, RawExecutionResult, StreamChunk, StreamMeta};
8
9use crate::PreparedBashScript;
10use crate::sandbox::{build_sandboxed_command, sandbox_available, sandbox_tool_name};
11
12/// Execute a single bash script inside a sandbox and return the result.
13pub async fn execute_bash_once(
14    data: &PreparedBashScript,
15    ctx: &ExecutionContext,
16) -> Result<RawExecutionResult> {
17    if !sandbox_available() {
18        bail!(
19            "bash sandbox tool ({}) is not available on this system; \
20             install it or disable the bash feature",
21            sandbox_tool_name()
22        );
23    }
24
25    let mut command =
26        build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
27
28    command.stdout(Stdio::piped());
29    command.stderr(Stdio::piped());
30
31    if data.stdin.is_some() {
32        command.stdin(Stdio::piped());
33    } else {
34        command.stdin(Stdio::null());
35    }
36
37    // SAFETY: setsid() creates a new session / process group so that we can
38    // kill the entire group on timeout without leaking children.
39    unsafe {
40        command.pre_exec(|| {
41            libc::setsid();
42            Ok(())
43        });
44    }
45
46    let mut child = command
47        .spawn()
48        .context("failed spawning sandboxed bash command")?;
49
50    let pid = child
51        .id()
52        .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
53
54    let stdout = child
55        .stdout
56        .take()
57        .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
58    let stderr = child
59        .stderr
60        .take()
61        .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
62
63    // Use sandbox output limit if set, otherwise fall back to transport limit.
64    let max_bytes = data
65        .sandbox
66        .max_output_bytes
67        .unwrap_or(ctx.transport.max_response_bytes);
68
69    let stdout_reader =
70        tokio::spawn(async move { read_stream_limited(stdout, max_bytes, "stdout").await });
71    let stderr_reader =
72        tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
73
74    // Write stdin if present, then drop the handle so the child sees EOF.
75    if let Some(input) = &data.stdin
76        && let Some(mut stdin_handle) = child.stdin.take()
77    {
78        stdin_handle
79            .write_all(input.as_bytes())
80            .await
81            .context("failed writing stdin to bash process")?;
82    }
83
84    // Use sandbox timeout if set, otherwise fall back to transport timeout.
85    let timeout = data
86        .sandbox
87        .max_time_ms
88        .map(Duration::from_millis)
89        .unwrap_or(ctx.transport.timeout);
90
91    let status = match tokio::time::timeout(timeout, child.wait()).await {
92        Ok(wait_result) => wait_result.context("failed waiting for bash process")?,
93        Err(_) => {
94            // Timeout: kill the entire process group.
95            if let Ok(pgid) = i32::try_from(pid) {
96                unsafe { libc::killpg(pgid, libc::SIGKILL) };
97            }
98            let _ = child.kill().await;
99            let _ = child.wait().await;
100            bail!("bash script timed out after {timeout:?}");
101        }
102    };
103
104    let stdout_bytes = stdout_reader
105        .await
106        .context("failed joining stdout reader task")??;
107    let stderr_bytes = stderr_reader
108        .await
109        .context("failed joining stderr reader task")??;
110
111    let exit_code = status
112        .code()
113        .map(|c| c.clamp(0, u16::MAX as i32) as u16)
114        .unwrap_or(1);
115
116    let output_bytes = if stdout_bytes.is_empty() {
117        &stderr_bytes
118    } else {
119        &stdout_bytes
120    };
121
122    Ok(RawExecutionResult {
123        status: exit_code,
124        url: "bash://script".into(),
125        body: output_bytes.to_vec(),
126        content_type: None,
127    })
128}
129
130use earl_core::{ProtocolExecutor, StreamingProtocolExecutor};
131use tokio::sync::mpsc;
132
/// Bash protocol executor.
///
/// A stateless unit type: it carries no configuration of its own, so it is
/// cheap to copy and can be created with `BashExecutor` or `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct BashExecutor;
135
136impl ProtocolExecutor for BashExecutor {
137    type PreparedData = PreparedBashScript;
138
139    async fn execute(
140        &mut self,
141        data: &PreparedBashScript,
142        ctx: &ExecutionContext,
143    ) -> anyhow::Result<RawExecutionResult> {
144        execute_bash_once(data, ctx).await
145    }
146}
147
/// Streaming bash protocol executor.
///
/// Reuses the same sandboxed process setup as [`BashExecutor`] but streams
/// stdout line-by-line through an `mpsc::Sender<StreamChunk>` instead of
/// buffering the entire output.
///
/// A stateless unit type: it can be created with `BashStreamExecutor` or
/// `Default` and is cheap to copy.
#[derive(Debug, Default, Clone, Copy)]
pub struct BashStreamExecutor;
154
155impl StreamingProtocolExecutor for BashStreamExecutor {
156    type PreparedData = PreparedBashScript;
157
158    async fn execute_stream(
159        &mut self,
160        data: &PreparedBashScript,
161        ctx: &ExecutionContext,
162        sender: mpsc::Sender<StreamChunk>,
163    ) -> anyhow::Result<StreamMeta> {
164        if !sandbox_available() {
165            bail!(
166                "bash sandbox tool ({}) is not available on this system; \
167                 install it or disable the bash feature",
168                sandbox_tool_name()
169            );
170        }
171
172        let mut command =
173            build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
174
175        command.stdout(Stdio::piped());
176        command.stderr(Stdio::piped());
177
178        if data.stdin.is_some() {
179            command.stdin(Stdio::piped());
180        } else {
181            command.stdin(Stdio::null());
182        }
183
184        // SAFETY: setsid() creates a new session / process group so that we
185        // can kill the entire group on timeout without leaking children.
186        unsafe {
187            command.pre_exec(|| {
188                libc::setsid();
189                Ok(())
190            });
191        }
192
193        let mut child = command
194            .spawn()
195            .context("failed spawning sandboxed bash command")?;
196
197        let pid = child
198            .id()
199            .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
200
201        let stdout = child
202            .stdout
203            .take()
204            .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
205        let stderr = child
206            .stderr
207            .take()
208            .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
209
210        // Use sandbox output limit if set, otherwise fall back to transport limit.
211        let max_bytes = data
212            .sandbox
213            .max_output_bytes
214            .unwrap_or(ctx.transport.max_response_bytes);
215
216        // Drain stderr in a background task so the child process does not block.
217        let stderr_task =
218            tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
219
220        // Write stdin if present, then drop the handle so the child sees EOF.
221        if let Some(input) = &data.stdin
222            && let Some(mut stdin_handle) = child.stdin.take()
223        {
224            stdin_handle
225                .write_all(input.as_bytes())
226                .await
227                .context("failed writing stdin to bash process")?;
228        }
229
230        // Use sandbox timeout if set, otherwise fall back to transport timeout.
231        let timeout = data
232            .sandbox
233            .max_time_ms
234            .map(Duration::from_millis)
235            .unwrap_or(ctx.transport.timeout);
236
237        // Helper closure to kill the process group.
238        let kill_process = |pid: u32| {
239            if let Ok(pgid) = i32::try_from(pid) {
240                unsafe { libc::killpg(pgid, libc::SIGKILL) };
241            }
242        };
243
244        // Wrap the entire streaming operation (stdout reading + wait) in a
245        // single timeout so that a long-running process cannot stream forever.
246        let stream_result = tokio::time::timeout(timeout, async {
247            let mut reader = BufReader::new(stdout);
248            let mut buf = Vec::new();
249            let mut total_bytes: usize = 0;
250
251            loop {
252                buf.clear();
253                let bytes_read = reader
254                    .read_until(b'\n', &mut buf)
255                    .await
256                    .context("failed reading bash stdout")?;
257                if bytes_read == 0 {
258                    break;
259                }
260
261                total_bytes = total_bytes.saturating_add(bytes_read);
262                if total_bytes > max_bytes {
263                    // Kill the process and reap it before bailing.
264                    kill_process(pid);
265                    let _ = child.wait().await;
266                    bail!("bash stdout exceeded configured max output bytes ({max_bytes} bytes)");
267                }
268
269                let line = String::from_utf8_lossy(&buf).into_owned();
270                let chunk = StreamChunk {
271                    data: line.into_bytes(),
272                    content_type: None,
273                };
274                if sender.send(chunk).await.is_err() {
275                    // Receiver dropped — kill the process and stop.
276                    kill_process(pid);
277                    break;
278                }
279            }
280
281            // Drop the sender so the consumer sees the end of the stream.
282            drop(sender);
283
284            // Wait for the child to finish.
285            let status = child
286                .wait()
287                .await
288                .context("failed waiting for bash process")?;
289            Ok::<_, anyhow::Error>(status)
290        })
291        .await;
292
293        // Abort the stderr task — we don't need it anymore.
294        stderr_task.abort();
295
296        match stream_result {
297            Ok(Ok(status)) => {
298                let exit_code = status
299                    .code()
300                    .map(|c| c.clamp(0, u16::MAX as i32) as u16)
301                    .unwrap_or(1);
302                Ok(StreamMeta {
303                    status: exit_code,
304                    url: "bash://script".into(),
305                })
306            }
307            Ok(Err(e)) => Err(e),
308            Err(_) => {
309                // Timeout: kill the entire process group.
310                kill_process(pid);
311                let _ = child.kill().await;
312                let _ = child.wait().await;
313                bail!("bash script timed out after {timeout:?}");
314            }
315        }
316    }
317}
318
319async fn read_stream_limited<R>(mut reader: R, limit: usize, label: &str) -> Result<Vec<u8>>
320where
321    R: AsyncRead + Unpin,
322{
323    let mut out = Vec::new();
324    let mut buf = [0_u8; 8192];
325
326    loop {
327        let bytes_read = reader
328            .read(&mut buf)
329            .await
330            .with_context(|| format!("failed reading bash {label}"))?;
331        if bytes_read == 0 {
332            break;
333        }
334        if out.len().saturating_add(bytes_read) > limit {
335            bail!("bash {label} exceeded configured max_response_bytes ({limit} bytes)");
336        }
337        out.extend_from_slice(&buf[..bytes_read]);
338    }
339
340    Ok(out)
341}