Skip to main content

routa_core/sandbox/
manager.rs

1//! Sandbox manager for Docker-based code execution sandboxes.
2//!
3//! Manages the lifecycle of sandbox containers: creation, tracking, idle-timeout
4//! cleanup, and routing execution requests to the in-sandbox server.
5//!
6//! Architecture mirrors the Python sandbox manager described in:
7//! https://amirmalik.net/2025/03/07/code-sandboxes-for-llm-ai-agents
8
9use std::collections::{BTreeMap, HashMap, HashSet};
10use std::path::Path;
11use std::process::Stdio;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use chrono::Utc;
16use serde_json::Value;
17use tokio::process::Command;
18use tokio::sync::RwLock;
19
20use crate::acp::docker::find_available_port;
21
22use super::env::parse_env_file;
23use super::policy::{ResolvedSandboxPolicy, SandboxEnvMode, SandboxNetworkMode};
24use super::types::{
25    ExecuteRequest, ResolvedCreateSandboxRequest, SandboxInfo, SANDBOX_CHECK_INTERVAL_SECS,
26    SANDBOX_CONTAINER_PORT, SANDBOX_IDLE_TIMEOUT_SECS, SANDBOX_IMAGE, SANDBOX_LABEL,
27};
28
29/// Manages Docker-based code execution sandboxes.
30///
31/// Each sandbox is a Docker container running the in-sandbox FastAPI/Jupyter
32/// server. The manager handles container lifecycle and proxies execution
33/// requests to the appropriate container.
34pub struct SandboxManager {
35    /// Maps container ID → `SandboxInfo`.
36    sandboxes: Arc<RwLock<HashMap<String, SandboxInfo>>>,
37    /// Maps container ID → last activity `Instant` (for idle timeout).
38    last_active: Arc<RwLock<HashMap<String, Instant>>>,
39    /// Currently allocated host ports (to avoid reuse).
40    used_ports: Arc<RwLock<HashSet<u16>>>,
41    /// Shared HTTP client for proxying execution requests.
42    http_client: reqwest::Client,
43}
44
45impl Default for SandboxManager {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl SandboxManager {
52    /// Create a new `SandboxManager` and spawn the background idle-cleanup task.
53    pub fn new() -> Self {
54        let sandboxes = Arc::new(RwLock::new(HashMap::new()));
55        let last_active = Arc::new(RwLock::new(HashMap::new()));
56        let used_ports = Arc::new(RwLock::new(HashSet::new()));
57
58        let mgr = Self {
59            sandboxes,
60            last_active,
61            used_ports,
62            http_client: reqwest::Client::new(),
63        };
64
65        // Spawn background task to terminate idle sandboxes.
66        mgr.spawn_idle_cleanup();
67
68        mgr
69    }
70
71    /// Spawn a Tokio task that periodically terminates idle sandboxes.
72    fn spawn_idle_cleanup(&self) {
73        let sandboxes = self.sandboxes.clone();
74        let last_active = self.last_active.clone();
75        let used_ports = self.used_ports.clone();
76
77        tokio::spawn(async move {
78            let mut interval =
79                tokio::time::interval(Duration::from_secs(SANDBOX_CHECK_INTERVAL_SECS));
80            loop {
81                interval.tick().await;
82                let now = Instant::now();
83                let ids: Vec<String> = last_active.read().await.keys().cloned().collect();
84
85                for id in ids {
86                    let last = last_active.read().await.get(&id).copied();
87                    match last {
88                        None => {
89                            // Unknown/untracked container — skip
90                        }
91                        Some(t) if now.duration_since(t).as_secs() < SANDBOX_IDLE_TIMEOUT_SECS => {
92                            // Still active
93                        }
94                        _ => {
95                            tracing::info!(
96                                "[SandboxManager] Terminating idle sandbox {}",
97                                &id[..8.min(id.len())]
98                            );
99                            // Best-effort stop+remove
100                            let _ = stop_container(&id).await;
101                            let mut sandboxes = sandboxes.write().await;
102                            if let Some(info) = sandboxes.remove(&id) {
103                                if let Some(port) = info.port {
104                                    used_ports.write().await.remove(&port);
105                                }
106                            }
107                            last_active.write().await.remove(&id);
108                        }
109                    }
110                }
111            }
112        });
113    }
114
115    // ── Public API ───────────────────────────────────────────────────────────
116
117    /// List all tracked sandbox containers.
118    pub async fn list_sandboxes(&self) -> Vec<SandboxInfo> {
119        self.sandboxes.read().await.values().cloned().collect()
120    }
121
122    /// Get a sandbox by container ID (or unique prefix).
123    pub async fn get_sandbox(&self, id: &str) -> Option<SandboxInfo> {
124        let sandboxes = self.sandboxes.read().await;
125
126        // Exact match first
127        if let Some(info) = sandboxes.get(id) {
128            return Some(info.clone());
129        }
130
131        // Prefix match (Docker short IDs)
132        sandboxes
133            .values()
134            .find(|info| info.id.starts_with(id))
135            .cloned()
136    }
137
138    /// Create a new sandbox container and return its info.
139    pub async fn create_sandbox(
140        &self,
141        req: ResolvedCreateSandboxRequest,
142    ) -> Result<SandboxInfo, String> {
143        let lang = req.lang.to_lowercase();
144        if lang != "python" {
145            return Err("Only Python sandboxes are supported.".to_string());
146        }
147
148        // Choose an available host port.
149        let host_port = {
150            let used = self.used_ports.read().await.clone();
151            find_available_port(&used).await?
152        };
153        self.used_ports.write().await.insert(host_port);
154
155        // Unique container name with timestamp.
156        let short_id = &uuid::Uuid::new_v4().to_string()[..8];
157        let container_name = format!("routa-sandbox-{}", short_id);
158
159        // Build `docker run` command.
160        let docker_args =
161            build_docker_run_args(&container_name, host_port, &lang, req.policy.as_ref())?;
162        let output = Command::new("docker")
163            .args(&docker_args)
164            .stdout(Stdio::piped())
165            .stderr(Stdio::piped())
166            .output()
167            .await
168            .map_err(|e| format!("Failed to run docker: {e}"))?;
169
170        if !output.status.success() {
171            // Reclaim port on failure.
172            self.used_ports.write().await.remove(&host_port);
173            let stderr = String::from_utf8_lossy(&output.stderr);
174            return Err(format!("docker run failed: {stderr}"));
175        }
176
177        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
178
179        let now = Utc::now();
180        let info = SandboxInfo {
181            id: container_id.clone(),
182            name: container_name,
183            status: "running".to_string(),
184            lang,
185            port: Some(host_port),
186            effective_policy: req.policy,
187            created_at: now,
188            last_active_at: now,
189        };
190
191        self.sandboxes
192            .write()
193            .await
194            .insert(container_id.clone(), info.clone());
195        self.last_active
196            .write()
197            .await
198            .insert(container_id, Instant::now());
199
200        Ok(info)
201    }
202
203    /// Execute code inside a sandbox and return a streaming reqwest response.
204    ///
205    /// The response body is an NDJSON stream of `SandboxOutputEvent` objects
206    /// as produced by the in-sandbox FastAPI server.
207    pub async fn execute_in_sandbox(
208        &self,
209        id: &str,
210        req: ExecuteRequest,
211    ) -> Result<reqwest::Response, String> {
212        if req.code.trim().is_empty() {
213            return Err("Code cannot be empty.".to_string());
214        }
215
216        let info = self
217            .get_sandbox(id)
218            .await
219            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
220
221        let port = info
222            .port
223            .ok_or_else(|| "Sandbox has no exposed port".to_string())?;
224
225        let sandbox_url = format!("http://127.0.0.1:{}/execute", port);
226
227        let response = self
228            .http_client
229            .post(&sandbox_url)
230            .json(&req)
231            .send()
232            .await
233            .map_err(|e| format!("Failed to reach sandbox: {e}"))?;
234
235        if !response.status().is_success() {
236            return Err(format!(
237                "Sandbox execution failed with status {}",
238                response.status()
239            ));
240        }
241
242        // Update last_active timestamp.
243        self.last_active
244            .write()
245            .await
246            .insert(id.to_string(), Instant::now());
247        if let Some(stored) = self.sandboxes.write().await.get_mut(&info.id) {
248            stored.last_active_at = Utc::now();
249        }
250
251        Ok(response)
252    }
253
254    /// Stop and remove a sandbox container.
255    pub async fn delete_sandbox(&self, id: &str) -> Result<(), String> {
256        let info = self
257            .get_sandbox(id)
258            .await
259            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
260
261        stop_container(&info.id).await?;
262
263        let mut sandboxes = self.sandboxes.write().await;
264        sandboxes.remove(&info.id);
265        if let Some(port) = info.port {
266            self.used_ports.write().await.remove(&port);
267        }
268        self.last_active.write().await.remove(&info.id);
269
270        Ok(())
271    }
272
273    /// Replace an existing sandbox with a newly created sandbox using a mutated policy.
274    pub async fn recreate_sandbox(
275        &self,
276        id: &str,
277        req: ResolvedCreateSandboxRequest,
278    ) -> Result<SandboxInfo, String> {
279        self.get_sandbox(id)
280            .await
281            .ok_or_else(|| format!("Sandbox not found: {id}"))?;
282
283        let next = self.create_sandbox(req).await?;
284        if let Err(err) = self.delete_sandbox(id).await {
285            let _ = self.delete_sandbox(&next.id).await;
286            return Err(format!("Failed to replace sandbox: {err}"));
287        }
288
289        Ok(next)
290    }
291}
292
293// ── Helpers ──────────────────────────────────────────────────────────────────
294
295/// Stop and remove a Docker container by ID.
296async fn stop_container(container_id: &str) -> Result<(), String> {
297    // Stop
298    let stop = Command::new("docker")
299        .args(["stop", container_id])
300        .stdout(Stdio::null())
301        .stderr(Stdio::null())
302        .output()
303        .await
304        .map_err(|e| format!("docker stop failed: {e}"))?;
305
306    if !stop.status.success() {
307        tracing::warn!(
308            "[SandboxManager] docker stop {} failed (container may already be gone)",
309            &container_id[..8.min(container_id.len())]
310        );
311    }
312
313    // Remove (the --rm flag in `docker run` also handles this, but be explicit)
314    let _ = Command::new("docker")
315        .args(["rm", "-f", container_id])
316        .stdout(Stdio::null())
317        .stderr(Stdio::null())
318        .output()
319        .await;
320
321    Ok(())
322}
323
324fn build_docker_run_args(
325    container_name: &str,
326    host_port: u16,
327    lang: &str,
328    policy: Option<&ResolvedSandboxPolicy>,
329) -> Result<Vec<String>, String> {
330    let mut args = vec![
331        "run".to_string(),
332        "-d".to_string(),
333        "--rm".to_string(),
334        format!("--name={}", container_name),
335        format!("-p={}:{}", host_port, SANDBOX_CONTAINER_PORT),
336        format!("--label={}=1", SANDBOX_LABEL),
337        format!("--label={}.lang={}", SANDBOX_LABEL, lang),
338        "--memory=512m".to_string(),
339        "--cpus=1".to_string(),
340        "--pids-limit=64".to_string(),
341    ];
342
343    if let Some(policy) = policy {
344        if let Some(workspace_id) = &policy.workspace_id {
345            args.push(format!(
346                "--label={}.workspace_id={}",
347                SANDBOX_LABEL, workspace_id
348            ));
349        }
350        if let Some(codebase_id) = &policy.codebase_id {
351            args.push(format!(
352                "--label={}.codebase_id={}",
353                SANDBOX_LABEL, codebase_id
354            ));
355        }
356        args.push(format!(
357            "--label={}.network_mode={}",
358            SANDBOX_LABEL,
359            match policy.network_mode {
360                SandboxNetworkMode::Bridge => "bridge",
361                SandboxNetworkMode::None => "none",
362            }
363        ));
364
365        for mount in &policy.mounts {
366            args.push("-v".to_string());
367            args.push(format!(
368                "{}:{}:{}",
369                mount.host_path,
370                mount.container_path,
371                mount.access.docker_suffix()
372            ));
373        }
374
375        args.push("-w".to_string());
376        args.push(policy.container_workdir.clone());
377
378        for (key, value) in collect_policy_env(policy)? {
379            args.push("-e".to_string());
380            args.push(format!("{}={}", key, value));
381        }
382
383        args.push("--network".to_string());
384        args.push(
385            match policy.network_mode {
386                SandboxNetworkMode::Bridge => "bridge",
387                SandboxNetworkMode::None => "none",
388            }
389            .to_string(),
390        );
391    } else {
392        args.push("--network=bridge".to_string());
393    }
394
395    args.push(SANDBOX_IMAGE.to_string());
396    Ok(args)
397}
398
399fn collect_policy_env(policy: &ResolvedSandboxPolicy) -> Result<Vec<(String, String)>, String> {
400    let mut env_pairs = BTreeMap::new();
401
402    for env_file in &policy.env_files {
403        for (key, value) in parse_env_file(Path::new(&env_file.path))? {
404            env_pairs.insert(key, value);
405        }
406    }
407
408    let passthrough = match policy.env_mode {
409        SandboxEnvMode::Sanitized => policy
410            .env_allowlist
411            .iter()
412            .filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
413            .collect::<Vec<_>>(),
414        SandboxEnvMode::Inherit => std::env::vars().collect::<Vec<_>>(),
415    };
416
417    for (key, value) in passthrough {
418        env_pairs.insert(key, value);
419    }
420
421    Ok(env_pairs.into_iter().collect())
422}
423
424/// Parse container port from `docker inspect` JSON output.
425#[allow(dead_code)]
426async fn get_container_port(container_id: &str, container_port: u16) -> Option<u16> {
427    let output = Command::new("docker")
428        .args([
429            "inspect",
430            "--format",
431            "{{json .NetworkSettings.Ports}}",
432            container_id,
433        ])
434        .stdout(Stdio::piped())
435        .stderr(Stdio::null())
436        .output()
437        .await
438        .ok()?;
439
440    if !output.status.success() {
441        return None;
442    }
443
444    let stdout = String::from_utf8_lossy(&output.stdout);
445    let ports: Value = serde_json::from_str(stdout.trim()).ok()?;
446    let key = format!("{}/tcp", container_port);
447    let mappings = ports.get(&key)?.as_array()?;
448    let host_port = mappings.first()?.get("HostPort")?.as_str()?;
449    host_port.parse().ok()
450}
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455    use crate::sandbox::{
456        ResolvedSandboxEnvFile, ResolvedSandboxPolicy, SandboxEnvFileSource, SandboxEnvMode,
457        SandboxMount, SandboxMountAccess, SandboxNetworkMode,
458    };
459
460    #[test]
461    fn sandbox_manager_creates_default() {
462        // SandboxManager::new() spawns a Tokio task internally, so we need a runtime.
463        let rt = tokio::runtime::Runtime::new().unwrap();
464        rt.block_on(async {
465            let _mgr = SandboxManager::new();
466            // Default state has no sandboxes.
467        });
468    }
469
470    #[tokio::test]
471    async fn list_sandboxes_empty_by_default() {
472        let mgr = SandboxManager::new();
473        let list = mgr.list_sandboxes().await;
474        assert!(list.is_empty());
475    }
476
477    #[tokio::test]
478    async fn get_sandbox_returns_none_for_unknown_id() {
479        let mgr = SandboxManager::new();
480        assert!(mgr.get_sandbox("nonexistent-id").await.is_none());
481    }
482
483    #[tokio::test]
484    async fn create_sandbox_rejects_unsupported_lang() {
485        let mgr = SandboxManager::new();
486        let err = mgr
487            .create_sandbox(ResolvedCreateSandboxRequest {
488                lang: "ruby".to_string(),
489                policy: None,
490            })
491            .await
492            .unwrap_err();
493        assert!(err.contains("Only Python"));
494    }
495
496    #[tokio::test]
497    async fn execute_rejects_empty_code() {
498        let mgr = SandboxManager::new();
499        // Inject a fake sandbox so we get past the lookup.
500        {
501            let now = Utc::now();
502            let info = SandboxInfo {
503                id: "fake-id".to_string(),
504                name: "routa-sandbox-fake".to_string(),
505                status: "running".to_string(),
506                lang: "python".to_string(),
507                port: Some(19999),
508                effective_policy: None,
509                created_at: now,
510                last_active_at: now,
511            };
512            mgr.sandboxes
513                .write()
514                .await
515                .insert("fake-id".to_string(), info);
516        }
517        let err = mgr
518            .execute_in_sandbox(
519                "fake-id",
520                ExecuteRequest {
521                    code: "   ".to_string(),
522                },
523            )
524            .await
525            .unwrap_err();
526        assert!(err.contains("empty"));
527    }
528
529    #[tokio::test]
530    async fn delete_sandbox_returns_err_for_unknown() {
531        let mgr = SandboxManager::new();
532        assert!(mgr.delete_sandbox("nonexistent").await.is_err());
533    }
534
535    #[test]
536    fn docker_args_include_policy_mounts_and_workdir() {
537        let policy = ResolvedSandboxPolicy {
538            workspace_id: Some("ws-1".to_string()),
539            codebase_id: Some("cb-1".to_string()),
540            scope_root: "/repo".to_string(),
541            host_workdir: "/repo/src".to_string(),
542            container_workdir: "/workspace/src".to_string(),
543            read_only_paths: vec![],
544            read_write_paths: vec!["/repo/src".to_string()],
545            network_mode: SandboxNetworkMode::None,
546            env_mode: SandboxEnvMode::Sanitized,
547            env_files: vec![],
548            env_allowlist: vec![],
549            mounts: vec![
550                SandboxMount {
551                    host_path: "/repo".to_string(),
552                    container_path: "/workspace".to_string(),
553                    access: SandboxMountAccess::ReadOnly,
554                    reason: Some("scopeRoot".to_string()),
555                },
556                SandboxMount {
557                    host_path: "/repo/src".to_string(),
558                    container_path: "/workspace/src".to_string(),
559                    access: SandboxMountAccess::ReadWrite,
560                    reason: Some("scopeOverride".to_string()),
561                },
562            ],
563            capabilities: vec![],
564            linked_worktrees: vec![],
565            workspace_config: None,
566            notes: vec![],
567        };
568
569        let args = build_docker_run_args("sandbox-name", 12345, "python", Some(&policy))
570            .expect("docker args should build");
571        assert!(args.iter().any(|arg| arg == "--network"));
572        assert!(args.iter().any(|arg| arg == "none"));
573        assert!(args.iter().any(|arg| arg == "/workspace/src"));
574        assert!(args
575            .iter()
576            .any(|arg| arg == "/repo:/workspace:ro" || arg == "/repo/src:/workspace/src:rw"));
577    }
578
579    #[test]
580    fn collect_policy_env_layers_env_files_before_host_env() {
581        let dir = tempfile::tempdir().expect("tempdir should exist");
582        let base_env = dir.path().join(".env.base");
583        let request_env = dir.path().join(".env.request");
584        std::fs::write(&base_env, "SHARED=base\nBASE_ONLY=1\n").expect("base env should exist");
585        std::fs::write(&request_env, "SHARED=request\nREQUEST_ONLY=1\n")
586            .expect("request env should exist");
587        std::env::set_var("SHARED", "host");
588        std::env::set_var("HOST_ONLY", "1");
589
590        let policy = ResolvedSandboxPolicy {
591            workspace_id: None,
592            codebase_id: None,
593            scope_root: dir.path().to_string_lossy().to_string(),
594            host_workdir: dir.path().to_string_lossy().to_string(),
595            container_workdir: "/workspace".to_string(),
596            read_only_paths: vec![],
597            read_write_paths: vec![],
598            network_mode: SandboxNetworkMode::None,
599            env_mode: SandboxEnvMode::Sanitized,
600            env_files: vec![
601                ResolvedSandboxEnvFile {
602                    path: base_env.to_string_lossy().to_string(),
603                    source: SandboxEnvFileSource::WorkspaceConfig,
604                    keys: vec!["BASE_ONLY".to_string(), "SHARED".to_string()],
605                },
606                ResolvedSandboxEnvFile {
607                    path: request_env.to_string_lossy().to_string(),
608                    source: SandboxEnvFileSource::Request,
609                    keys: vec!["REQUEST_ONLY".to_string(), "SHARED".to_string()],
610                },
611            ],
612            env_allowlist: vec!["HOST_ONLY".to_string(), "SHARED".to_string()],
613            mounts: vec![],
614            capabilities: vec![],
615            linked_worktrees: vec![],
616            workspace_config: None,
617            notes: vec![],
618        };
619
620        let env_pairs = collect_policy_env(&policy).expect("env layering should succeed");
621        let env_map = env_pairs.into_iter().collect::<BTreeMap<_, _>>();
622        assert_eq!(env_map.get("BASE_ONLY").map(String::as_str), Some("1"));
623        assert_eq!(env_map.get("REQUEST_ONLY").map(String::as_str), Some("1"));
624        assert_eq!(env_map.get("HOST_ONLY").map(String::as_str), Some("1"));
625        assert_eq!(env_map.get("SHARED").map(String::as_str), Some("host"));
626
627        std::env::remove_var("SHARED");
628        std::env::remove_var("HOST_ONLY");
629    }
630}