// earl_protocol_bash/executor.rs

1use std::process::Stdio;
2use std::time::Duration;
3
4use anyhow::{Context, Result, bail};
5use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
6
7use earl_core::{ExecutionContext, RawExecutionResult, StreamChunk, StreamMeta};
8
9use crate::PreparedBashScript;
10use crate::sandbox::{build_sandboxed_command, sandbox_available, sandbox_tool_name};
11
12/// Execute a single bash script inside a sandbox and return the result.
13pub async fn execute_bash_once(
14    data: &PreparedBashScript,
15    ctx: &ExecutionContext,
16) -> Result<RawExecutionResult> {
17    if !sandbox_available() {
18        bail!(
19            "bash sandbox tool ({}) is not available on this system; \
20             install it or disable the bash feature",
21            sandbox_tool_name()
22        );
23    }
24
25    let mut command =
26        build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
27
28    command.stdout(Stdio::piped());
29    command.stderr(Stdio::piped());
30
31    if data.stdin.is_some() {
32        command.stdin(Stdio::piped());
33    } else {
34        command.stdin(Stdio::null());
35    }
36
37    let max_memory = data.sandbox.max_memory_bytes;
38    let max_cpu_secs = data.sandbox.max_cpu_time_ms.map(|ms| ms.div_ceil(1000));
39
40    // SAFETY: setsid() creates a new session / process group so that we can
41    // kill the entire group on timeout without leaking children.
42    // setrlimit() calls are safe in a post-fork, pre-exec context.
43    unsafe {
44        command.pre_exec(move || {
45            libc::setsid();
46            if let Some(bytes) = max_memory {
47                let rlim = libc::rlimit {
48                    rlim_cur: bytes as libc::rlim_t,
49                    rlim_max: bytes as libc::rlim_t,
50                };
51                if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
52                    return Err(std::io::Error::last_os_error());
53                }
54            }
55            if let Some(secs) = max_cpu_secs {
56                // Note: RLIMIT_CPU grace period sends SIGXCPU then SIGKILL ~1s later.
57                let rlim = libc::rlimit {
58                    rlim_cur: secs as libc::rlim_t,
59                    rlim_max: secs as libc::rlim_t,
60                };
61                if libc::setrlimit(libc::RLIMIT_CPU, &rlim) != 0 {
62                    return Err(std::io::Error::last_os_error());
63                }
64            }
65            Ok(())
66        });
67    }
68
69    let mut child = command
70        .spawn()
71        .context("failed spawning sandboxed bash command")?;
72
73    let pid = child
74        .id()
75        .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
76
77    let stdout = child
78        .stdout
79        .take()
80        .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
81    let stderr = child
82        .stderr
83        .take()
84        .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
85
86    // Use sandbox output limit if set, otherwise fall back to transport limit.
87    let max_bytes = data
88        .sandbox
89        .max_output_bytes
90        .unwrap_or(ctx.transport.max_response_bytes);
91
92    let stdout_reader =
93        tokio::spawn(async move { read_stream_limited(stdout, max_bytes, "stdout").await });
94    let stderr_reader =
95        tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
96
97    // Write stdin if present, then drop the handle so the child sees EOF.
98    if let Some(input) = &data.stdin
99        && let Some(mut stdin_handle) = child.stdin.take()
100    {
101        stdin_handle
102            .write_all(input.as_bytes())
103            .await
104            .context("failed writing stdin to bash process")?;
105    }
106
107    // Use sandbox timeout if set, otherwise fall back to transport timeout.
108    let timeout = data
109        .sandbox
110        .max_time_ms
111        .map(Duration::from_millis)
112        .unwrap_or(ctx.transport.timeout);
113
114    let status = match tokio::time::timeout(timeout, child.wait()).await {
115        Ok(wait_result) => wait_result.context("failed waiting for bash process")?,
116        Err(_) => {
117            // Timeout: kill the entire process group.
118            if let Ok(pgid) = i32::try_from(pid) {
119                unsafe { libc::killpg(pgid, libc::SIGKILL) };
120            }
121            let _ = child.kill().await;
122            let _ = child.wait().await;
123            bail!("bash script timed out after {timeout:?}");
124        }
125    };
126
127    let stdout_bytes = stdout_reader
128        .await
129        .context("failed joining stdout reader task")??;
130    let stderr_bytes = stderr_reader
131        .await
132        .context("failed joining stderr reader task")??;
133
134    let exit_code = status
135        .code()
136        .map(|c| c.clamp(0, u16::MAX as i32) as u16)
137        .unwrap_or(1);
138
139    let output_bytes = if stdout_bytes.is_empty() {
140        &stderr_bytes
141    } else {
142        &stdout_bytes
143    };
144
145    Ok(RawExecutionResult {
146        status: exit_code,
147        url: "bash://script".into(),
148        body: output_bytes.to_vec(),
149        content_type: None,
150    })
151}
152
153use earl_core::{ProtocolExecutor, StreamingProtocolExecutor};
154use tokio::sync::mpsc;
155
156/// Bash protocol executor.
157pub struct BashExecutor;
158
159impl ProtocolExecutor for BashExecutor {
160    type PreparedData = PreparedBashScript;
161
162    async fn execute(
163        &mut self,
164        data: &PreparedBashScript,
165        ctx: &ExecutionContext,
166    ) -> anyhow::Result<RawExecutionResult> {
167        execute_bash_once(data, ctx).await
168    }
169}
170
171/// Streaming bash protocol executor.
172///
173/// Reuses the same sandboxed process setup as [`BashExecutor`] but streams
174/// stdout line-by-line through an `mpsc::Sender<StreamChunk>` instead of
175/// buffering the entire output.
176pub struct BashStreamExecutor;
177
178impl StreamingProtocolExecutor for BashStreamExecutor {
179    type PreparedData = PreparedBashScript;
180
181    async fn execute_stream(
182        &mut self,
183        data: &PreparedBashScript,
184        ctx: &ExecutionContext,
185        sender: mpsc::Sender<StreamChunk>,
186    ) -> anyhow::Result<StreamMeta> {
187        if !sandbox_available() {
188            bail!(
189                "bash sandbox tool ({}) is not available on this system; \
190                 install it or disable the bash feature",
191                sandbox_tool_name()
192            );
193        }
194
195        let mut command =
196            build_sandboxed_command(&data.script, &data.env, data.cwd.as_deref(), &data.sandbox)?;
197
198        command.stdout(Stdio::piped());
199        command.stderr(Stdio::piped());
200
201        if data.stdin.is_some() {
202            command.stdin(Stdio::piped());
203        } else {
204            command.stdin(Stdio::null());
205        }
206
207        let max_memory = data.sandbox.max_memory_bytes;
208        let max_cpu_secs = data.sandbox.max_cpu_time_ms.map(|ms| ms.div_ceil(1000));
209
210        // SAFETY: setsid() creates a new session / process group so that we
211        // can kill the entire group on timeout without leaking children.
212        // setrlimit() calls are safe in a post-fork, pre-exec context.
213        unsafe {
214            command.pre_exec(move || {
215                libc::setsid();
216                if let Some(bytes) = max_memory {
217                    let rlim = libc::rlimit {
218                        rlim_cur: bytes as libc::rlim_t,
219                        rlim_max: bytes as libc::rlim_t,
220                    };
221                    if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
222                        return Err(std::io::Error::last_os_error());
223                    }
224                }
225                if let Some(secs) = max_cpu_secs {
226                    // Note: RLIMIT_CPU grace period sends SIGXCPU then SIGKILL ~1s later.
227                    let rlim = libc::rlimit {
228                        rlim_cur: secs as libc::rlim_t,
229                        rlim_max: secs as libc::rlim_t,
230                    };
231                    if libc::setrlimit(libc::RLIMIT_CPU, &rlim) != 0 {
232                        return Err(std::io::Error::last_os_error());
233                    }
234                }
235                Ok(())
236            });
237        }
238
239        let mut child = command
240            .spawn()
241            .context("failed spawning sandboxed bash command")?;
242
243        let pid = child
244            .id()
245            .ok_or_else(|| anyhow::anyhow!("failed to obtain PID of spawned bash process"))?;
246
247        let stdout = child
248            .stdout
249            .take()
250            .ok_or_else(|| anyhow::anyhow!("failed capturing bash stdout"))?;
251        let stderr = child
252            .stderr
253            .take()
254            .ok_or_else(|| anyhow::anyhow!("failed capturing bash stderr"))?;
255
256        // Use sandbox output limit if set, otherwise fall back to transport limit.
257        let max_bytes = data
258            .sandbox
259            .max_output_bytes
260            .unwrap_or(ctx.transport.max_response_bytes);
261
262        // Drain stderr in a background task so the child process does not block.
263        let stderr_task =
264            tokio::spawn(async move { read_stream_limited(stderr, max_bytes, "stderr").await });
265
266        // Write stdin if present, then drop the handle so the child sees EOF.
267        if let Some(input) = &data.stdin
268            && let Some(mut stdin_handle) = child.stdin.take()
269        {
270            stdin_handle
271                .write_all(input.as_bytes())
272                .await
273                .context("failed writing stdin to bash process")?;
274        }
275
276        // Use sandbox timeout if set, otherwise fall back to transport timeout.
277        let timeout = data
278            .sandbox
279            .max_time_ms
280            .map(Duration::from_millis)
281            .unwrap_or(ctx.transport.timeout);
282
283        // Helper closure to kill the process group.
284        let kill_process = |pid: u32| {
285            if let Ok(pgid) = i32::try_from(pid) {
286                unsafe { libc::killpg(pgid, libc::SIGKILL) };
287            }
288        };
289
290        // Wrap the entire streaming operation (stdout reading + wait) in a
291        // single timeout so that a long-running process cannot stream forever.
292        let stream_result = tokio::time::timeout(timeout, async {
293            let mut reader = BufReader::new(stdout);
294            let mut buf = Vec::new();
295            let mut total_bytes: usize = 0;
296
297            loop {
298                buf.clear();
299                let bytes_read = reader
300                    .read_until(b'\n', &mut buf)
301                    .await
302                    .context("failed reading bash stdout")?;
303                if bytes_read == 0 {
304                    break;
305                }
306
307                total_bytes = total_bytes.saturating_add(bytes_read);
308                if total_bytes > max_bytes {
309                    // Kill the process and reap it before bailing.
310                    kill_process(pid);
311                    let _ = child.wait().await;
312                    bail!("bash stdout exceeded configured max output bytes ({max_bytes} bytes)");
313                }
314
315                let line = String::from_utf8_lossy(&buf).into_owned();
316                let chunk = StreamChunk {
317                    data: line.into_bytes(),
318                    content_type: None,
319                };
320                if sender.send(chunk).await.is_err() {
321                    // Receiver dropped — kill the process and stop.
322                    kill_process(pid);
323                    break;
324                }
325            }
326
327            // Drop the sender so the consumer sees the end of the stream.
328            drop(sender);
329
330            // Wait for the child to finish.
331            let status = child
332                .wait()
333                .await
334                .context("failed waiting for bash process")?;
335            Ok::<_, anyhow::Error>(status)
336        })
337        .await;
338
339        // Abort the stderr task — we don't need it anymore.
340        stderr_task.abort();
341
342        match stream_result {
343            Ok(Ok(status)) => {
344                let exit_code = status
345                    .code()
346                    .map(|c| c.clamp(0, u16::MAX as i32) as u16)
347                    .unwrap_or(1);
348                Ok(StreamMeta {
349                    status: exit_code,
350                    url: "bash://script".into(),
351                })
352            }
353            Ok(Err(e)) => Err(e),
354            Err(_) => {
355                // Timeout: kill the entire process group.
356                kill_process(pid);
357                let _ = child.kill().await;
358                let _ = child.wait().await;
359                bail!("bash script timed out after {timeout:?}");
360            }
361        }
362    }
363}
364
365async fn read_stream_limited<R>(mut reader: R, limit: usize, label: &str) -> Result<Vec<u8>>
366where
367    R: AsyncRead + Unpin,
368{
369    let mut out = Vec::new();
370    let mut buf = [0_u8; 8192];
371
372    loop {
373        let bytes_read = reader
374            .read(&mut buf)
375            .await
376            .with_context(|| format!("failed reading bash {label}"))?;
377        if bytes_read == 0 {
378            break;
379        }
380        if out.len().saturating_add(bytes_read) > limit {
381            bail!("bash {label} exceeded configured max_response_bytes ({limit} bytes)");
382        }
383        out.extend_from_slice(&buf[..bytes_read]);
384    }
385
386    Ok(out)
387}