Skip to main content

routa_core/sandbox/
manager.rs

1//! Sandbox manager for Docker-based code execution sandboxes.
2//!
3//! Manages the lifecycle of sandbox containers: creation, tracking, idle-timeout
4//! cleanup, and routing execution requests to the in-sandbox server.
5//!
6//! Architecture mirrors the Python sandbox manager described in:
7//! https://amirmalik.net/2025/03/07/code-sandboxes-for-llm-ai-agents
8
9use std::collections::{BTreeMap, HashMap, HashSet};
10use std::path::Path;
11use std::process::Stdio;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use chrono::Utc;
16use serde_json::Value;
17use tokio::process::Command;
18use tokio::sync::RwLock;
19
20use crate::acp::docker::find_available_port;
21
22use super::env::parse_env_file;
23use super::policy::{ResolvedSandboxPolicy, SandboxEnvMode, SandboxNetworkMode};
24use super::types::{
25    ExecuteRequest, ResolvedCreateSandboxRequest, SandboxInfo, SANDBOX_CHECK_INTERVAL_SECS,
26    SANDBOX_CONTAINER_PORT, SANDBOX_IDLE_TIMEOUT_SECS, SANDBOX_IMAGE, SANDBOX_LABEL,
27};
28
/// Manages Docker-based code execution sandboxes.
///
/// Each sandbox is a Docker container running the in-sandbox FastAPI/Jupyter
/// server. The manager handles container lifecycle and proxies execution
/// requests to the appropriate container.
///
/// The tracking tables are wrapped in `Arc<RwLock<…>>` so the background
/// idle-cleanup task spawned by `new` can share them with the manager.
pub struct SandboxManager {
    /// Maps container ID → `SandboxInfo`.
    sandboxes: Arc<RwLock<HashMap<String, SandboxInfo>>>,
    /// Maps container ID → last activity `Instant` (for idle timeout).
    last_active: Arc<RwLock<HashMap<String, Instant>>>,
    /// Currently allocated host ports (to avoid reuse).
    used_ports: Arc<RwLock<HashSet<u16>>>,
    /// Shared HTTP client for proxying execution requests.
    http_client: reqwest::Client,
}
44
45impl Default for SandboxManager {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl SandboxManager {
52    /// Create a new `SandboxManager` and spawn the background idle-cleanup task.
53    pub fn new() -> Self {
54        let sandboxes = Arc::new(RwLock::new(HashMap::new()));
55        let last_active = Arc::new(RwLock::new(HashMap::new()));
56        let used_ports = Arc::new(RwLock::new(HashSet::new()));
57
58        let mgr = Self {
59            sandboxes,
60            last_active,
61            used_ports,
62            http_client: reqwest::Client::new(),
63        };
64
65        // Spawn background task to terminate idle sandboxes.
66        mgr.spawn_idle_cleanup();
67
68        mgr
69    }
70
71    /// Spawn a Tokio task that periodically terminates idle sandboxes.
72    fn spawn_idle_cleanup(&self) {
73        let sandboxes = self.sandboxes.clone();
74        let last_active = self.last_active.clone();
75        let used_ports = self.used_ports.clone();
76
77        tokio::spawn(async move {
78            let mut interval =
79                tokio::time::interval(Duration::from_secs(SANDBOX_CHECK_INTERVAL_SECS));
80            loop {
81                interval.tick().await;
82                let now = Instant::now();
83                let ids: Vec<String> = last_active.read().await.keys().cloned().collect();
84
85                for id in ids {
86                    let last = last_active.read().await.get(&id).copied();
87                    match last {
88                        None => {
89                            // Unknown/untracked container — skip
90                        }
91                        Some(t) if now.duration_since(t).as_secs() < SANDBOX_IDLE_TIMEOUT_SECS => {
92                            // Still active
93                        }
94                        _ => {
95                            tracing::info!(
96                                "[SandboxManager] Terminating idle sandbox {}",
97                                &id[..8.min(id.len())]
98                            );
99                            // Best-effort stop+remove
100                            let _ = stop_container(&id).await;
101                            let mut sandboxes = sandboxes.write().await;
102                            if let Some(info) = sandboxes.remove(&id) {
103                                if let Some(port) = info.port {
104                                    used_ports.write().await.remove(&port);
105                                }
106                            }
107                            last_active.write().await.remove(&id);
108                        }
109                    }
110                }
111            }
112        });
113    }
114
115    // ── Public API ───────────────────────────────────────────────────────────
116
117    /// List all tracked sandbox containers.
118    pub async fn list_sandboxes(&self) -> Vec<SandboxInfo> {
119        self.sandboxes.read().await.values().cloned().collect()
120    }
121
122    /// Get a sandbox by container ID (or unique prefix).
123    pub async fn get_sandbox(&self, id: &str) -> Option<SandboxInfo> {
124        let sandboxes = self.sandboxes.read().await;
125
126        // Exact match first
127        if let Some(info) = sandboxes.get(id) {
128            return Some(info.clone());
129        }
130
131        // Prefix match (Docker short IDs)
132        sandboxes
133            .values()
134            .find(|info| info.id.starts_with(id))
135            .cloned()
136    }
137
138    /// Create a new sandbox container and return its info.
139    pub async fn create_sandbox(
140        &self,
141        req: ResolvedCreateSandboxRequest,
142    ) -> Result<SandboxInfo, String> {
143        let lang = req.lang.to_lowercase();
144        if lang != "python" {
145            return Err("Only Python sandboxes are supported.".to_string());
146        }
147
148        // Choose an available host port.
149        let host_port = {
150            let used = self.used_ports.read().await.clone();
151            find_available_port(&used).await?
152        };
153        self.used_ports.write().await.insert(host_port);
154
155        // Unique container name with timestamp.
156        let short_id = &uuid::Uuid::new_v4().to_string()[..8];
157        let container_name = format!("routa-sandbox-{short_id}");
158
159        // Build `docker run` command.
160        let docker_args =
161            build_docker_run_args(&container_name, host_port, &lang, req.policy.as_ref())?;
162        let output = Command::new("docker")
163            .args(&docker_args)
164            .stdout(Stdio::piped())
165            .stderr(Stdio::piped())
166            .output()
167            .await
168            .map_err(|e| format!("Failed to run docker: {e}"))?;
169
170        if !output.status.success() {
171            // Reclaim port on failure.
172            self.used_ports.write().await.remove(&host_port);
173            let stderr = String::from_utf8_lossy(&output.stderr);
174            return Err(format!("docker run failed: {stderr}"));
175        }
176
177        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
178
179        let now = Utc::now();
180        let info = SandboxInfo {
181            id: container_id.clone(),
182            name: container_name,
183            status: "running".to_string(),
184            lang,
185            port: Some(host_port),
186            effective_policy: req.policy,
187            created_at: now,
188            last_active_at: now,
189        };
190
191        self.sandboxes
192            .write()
193            .await
194            .insert(container_id.clone(), info.clone());
195        self.last_active
196            .write()
197            .await
198            .insert(container_id, Instant::now());
199
200        Ok(info)
201    }
202
203    /// Execute code inside a sandbox and return a streaming reqwest response.
204    ///
205    /// The response body is an NDJSON stream of `SandboxOutputEvent` objects
206    /// as produced by the in-sandbox FastAPI server.
207    pub async fn execute_in_sandbox(
208        &self,
209        id: &str,
210        req: ExecuteRequest,
211    ) -> Result<reqwest::Response, String> {
212        if req.code.trim().is_empty() {
213            return Err("Code cannot be empty.".to_string());
214        }
215
216        let info = self
217            .get_sandbox(id)
218            .await
219            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
220
221        let port = info
222            .port
223            .ok_or_else(|| "Sandbox has no exposed port".to_string())?;
224
225        let sandbox_url = format!("http://127.0.0.1:{port}/execute");
226
227        let response = self
228            .http_client
229            .post(&sandbox_url)
230            .json(&req)
231            .send()
232            .await
233            .map_err(|e| format!("Failed to reach sandbox: {e}"))?;
234
235        if !response.status().is_success() {
236            return Err(format!(
237                "Sandbox execution failed with status {}",
238                response.status()
239            ));
240        }
241
242        // Update last_active timestamp.
243        self.last_active
244            .write()
245            .await
246            .insert(id.to_string(), Instant::now());
247        if let Some(stored) = self.sandboxes.write().await.get_mut(&info.id) {
248            stored.last_active_at = Utc::now();
249        }
250
251        Ok(response)
252    }
253
254    /// Stop and remove a sandbox container.
255    pub async fn delete_sandbox(&self, id: &str) -> Result<(), String> {
256        let info = self
257            .get_sandbox(id)
258            .await
259            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
260
261        stop_container(&info.id).await?;
262
263        let mut sandboxes = self.sandboxes.write().await;
264        sandboxes.remove(&info.id);
265        if let Some(port) = info.port {
266            self.used_ports.write().await.remove(&port);
267        }
268        self.last_active.write().await.remove(&info.id);
269
270        Ok(())
271    }
272
273    /// Replace an existing sandbox with a newly created sandbox using a mutated policy.
274    pub async fn recreate_sandbox(
275        &self,
276        id: &str,
277        req: ResolvedCreateSandboxRequest,
278    ) -> Result<SandboxInfo, String> {
279        self.get_sandbox(id)
280            .await
281            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
282
283        let next = self.create_sandbox(req).await?;
284        if let Err(err) = self.delete_sandbox(id).await {
285            let _ = self.delete_sandbox(&next.id).await;
286            return Err(format!("Failed to replace sandbox: {err}"));
287        }
288
289        Ok(next)
290    }
291}
292
293// ── Helpers ──────────────────────────────────────────────────────────────────
294
295/// Stop and remove a Docker container by ID.
296async fn stop_container(container_id: &str) -> Result<(), String> {
297    // Stop
298    let stop = Command::new("docker")
299        .args(["stop", container_id])
300        .stdout(Stdio::null())
301        .stderr(Stdio::null())
302        .output()
303        .await
304        .map_err(|e| format!("docker stop failed: {e}"))?;
305
306    if !stop.status.success() {
307        tracing::warn!(
308            "[SandboxManager] docker stop {} failed (container may already be gone)",
309            &container_id[..8.min(container_id.len())]
310        );
311    }
312
313    // Remove (the --rm flag in `docker run` also handles this, but be explicit)
314    let _ = Command::new("docker")
315        .args(["rm", "-f", container_id])
316        .stdout(Stdio::null())
317        .stderr(Stdio::null())
318        .output()
319        .await;
320
321    Ok(())
322}
323
324fn build_docker_run_args(
325    container_name: &str,
326    host_port: u16,
327    lang: &str,
328    policy: Option<&ResolvedSandboxPolicy>,
329) -> Result<Vec<String>, String> {
330    let mut args = vec![
331        "run".to_string(),
332        "-d".to_string(),
333        "--rm".to_string(),
334        format!("--name={}", container_name),
335        format!("-p={}:{}", host_port, SANDBOX_CONTAINER_PORT),
336        format!("--label={}=1", SANDBOX_LABEL),
337        format!("--label={}.lang={}", SANDBOX_LABEL, lang),
338        "--memory=512m".to_string(),
339        "--cpus=1".to_string(),
340        "--pids-limit=64".to_string(),
341    ];
342
343    if let Some(policy) = policy {
344        if let Some(workspace_id) = &policy.workspace_id {
345            args.push(format!(
346                "--label={SANDBOX_LABEL}.workspace_id={workspace_id}"
347            ));
348        }
349        if let Some(codebase_id) = &policy.codebase_id {
350            args.push(format!("--label={SANDBOX_LABEL}.codebase_id={codebase_id}"));
351        }
352        args.push(format!(
353            "--label={}.network_mode={}",
354            SANDBOX_LABEL,
355            match policy.network_mode {
356                SandboxNetworkMode::Bridge => "bridge",
357                SandboxNetworkMode::None => "none",
358            }
359        ));
360
361        for mount in &policy.mounts {
362            args.push("-v".to_string());
363            args.push(format!(
364                "{}:{}:{}",
365                mount.host_path,
366                mount.container_path,
367                mount.access.docker_suffix()
368            ));
369        }
370
371        args.push("-w".to_string());
372        args.push(policy.container_workdir.clone());
373
374        for (key, value) in collect_policy_env(policy)? {
375            args.push("-e".to_string());
376            args.push(format!("{key}={value}"));
377        }
378
379        args.push("--network".to_string());
380        args.push(
381            match policy.network_mode {
382                SandboxNetworkMode::Bridge => "bridge",
383                SandboxNetworkMode::None => "none",
384            }
385            .to_string(),
386        );
387    } else {
388        args.push("--network=bridge".to_string());
389    }
390
391    args.push(SANDBOX_IMAGE.to_string());
392    Ok(args)
393}
394
395fn collect_policy_env(policy: &ResolvedSandboxPolicy) -> Result<Vec<(String, String)>, String> {
396    let mut env_pairs = BTreeMap::new();
397
398    for env_file in &policy.env_files {
399        for (key, value) in parse_env_file(Path::new(&env_file.path))? {
400            env_pairs.insert(key, value);
401        }
402    }
403
404    let passthrough = match policy.env_mode {
405        SandboxEnvMode::Sanitized => policy
406            .env_allowlist
407            .iter()
408            .filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
409            .collect::<Vec<_>>(),
410        SandboxEnvMode::Inherit => std::env::vars().collect::<Vec<_>>(),
411    };
412
413    for (key, value) in passthrough {
414        env_pairs.insert(key, value);
415    }
416
417    Ok(env_pairs.into_iter().collect())
418}
419
420/// Parse container port from `docker inspect` JSON output.
421#[allow(dead_code)]
422async fn get_container_port(container_id: &str, container_port: u16) -> Option<u16> {
423    let output = Command::new("docker")
424        .args([
425            "inspect",
426            "--format",
427            "{{json .NetworkSettings.Ports}}",
428            container_id,
429        ])
430        .stdout(Stdio::piped())
431        .stderr(Stdio::null())
432        .output()
433        .await
434        .ok()?;
435
436    if !output.status.success() {
437        return None;
438    }
439
440    let stdout = String::from_utf8_lossy(&output.stdout);
441    let ports: Value = serde_json::from_str(stdout.trim()).ok()?;
442    let key = format!("{container_port}/tcp");
443    let mappings = ports.get(&key)?.as_array()?;
444    let host_port = mappings.first()?.get("HostPort")?.as_str()?;
445    host_port.parse().ok()
446}
447
448#[cfg(test)]
449mod tests {
450    use super::*;
451    use crate::sandbox::{
452        ResolvedSandboxEnvFile, ResolvedSandboxPolicy, SandboxEnvFileSource, SandboxEnvMode,
453        SandboxMount, SandboxMountAccess, SandboxNetworkMode,
454    };
455
456    #[test]
457    fn sandbox_manager_creates_default() {
458        // SandboxManager::new() spawns a Tokio task internally, so we need a runtime.
459        let rt = tokio::runtime::Runtime::new().unwrap();
460        rt.block_on(async {
461            let _mgr = SandboxManager::new();
462            // Default state has no sandboxes.
463        });
464    }
465
466    #[tokio::test]
467    async fn list_sandboxes_empty_by_default() {
468        let mgr = SandboxManager::new();
469        let list = mgr.list_sandboxes().await;
470        assert!(list.is_empty());
471    }
472
473    #[tokio::test]
474    async fn get_sandbox_returns_none_for_unknown_id() {
475        let mgr = SandboxManager::new();
476        assert!(mgr.get_sandbox("nonexistent-id").await.is_none());
477    }
478
479    #[tokio::test]
480    async fn create_sandbox_rejects_unsupported_lang() {
481        let mgr = SandboxManager::new();
482        let err = mgr
483            .create_sandbox(ResolvedCreateSandboxRequest {
484                lang: "ruby".to_string(),
485                policy: None,
486            })
487            .await
488            .unwrap_err();
489        assert!(err.contains("Only Python"));
490    }
491
492    #[tokio::test]
493    async fn execute_rejects_empty_code() {
494        let mgr = SandboxManager::new();
495        // Inject a fake sandbox so we get past the lookup.
496        {
497            let now = Utc::now();
498            let info = SandboxInfo {
499                id: "fake-id".to_string(),
500                name: "routa-sandbox-fake".to_string(),
501                status: "running".to_string(),
502                lang: "python".to_string(),
503                port: Some(19999),
504                effective_policy: None,
505                created_at: now,
506                last_active_at: now,
507            };
508            mgr.sandboxes
509                .write()
510                .await
511                .insert("fake-id".to_string(), info);
512        }
513        let err = mgr
514            .execute_in_sandbox(
515                "fake-id",
516                ExecuteRequest {
517                    code: "   ".to_string(),
518                },
519            )
520            .await
521            .unwrap_err();
522        assert!(err.contains("empty"));
523    }
524
525    #[tokio::test]
526    async fn delete_sandbox_returns_err_for_unknown() {
527        let mgr = SandboxManager::new();
528        assert!(mgr.delete_sandbox("nonexistent").await.is_err());
529    }
530
531    #[test]
532    fn docker_args_include_policy_mounts_and_workdir() {
533        let policy = ResolvedSandboxPolicy {
534            workspace_id: Some("ws-1".to_string()),
535            codebase_id: Some("cb-1".to_string()),
536            scope_root: "/repo".to_string(),
537            host_workdir: "/repo/src".to_string(),
538            container_workdir: "/workspace/src".to_string(),
539            read_only_paths: vec![],
540            read_write_paths: vec!["/repo/src".to_string()],
541            network_mode: SandboxNetworkMode::None,
542            env_mode: SandboxEnvMode::Sanitized,
543            env_files: vec![],
544            env_allowlist: vec![],
545            mounts: vec![
546                SandboxMount {
547                    host_path: "/repo".to_string(),
548                    container_path: "/workspace".to_string(),
549                    access: SandboxMountAccess::ReadOnly,
550                    reason: Some("scopeRoot".to_string()),
551                },
552                SandboxMount {
553                    host_path: "/repo/src".to_string(),
554                    container_path: "/workspace/src".to_string(),
555                    access: SandboxMountAccess::ReadWrite,
556                    reason: Some("scopeOverride".to_string()),
557                },
558            ],
559            capabilities: vec![],
560            linked_worktrees: vec![],
561            workspace_config: None,
562            notes: vec![],
563        };
564
565        let args = build_docker_run_args("sandbox-name", 12345, "python", Some(&policy))
566            .expect("docker args should build");
567        assert!(args.iter().any(|arg| arg == "--network"));
568        assert!(args.iter().any(|arg| arg == "none"));
569        assert!(args.iter().any(|arg| arg == "/workspace/src"));
570        assert!(args
571            .iter()
572            .any(|arg| arg == "/repo:/workspace:ro" || arg == "/repo/src:/workspace/src:rw"));
573    }
574
575    #[test]
576    fn collect_policy_env_layers_env_files_before_host_env() {
577        let dir = tempfile::tempdir().expect("tempdir should exist");
578        let base_env = dir.path().join(".env.base");
579        let request_env = dir.path().join(".env.request");
580        std::fs::write(&base_env, "SHARED=base\nBASE_ONLY=1\n").expect("base env should exist");
581        std::fs::write(&request_env, "SHARED=request\nREQUEST_ONLY=1\n")
582            .expect("request env should exist");
583        std::env::set_var("SHARED", "host");
584        std::env::set_var("HOST_ONLY", "1");
585
586        let policy = ResolvedSandboxPolicy {
587            workspace_id: None,
588            codebase_id: None,
589            scope_root: dir.path().to_string_lossy().to_string(),
590            host_workdir: dir.path().to_string_lossy().to_string(),
591            container_workdir: "/workspace".to_string(),
592            read_only_paths: vec![],
593            read_write_paths: vec![],
594            network_mode: SandboxNetworkMode::None,
595            env_mode: SandboxEnvMode::Sanitized,
596            env_files: vec![
597                ResolvedSandboxEnvFile {
598                    path: base_env.to_string_lossy().to_string(),
599                    source: SandboxEnvFileSource::WorkspaceConfig,
600                    keys: vec!["BASE_ONLY".to_string(), "SHARED".to_string()],
601                },
602                ResolvedSandboxEnvFile {
603                    path: request_env.to_string_lossy().to_string(),
604                    source: SandboxEnvFileSource::Request,
605                    keys: vec!["REQUEST_ONLY".to_string(), "SHARED".to_string()],
606                },
607            ],
608            env_allowlist: vec!["HOST_ONLY".to_string(), "SHARED".to_string()],
609            mounts: vec![],
610            capabilities: vec![],
611            linked_worktrees: vec![],
612            workspace_config: None,
613            notes: vec![],
614        };
615
616        let env_pairs = collect_policy_env(&policy).expect("env layering should succeed");
617        let env_map = env_pairs.into_iter().collect::<BTreeMap<_, _>>();
618        assert_eq!(env_map.get("BASE_ONLY").map(String::as_str), Some("1"));
619        assert_eq!(env_map.get("REQUEST_ONLY").map(String::as_str), Some("1"));
620        assert_eq!(env_map.get("HOST_ONLY").map(String::as_str), Some("1"));
621        assert_eq!(env_map.get("SHARED").map(String::as_str), Some("host"));
622
623        std::env::remove_var("SHARED");
624        std::env::remove_var("HOST_ONLY");
625    }
626}