1use super::{
4 AgentExecutionEnvironment, EnvironmentPath, ExecEvent, ExecEventSink, ExecOutput, ExecRequest,
5 SpawnOutput,
6};
7use anyhow::Context;
8use base64::prelude::*;
9use smol_workflow_sandbox::{
10 CreateTempDirRequest, OpenSandboxRequest, SandboxExecEvent, SandboxExecRequest,
11 SandboxProviderJsonlClient, SandboxSession, SandboxSpawnRequest, SessionPathRequest,
12 WriteFileRequest,
13};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
17#[derive(Debug, Clone)]
19pub struct SandboxExecutionEnvironment {
20 client: Arc<SandboxProviderJsonlClient>,
21 session: SandboxSession,
22 cwd: Option<EnvironmentPath>,
23}
24
25impl SandboxExecutionEnvironment {
26 pub async fn open(
28 program: impl Into<std::path::PathBuf>,
29 request: OpenSandboxRequest,
30 ) -> anyhow::Result<Self> {
31 let client = Arc::new(SandboxProviderJsonlClient::start(program).await?);
32 let session = client.open(request).await?;
33 Ok(Self::from_session(client, session))
34 }
35
36 pub fn from_session(client: Arc<SandboxProviderJsonlClient>, session: SandboxSession) -> Self {
37 let cwd = session.cwd.clone().map(EnvironmentPath);
38 Self {
39 client,
40 session,
41 cwd,
42 }
43 }
44
45 pub fn client(&self) -> &Arc<SandboxProviderJsonlClient> {
46 &self.client
47 }
48
49 pub fn session(&self) -> &SandboxSession {
50 &self.session
51 }
52
53 pub async fn close(self) -> anyhow::Result<()> {
55 let close_result = self.client.close(self.session).await;
56 let shutdown_result = self.client.shutdown().await;
57 close_result?;
58 shutdown_result?;
59 Ok(())
60 }
61
62 fn session_path_request(&self, path: &EnvironmentPath) -> SessionPathRequest {
63 SessionPathRequest {
64 session: self.session.clone(),
65 path: path.0.clone(),
66 }
67 }
68}
69
70#[async_trait::async_trait]
71impl AgentExecutionEnvironment for SandboxExecutionEnvironment {
72 fn cwd(&self) -> Option<&EnvironmentPath> {
73 self.cwd.as_ref()
74 }
75
76 async fn create_dir_all(&self, path: &EnvironmentPath) -> anyhow::Result<()> {
77 self.client
78 .create_dir_all(self.session_path_request(path))
79 .await
80 .with_context(|| format!("failed to create sandbox directory `{}`", path.as_str()))
81 }
82
83 async fn write_file(&self, path: &EnvironmentPath, content: &[u8]) -> anyhow::Result<()> {
84 self.client
85 .write_file(WriteFileRequest {
86 session: self.session.clone(),
87 path: path.0.clone(),
88 content_base64: BASE64_STANDARD.encode(content),
89 })
90 .await
91 .with_context(|| format!("failed to write sandbox file `{}`", path.as_str()))
92 }
93
94 async fn read_file(&self, path: &EnvironmentPath) -> anyhow::Result<Vec<u8>> {
95 let result = self
96 .client
97 .read_file(self.session_path_request(path))
98 .await
99 .with_context(|| format!("failed to read sandbox file `{}`", path.as_str()))?;
100 BASE64_STANDARD
101 .decode(result.content_base64)
102 .context("sandbox read_file returned invalid base64 content")
103 }
104
105 async fn remove(&self, path: &EnvironmentPath) -> anyhow::Result<()> {
106 self.client
107 .remove(self.session_path_request(path))
108 .await
109 .with_context(|| format!("failed to remove sandbox path `{}`", path.as_str()))
110 }
111
112 async fn create_temp_dir(&self, prefix: &str) -> anyhow::Result<EnvironmentPath> {
113 let result = self
114 .client
115 .create_temp_dir(CreateTempDirRequest {
116 session: self.session.clone(),
117 prefix: prefix.to_string(),
118 })
119 .await
120 .context("failed to create sandbox temp directory")?;
121 Ok(EnvironmentPath(result.path))
122 }
123
124 async fn exec(
125 &self,
126 request: ExecRequest,
127 sink: &mut dyn ExecEventSink,
128 ) -> anyhow::Result<ExecOutput> {
129 let sandbox_request = SandboxExecRequest {
130 session: self.session.clone(),
131 argv: request.argv,
132 cwd: request.cwd.map(|path| path.0),
133 env: request.env,
134 stdin_base64: request.stdin.map(|stdin| BASE64_STANDARD.encode(stdin)),
135 };
136
137 let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ExecEvent>();
138 let result = {
139 let exec_future = self.client.exec(sandbox_request, move |event| {
140 event_tx.send(convert_exec_event(event)?).map_err(|_| {
141 smol_workflow_sandbox::JsonlClientError::Protocol(
142 "sandbox exec event receiver closed".to_string(),
143 )
144 })?;
145 Ok(())
146 });
147 tokio::pin!(exec_future);
148
149 loop {
150 tokio::select! {
151 result = &mut exec_future => break result,
152 event = event_rx.recv() => {
153 if let Some(event) = event {
154 sink.event(event).await?;
155 }
156 }
157 }
158 }
159 }?;
160
161 while let Some(event) = event_rx.recv().await {
162 sink.event(event).await?;
163 }
164
165 Ok(ExecOutput {
166 exit_code: result.exit_code,
167 stdout: BASE64_STANDARD
168 .decode(result.stdout_base64)
169 .context("sandbox exec returned invalid base64 stdout")?,
170 stderr: BASE64_STANDARD
171 .decode(result.stderr_base64)
172 .context("sandbox exec returned invalid base64 stderr")?,
173 })
174 }
175
176 async fn spawn(
177 &self,
178 request: ExecRequest,
179 sink: Option<Box<dyn ExecEventSink>>,
180 ) -> anyhow::Result<SpawnOutput> {
181 let sandbox_request = SandboxSpawnRequest {
182 session: self.session.clone(),
183 argv: request.argv,
184 cwd: request.cwd.map(|path| path.0),
185 env: request.env,
186 stdin_base64: request.stdin.map(|stdin| BASE64_STANDARD.encode(stdin)),
187 };
188 let Some(mut sink) = sink else {
189 let result = self.client.spawn(sandbox_request).await?;
190 return Ok(SpawnOutput {
191 process_id: result.process_id,
192 });
193 };
194
195 let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ExecEvent>();
196 let result: smol_workflow_sandbox::SandboxSpawnResult = {
197 let spawn_future =
198 self.client
199 .request_with_events("spawn", sandbox_request, move |event| {
200 event_tx.send(convert_exec_event(event)?).map_err(|_| {
201 smol_workflow_sandbox::JsonlClientError::Protocol(
202 "sandbox spawn event receiver closed".to_string(),
203 )
204 })?;
205 Ok(())
206 });
207 tokio::pin!(spawn_future);
208
209 loop {
210 tokio::select! {
211 result = &mut spawn_future => break result,
212 event = event_rx.recv() => {
213 if let Some(event) = event {
214 sink.event(event).await?;
215 }
216 }
217 }
218 }
219 }?;
220
221 while let Some(event) = event_rx.recv().await {
222 sink.event(event).await?;
223 }
224 Ok(SpawnOutput {
225 process_id: result.process_id,
226 })
227 }
228}
229
230fn convert_exec_event(
231 event: SandboxExecEvent,
232) -> Result<ExecEvent, smol_workflow_sandbox::JsonlClientError> {
233 match event.r#type.as_str() {
234 "started" => Ok(ExecEvent::Started {
235 process_id: event.process_id,
236 }),
237 "stdout" => Ok(ExecEvent::Stdout {
238 chunk: decode_event_data(event.data_base64)?,
239 }),
240 "stderr" => Ok(ExecEvent::Stderr {
241 chunk: decode_event_data(event.data_base64)?,
242 }),
243 "exited" => Ok(ExecEvent::Exited {
244 exit_code: event.exit_code.unwrap_or(-1),
245 }),
246 other => Err(smol_workflow_sandbox::JsonlClientError::Protocol(format!(
247 "unknown sandbox exec event type `{other}`"
248 ))),
249 }
250}
251
252fn decode_event_data(
253 data_base64: Option<String>,
254) -> Result<Vec<u8>, smol_workflow_sandbox::JsonlClientError> {
255 let Some(data_base64) = data_base64 else {
256 return Ok(Vec::new());
257 };
258 BASE64_STANDARD.decode(data_base64).map_err(|error| {
259 smol_workflow_sandbox::JsonlClientError::Protocol(format!(
260 "sandbox exec event contained invalid base64 data: {error}"
261 ))
262 })
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use smol_workflow_sandbox::{Metadata, ProfileRef, WorkspaceSync};
    use std::fs;

    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Sink that stores every event it receives in order.
    #[derive(Default)]
    struct RecordingSink {
        events: Vec<ExecEvent>,
    }

    /// Sink whose recorded events stay reachable after the sink is boxed and
    /// handed away.
    #[derive(Clone, Default)]
    struct SharedRecordingSink {
        events: Arc<std::sync::Mutex<Vec<ExecEvent>>>,
    }

    #[async_trait::async_trait]
    impl ExecEventSink for RecordingSink {
        async fn event(&mut self, event: ExecEvent) -> anyhow::Result<()> {
            self.events.push(event);
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl ExecEventSink for SharedRecordingSink {
        async fn event(&mut self, event: ExecEvent) -> anyhow::Result<()> {
            self.events.lock().unwrap().push(event);
            Ok(())
        }
    }

    /// Fake provider: answers each JSON request line on stdin with canned
    /// JSON responses on stdout, keeping written files in memory.
    #[cfg(unix)]
    const FAKE_PROVIDER_SCRIPT: &str = r#"#!/usr/bin/env python3
import json, sys
files = {}
session = {"id": "session_1", "cwd": "/sandbox", "capabilities": {"exec": True}}
for line in sys.stdin:
    request = json.loads(line)
    request_id = request["id"]
    method = request["method"]
    params = request.get("params", {})
    if method == "open":
        print(json.dumps({"id": request_id, "result": session}), flush=True)
    elif method == "write_file":
        files[params["path"]] = params["content_base64"]
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
    elif method == "read_file":
        print(json.dumps({"id": request_id, "result": {"content_base64": files[params["path"]]}}), flush=True)
    elif method == "exec":
        print(json.dumps({"id": request_id, "event": {"type": "started", "process_id": "p1"}}), flush=True)
        print(json.dumps({"id": request_id, "event": {"type": "stdout", "data_base64": "AAH/"}}), flush=True)
        print(json.dumps({"id": request_id, "event": {"type": "exited", "exit_code": 0}}), flush=True)
        print(json.dumps({"id": request_id, "result": {"exit_code": 0, "stdout_base64": "AAH/", "stderr_base64": "/gA="}}), flush=True)
    elif method == "spawn":
        print(json.dumps({"id": request_id, "event": {"type": "started", "process_id": "spawn_1"}}), flush=True)
        print(json.dumps({"id": request_id, "result": {"process_id": "spawn_1"}}), flush=True)
    elif method == "close" or method == "shutdown":
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
        if method == "shutdown":
            break
    else:
        print(json.dumps({"id": request_id, "result": {}}), flush=True)
"#;

    /// Writes `contents` to an executable `provider.py` inside a fresh
    /// temporary directory and returns that directory.
    #[cfg(unix)]
    fn write_provider(contents: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let script = dir.path().join("provider.py");
        fs::write(&script, contents).unwrap();
        fs::set_permissions(&script, fs::Permissions::from_mode(0o755)).unwrap();
        dir
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn sandbox_environment_adapts_file_io_and_exec() {
        let provider_dir = write_provider(FAKE_PROVIDER_SCRIPT);

        let open_request = OpenSandboxRequest {
            metadata: Metadata::new("req_open", "sbxgrp_test"),
            profile: ProfileRef {
                provider: "fake".to_string(),
                name: "test".to_string(),
            },
            workspace_sync: WorkspaceSync {
                host_path: provider_dir.path().to_path_buf(),
            },
            cwd: None,
        };
        let environment = SandboxExecutionEnvironment::open(
            provider_dir.path().join("provider.py"),
            open_request,
        )
        .await
        .unwrap();

        // Binary content must survive the base64 round trip.
        let binary_path = EnvironmentPath("binary.bin".to_string());
        environment
            .write_file(&binary_path, &[0, 1, 255])
            .await
            .unwrap();
        let round_tripped = environment.read_file(&binary_path).await.unwrap();
        assert_eq!(round_tripped, vec![0, 1, 255]);

        // `exec` decodes the final output and forwards streamed events.
        let mut exec_sink = RecordingSink::default();
        let exec_request = ExecRequest {
            argv: vec!["ignored".to_string()],
            ..ExecRequest::default()
        };
        let exec_output = environment
            .exec(exec_request, &mut exec_sink)
            .await
            .unwrap();
        assert_eq!(exec_output.stdout, vec![0, 1, 255]);
        assert_eq!(exec_output.stderr, vec![254, 0]);
        let saw_stdout = exec_sink.events.iter().any(|event| {
            matches!(event, ExecEvent::Stdout { chunk } if chunk.as_slice() == [0u8, 1, 255])
        });
        assert!(saw_stdout);

        // `spawn` with a sink forwards the `started` event.
        let spawn_sink = SharedRecordingSink::default();
        let spawn_events = Arc::clone(&spawn_sink.events);
        let spawn_request = ExecRequest {
            argv: vec!["ignored".to_string()],
            ..ExecRequest::default()
        };
        let spawned = environment
            .spawn(spawn_request, Some(Box::new(spawn_sink)))
            .await
            .unwrap();
        assert_eq!(spawned.process_id.as_deref(), Some("spawn_1"));
        let saw_started = spawn_events.lock().unwrap().iter().any(|event| {
            matches!(
                event,
                ExecEvent::Started { process_id } if process_id.as_deref() == Some("spawn_1")
            )
        });
        assert!(saw_started);

        environment.close().await.unwrap();
    }
}