Skip to main content

smol_workflow_engine/environment/
sandbox.rs

1//! Sandbox-backed execution environment implementation.
2
3use super::{
4    AgentExecutionEnvironment, EnvironmentPath, ExecEvent, ExecEventSink, ExecOutput, ExecRequest,
5    SpawnOutput,
6};
7use anyhow::Context;
8use base64::prelude::*;
9use smol_workflow_sandbox::{
10    CreateTempDirRequest, OpenSandboxRequest, SandboxExecEvent, SandboxExecRequest,
11    SandboxProviderJsonlClient, SandboxSession, SandboxSpawnRequest, SessionPathRequest,
12    WriteFileRequest,
13};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
17/// Execution environment backed by a long-lived sandbox provider JSONL process.
18#[derive(Debug, Clone)]
19pub struct SandboxExecutionEnvironment {
20    client: Arc<SandboxProviderJsonlClient>,
21    session: SandboxSession,
22    cwd: Option<EnvironmentPath>,
23}
24
25impl SandboxExecutionEnvironment {
26    /// Start `program serve`, open a sandbox session, and wrap it as an execution environment.
27    pub async fn open(
28        program: impl Into<std::path::PathBuf>,
29        request: OpenSandboxRequest,
30    ) -> anyhow::Result<Self> {
31        let client = Arc::new(SandboxProviderJsonlClient::start(program).await?);
32        let session = client.open(request).await?;
33        Ok(Self::from_session(client, session))
34    }
35
36    pub fn from_session(client: Arc<SandboxProviderJsonlClient>, session: SandboxSession) -> Self {
37        let cwd = session.cwd.clone().map(EnvironmentPath);
38        Self {
39            client,
40            session,
41            cwd,
42        }
43    }
44
45    pub fn client(&self) -> &Arc<SandboxProviderJsonlClient> {
46        &self.client
47    }
48
49    pub fn session(&self) -> &SandboxSession {
50        &self.session
51    }
52
53    /// Close the sandbox session and ask the provider to shut down.
54    pub async fn close(self) -> anyhow::Result<()> {
55        let close_result = self.client.close(self.session).await;
56        let shutdown_result = self.client.shutdown().await;
57        close_result?;
58        shutdown_result?;
59        Ok(())
60    }
61
62    fn session_path_request(&self, path: &EnvironmentPath) -> SessionPathRequest {
63        SessionPathRequest {
64            session: self.session.clone(),
65            path: path.0.clone(),
66        }
67    }
68}
69
70#[async_trait::async_trait]
71impl AgentExecutionEnvironment for SandboxExecutionEnvironment {
72    fn cwd(&self) -> Option<&EnvironmentPath> {
73        self.cwd.as_ref()
74    }
75
76    async fn create_dir_all(&self, path: &EnvironmentPath) -> anyhow::Result<()> {
77        self.client
78            .create_dir_all(self.session_path_request(path))
79            .await
80            .with_context(|| format!("failed to create sandbox directory `{}`", path.as_str()))
81    }
82
83    async fn write_file(&self, path: &EnvironmentPath, content: &[u8]) -> anyhow::Result<()> {
84        self.client
85            .write_file(WriteFileRequest {
86                session: self.session.clone(),
87                path: path.0.clone(),
88                content_base64: BASE64_STANDARD.encode(content),
89            })
90            .await
91            .with_context(|| format!("failed to write sandbox file `{}`", path.as_str()))
92    }
93
94    async fn read_file(&self, path: &EnvironmentPath) -> anyhow::Result<Vec<u8>> {
95        let result = self
96            .client
97            .read_file(self.session_path_request(path))
98            .await
99            .with_context(|| format!("failed to read sandbox file `{}`", path.as_str()))?;
100        BASE64_STANDARD
101            .decode(result.content_base64)
102            .context("sandbox read_file returned invalid base64 content")
103    }
104
105    async fn remove(&self, path: &EnvironmentPath) -> anyhow::Result<()> {
106        self.client
107            .remove(self.session_path_request(path))
108            .await
109            .with_context(|| format!("failed to remove sandbox path `{}`", path.as_str()))
110    }
111
112    async fn create_temp_dir(&self, prefix: &str) -> anyhow::Result<EnvironmentPath> {
113        let result = self
114            .client
115            .create_temp_dir(CreateTempDirRequest {
116                session: self.session.clone(),
117                prefix: prefix.to_string(),
118            })
119            .await
120            .context("failed to create sandbox temp directory")?;
121        Ok(EnvironmentPath(result.path))
122    }
123
124    async fn exec(
125        &self,
126        request: ExecRequest,
127        sink: &mut dyn ExecEventSink,
128    ) -> anyhow::Result<ExecOutput> {
129        let sandbox_request = SandboxExecRequest {
130            session: self.session.clone(),
131            argv: request.argv,
132            cwd: request.cwd.map(|path| path.0),
133            env: request.env,
134            stdin_base64: request.stdin.map(|stdin| BASE64_STANDARD.encode(stdin)),
135        };
136
137        let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ExecEvent>();
138        let result = {
139            let exec_future = self.client.exec(sandbox_request, move |event| {
140                event_tx.send(convert_exec_event(event)?).map_err(|_| {
141                    smol_workflow_sandbox::JsonlClientError::Protocol(
142                        "sandbox exec event receiver closed".to_string(),
143                    )
144                })?;
145                Ok(())
146            });
147            tokio::pin!(exec_future);
148
149            loop {
150                tokio::select! {
151                    result = &mut exec_future => break result,
152                    event = event_rx.recv() => {
153                        if let Some(event) = event {
154                            sink.event(event).await?;
155                        }
156                    }
157                }
158            }
159        }?;
160
161        while let Some(event) = event_rx.recv().await {
162            sink.event(event).await?;
163        }
164
165        Ok(ExecOutput {
166            exit_code: result.exit_code,
167            stdout: BASE64_STANDARD
168                .decode(result.stdout_base64)
169                .context("sandbox exec returned invalid base64 stdout")?,
170            stderr: BASE64_STANDARD
171                .decode(result.stderr_base64)
172                .context("sandbox exec returned invalid base64 stderr")?,
173        })
174    }
175
176    async fn spawn(
177        &self,
178        request: ExecRequest,
179        sink: Option<Box<dyn ExecEventSink>>,
180    ) -> anyhow::Result<SpawnOutput> {
181        let sandbox_request = SandboxSpawnRequest {
182            session: self.session.clone(),
183            argv: request.argv,
184            cwd: request.cwd.map(|path| path.0),
185            env: request.env,
186            stdin_base64: request.stdin.map(|stdin| BASE64_STANDARD.encode(stdin)),
187        };
188        let Some(mut sink) = sink else {
189            let result = self.client.spawn(sandbox_request).await?;
190            return Ok(SpawnOutput {
191                process_id: result.process_id,
192            });
193        };
194
195        let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ExecEvent>();
196        let result: smol_workflow_sandbox::SandboxSpawnResult = {
197            let spawn_future =
198                self.client
199                    .request_with_events("spawn", sandbox_request, move |event| {
200                        event_tx.send(convert_exec_event(event)?).map_err(|_| {
201                            smol_workflow_sandbox::JsonlClientError::Protocol(
202                                "sandbox spawn event receiver closed".to_string(),
203                            )
204                        })?;
205                        Ok(())
206                    });
207            tokio::pin!(spawn_future);
208
209            loop {
210                tokio::select! {
211                    result = &mut spawn_future => break result,
212                    event = event_rx.recv() => {
213                        if let Some(event) = event {
214                            sink.event(event).await?;
215                        }
216                    }
217                }
218            }
219        }?;
220
221        while let Some(event) = event_rx.recv().await {
222            sink.event(event).await?;
223        }
224        Ok(SpawnOutput {
225            process_id: result.process_id,
226        })
227    }
228}
229
230fn convert_exec_event(
231    event: SandboxExecEvent,
232) -> Result<ExecEvent, smol_workflow_sandbox::JsonlClientError> {
233    match event.r#type.as_str() {
234        "started" => Ok(ExecEvent::Started {
235            process_id: event.process_id,
236        }),
237        "stdout" => Ok(ExecEvent::Stdout {
238            chunk: decode_event_data(event.data_base64)?,
239        }),
240        "stderr" => Ok(ExecEvent::Stderr {
241            chunk: decode_event_data(event.data_base64)?,
242        }),
243        "exited" => Ok(ExecEvent::Exited {
244            exit_code: event.exit_code.unwrap_or(-1),
245        }),
246        other => Err(smol_workflow_sandbox::JsonlClientError::Protocol(format!(
247            "unknown sandbox exec event type `{other}`"
248        ))),
249    }
250}
251
252fn decode_event_data(
253    data_base64: Option<String>,
254) -> Result<Vec<u8>, smol_workflow_sandbox::JsonlClientError> {
255    let Some(data_base64) = data_base64 else {
256        return Ok(Vec::new());
257    };
258    BASE64_STANDARD.decode(data_base64).map_err(|error| {
259        smol_workflow_sandbox::JsonlClientError::Protocol(format!(
260            "sandbox exec event contained invalid base64 data: {error}"
261        ))
262    })
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use smol_workflow_sandbox::{Metadata, ProfileRef, WorkspaceSync};
    use std::fs;

    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Minimal fake provider.
    ///
    /// - Keeps written files in memory.
    /// - Replays canned `exec`/`spawn` events and results.
    /// - Exits after answering `shutdown`.
    #[cfg(unix)]
    const FAKE_PROVIDER_SCRIPT: &str = r#"#!/usr/bin/env python3
import json, sys
files = {}
session = {"id": "session_1", "cwd": "/sandbox", "capabilities": {"exec": True}}
for line in sys.stdin:
    request = json.loads(line)
    request_id = request["id"]
    method = request["method"]
    params = request.get("params", {})
    if method == "open":
        print(json.dumps({"id": request_id, "result": session}), flush=True)
    elif method == "write_file":
        files[params["path"]] = params["content_base64"]
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
    elif method == "read_file":
        print(json.dumps({"id": request_id, "result": {"content_base64": files[params["path"]]}}), flush=True)
    elif method == "exec":
        print(json.dumps({"id": request_id, "event": {"type": "started", "process_id": "p1"}}), flush=True)
        print(json.dumps({"id": request_id, "event": {"type": "stdout", "data_base64": "AAH/"}}), flush=True)
        print(json.dumps({"id": request_id, "event": {"type": "exited", "exit_code": 0}}), flush=True)
        print(json.dumps({"id": request_id, "result": {"exit_code": 0, "stdout_base64": "AAH/", "stderr_base64": "/gA="}}), flush=True)
    elif method == "spawn":
        print(json.dumps({"id": request_id, "event": {"type": "started", "process_id": "spawn_1"}}), flush=True)
        print(json.dumps({"id": request_id, "result": {"process_id": "spawn_1"}}), flush=True)
    elif method == "close" or method == "shutdown":
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
        if method == "shutdown":
            break
    else:
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
"#;

    /// Sink that owns its recorded events; used where the sink is only borrowed.
    #[derive(Default)]
    struct RecordingSink {
        events: Vec<ExecEvent>,
    }

    /// Sink whose recorded events stay reachable after the sink is boxed and moved away.
    #[derive(Clone, Default)]
    struct SharedRecordingSink {
        events: Arc<std::sync::Mutex<Vec<ExecEvent>>>,
    }

    #[async_trait::async_trait]
    impl ExecEventSink for RecordingSink {
        async fn event(&mut self, event: ExecEvent) -> anyhow::Result<()> {
            self.events.push(event);
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl ExecEventSink for SharedRecordingSink {
        async fn event(&mut self, event: ExecEvent) -> anyhow::Result<()> {
            let mut recorded = self.events.lock().unwrap();
            recorded.push(event);
            Ok(())
        }
    }

    /// Write `contents` as an executable `provider.py` inside a fresh temporary directory.
    #[cfg(unix)]
    fn write_provider(contents: &str) -> tempfile::TempDir {
        let temp = tempfile::tempdir().unwrap();
        let script = temp.path().join("provider.py");
        fs::write(&script, contents).unwrap();
        fs::set_permissions(&script, fs::Permissions::from_mode(0o755)).unwrap();
        temp
    }

    /// Open request for the fake provider, syncing `host_path` as the workspace.
    #[cfg(unix)]
    fn fake_open_request(host_path: &std::path::Path) -> OpenSandboxRequest {
        OpenSandboxRequest {
            metadata: Metadata::new("req_open", "sbxgrp_test"),
            profile: ProfileRef {
                provider: "fake".to_string(),
                name: "test".to_string(),
            },
            workspace_sync: WorkspaceSync {
                host_path: host_path.to_path_buf(),
            },
            cwd: None,
        }
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn sandbox_environment_adapts_file_io_and_exec() {
        let provider_dir = write_provider(FAKE_PROVIDER_SCRIPT);
        let env = SandboxExecutionEnvironment::open(
            provider_dir.path().join("provider.py"),
            fake_open_request(provider_dir.path()),
        )
        .await
        .unwrap();

        // Binary content must survive the base64 round trip untouched.
        let binary_path = EnvironmentPath("binary.bin".to_string());
        let payload = [0u8, 1, 255];
        env.write_file(&binary_path, &payload).await.unwrap();
        assert_eq!(env.read_file(&binary_path).await.unwrap(), payload.to_vec());

        let mut exec_sink = RecordingSink::default();
        let exec_request = ExecRequest {
            argv: vec!["ignored".to_string()],
            ..ExecRequest::default()
        };
        let output = env.exec(exec_request, &mut exec_sink).await.unwrap();
        assert_eq!(output.stdout, vec![0, 1, 255]);
        assert_eq!(output.stderr, vec![254, 0]);
        let saw_stdout_chunk = exec_sink.events.iter().any(
            |event| matches!(event, ExecEvent::Stdout { chunk } if chunk == &vec![0, 1, 255]),
        );
        assert!(saw_stdout_chunk);

        let spawn_sink = SharedRecordingSink::default();
        let spawn_events = Arc::clone(&spawn_sink.events);
        let spawn_request = ExecRequest {
            argv: vec!["ignored".to_string()],
            ..ExecRequest::default()
        };
        let spawned = env
            .spawn(spawn_request, Some(Box::new(spawn_sink)))
            .await
            .unwrap();
        assert_eq!(spawned.process_id.as_deref(), Some("spawn_1"));
        let saw_started = spawn_events.lock().unwrap().iter().any(
            |event| matches!(event, ExecEvent::Started { process_id } if process_id.as_deref() == Some("spawn_1")),
        );
        assert!(saw_started);

        env.close().await.unwrap();
    }
}