Skip to main content

ciab_sandbox/
local.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use async_trait::async_trait;
6use dashmap::DashMap;
7use tokio::process::Command;
8use tokio::sync::mpsc;
9use uuid::Uuid;
10
11use ciab_core::error::{CiabError, CiabResult};
12use ciab_core::traits::runtime::SandboxRuntime;
13use ciab_core::types::sandbox::{
14    ExecRequest, ExecResult, FileInfo, LogOptions, ResourceStats, SandboxInfo, SandboxSpec,
15    SandboxState,
16};
17
/// Tracks a local sandbox (agent running as a child process).
///
/// One value is stored per sandbox in `LocalProcessRuntime::sandboxes`,
/// keyed by `id`.
struct LocalSandbox {
    /// Identifier generated when the sandbox is created.
    id: Uuid,
    /// Directory created for this sandbox; default working directory for
    /// `exec*` calls and the root against which file paths are resolved.
    workdir: PathBuf,
    /// Lifecycle state; the `exec*` methods refuse to run unless it is `Running`.
    state: SandboxState,
    /// Copy of the spec the sandbox was created from.
    spec: SandboxSpec,
    /// Creation timestamp (UTC).
    created_at: chrono::DateTime<chrono::Utc>,
}
26
/// Runtime that runs agents as local processes — no Docker or containers needed.
pub struct LocalProcessRuntime {
    /// Parent directory; each sandbox gets a `<uuid>` subdirectory beneath it.
    base_workdir: PathBuf,
    /// All known sandboxes, keyed by sandbox id.
    sandboxes: DashMap<Uuid, LocalSandbox>,
    /// Upper bound compared against `process_count` when creating a sandbox.
    max_processes: u32,
    /// Number of live sandboxes: incremented on create, decremented on terminate.
    /// Despite the name it counts sandboxes, not OS processes.
    process_count: AtomicU64,
    /// Cancellation senders registered by the streaming exec methods and
    /// consumed by `kill_exec`. Keyed by sandbox id, so a later streaming exec
    /// on the same sandbox replaces the earlier entry.
    active_processes: DashMap<Uuid, tokio::sync::watch::Sender<bool>>,
}
35
36impl LocalProcessRuntime {
37    pub fn new(base_workdir: Option<String>, max_processes: Option<u32>) -> Self {
38        let base_workdir = base_workdir.map(PathBuf::from).unwrap_or_else(|| {
39            let tmp = std::env::temp_dir().join("ciab-sandboxes");
40            let _ = std::fs::create_dir_all(&tmp);
41            tmp
42        });
43
44        Self {
45            base_workdir,
46            sandboxes: DashMap::new(),
47            max_processes: max_processes.unwrap_or(10),
48            process_count: AtomicU64::new(0),
49            active_processes: DashMap::new(),
50        }
51    }
52
53    fn sandbox_dir(&self, id: &Uuid) -> PathBuf {
54        self.base_workdir.join(id.to_string())
55    }
56
57    fn get_sandbox_ref(
58        &self,
59        id: &Uuid,
60    ) -> CiabResult<dashmap::mapref::one::Ref<'_, Uuid, LocalSandbox>> {
61        self.sandboxes
62            .get(id)
63            .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))
64    }
65
66    fn to_info(sb: &LocalSandbox) -> SandboxInfo {
67        SandboxInfo {
68            id: sb.id,
69            name: sb.spec.name.clone(),
70            state: sb.state.clone(),
71            persistence: sb.spec.persistence.clone(),
72            agent_provider: sb.spec.agent_provider.clone(),
73            endpoint_url: None,
74            resource_stats: None,
75            labels: sb.spec.labels.clone(),
76            created_at: sb.created_at,
77            updated_at: chrono::Utc::now(),
78            spec: sb.spec.clone(),
79        }
80    }
81}
82
83#[async_trait]
84impl SandboxRuntime for LocalProcessRuntime {
85    async fn create_sandbox(&self, spec: &SandboxSpec) -> CiabResult<SandboxInfo> {
86        let count = self.process_count.load(Ordering::Relaxed);
87        if count >= self.max_processes as u64 {
88            return Err(CiabError::SandboxCreationFailed(format!(
89                "max local process limit reached ({})",
90                self.max_processes
91            )));
92        }
93
94        let id = Uuid::new_v4();
95        let workdir = self.sandbox_dir(&id);
96        tokio::fs::create_dir_all(&workdir)
97            .await
98            .map_err(|e| CiabError::SandboxCreationFailed(e.to_string()))?;
99
100        let now = chrono::Utc::now();
101        let sandbox = LocalSandbox {
102            id,
103            workdir,
104            state: SandboxState::Running,
105            spec: spec.clone(),
106            created_at: now,
107        };
108
109        let info = Self::to_info(&sandbox);
110        self.sandboxes.insert(id, sandbox);
111        self.process_count.fetch_add(1, Ordering::Relaxed);
112
113        Ok(info)
114    }
115
116    async fn get_sandbox(&self, id: &Uuid) -> CiabResult<SandboxInfo> {
117        let sb = self.get_sandbox_ref(id)?;
118        Ok(Self::to_info(&sb))
119    }
120
121    async fn list_sandboxes(
122        &self,
123        state: Option<SandboxState>,
124        provider: Option<&str>,
125        labels: &HashMap<String, String>,
126    ) -> CiabResult<Vec<SandboxInfo>> {
127        let mut results: Vec<SandboxInfo> = self
128            .sandboxes
129            .iter()
130            .map(|entry| Self::to_info(entry.value()))
131            .collect();
132
133        if let Some(ref filter_state) = state {
134            results.retain(|s| &s.state == filter_state);
135        }
136        if let Some(filter_provider) = provider {
137            results.retain(|s| s.agent_provider == filter_provider);
138        }
139        if !labels.is_empty() {
140            results.retain(|s| {
141                labels
142                    .iter()
143                    .all(|(k, v)| s.labels.get(k).map(|sv| sv == v).unwrap_or(false))
144            });
145        }
146
147        Ok(results)
148    }
149
150    async fn start_sandbox(&self, id: &Uuid) -> CiabResult<()> {
151        let mut sb = self
152            .sandboxes
153            .get_mut(id)
154            .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
155        sb.state = SandboxState::Running;
156        Ok(())
157    }
158
159    async fn stop_sandbox(&self, id: &Uuid) -> CiabResult<()> {
160        let mut sb = self
161            .sandboxes
162            .get_mut(id)
163            .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
164        sb.state = SandboxState::Stopped;
165        Ok(())
166    }
167
168    async fn pause_sandbox(&self, id: &Uuid) -> CiabResult<()> {
169        let mut sb = self
170            .sandboxes
171            .get_mut(id)
172            .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
173        sb.state = SandboxState::Paused;
174        Ok(())
175    }
176
177    async fn resume_sandbox(&self, id: &Uuid) -> CiabResult<()> {
178        let mut sb = self
179            .sandboxes
180            .get_mut(id)
181            .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
182        sb.state = SandboxState::Running;
183        Ok(())
184    }
185
186    async fn terminate_sandbox(&self, id: &Uuid) -> CiabResult<()> {
187        if let Some((_, _sb)) = self.sandboxes.remove(id) {
188            self.process_count.fetch_sub(1, Ordering::Relaxed);
189            // Clean up the sandbox directory
190            let workdir = self.sandbox_dir(id);
191            if workdir.exists() {
192                let _ = tokio::fs::remove_dir_all(&workdir).await;
193            }
194        }
195        Ok(())
196    }
197
198    async fn exec(&self, id: &Uuid, request: &ExecRequest) -> CiabResult<ExecResult> {
199        let sb = self.get_sandbox_ref(id)?;
200        if sb.state != SandboxState::Running {
201            return Err(CiabError::SandboxInvalidState {
202                current: sb.state.to_string(),
203                expected: "running".to_string(),
204            });
205        }
206
207        let workdir = request
208            .workdir
209            .as_ref()
210            .map(PathBuf::from)
211            .filter(|p| p.exists())
212            .unwrap_or_else(|| sb.workdir.clone());
213
214        if request.command.is_empty() {
215            return Err(CiabError::ExecFailed("empty command".to_string()));
216        }
217
218        let program = &request.command[0];
219        let args = &request.command[1..];
220
221        let start = std::time::Instant::now();
222        let mut cmd = Command::new(program);
223        cmd.args(args)
224            .current_dir(&workdir)
225            .envs(&request.env)
226            .envs(&sb.spec.env_vars)
227            .env_remove("CLAUDECODE");
228
229        if let Some(ref stdin_data) = request.stdin {
230            let mut child = cmd
231                .stdin(std::process::Stdio::piped())
232                .stdout(std::process::Stdio::piped())
233                .stderr(std::process::Stdio::piped())
234                .spawn()
235                .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
236
237            if let Some(ref mut child_stdin) = child.stdin {
238                use tokio::io::AsyncWriteExt;
239                let _ = child_stdin.write_all(stdin_data.as_bytes()).await;
240                let _ = child_stdin.shutdown().await;
241            }
242            // Drop stdin to signal EOF
243            child.stdin.take();
244
245            let output = child
246                .wait_with_output()
247                .await
248                .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
249
250            let duration = start.elapsed();
251            return Ok(ExecResult {
252                exit_code: output.status.code().unwrap_or(-1),
253                stdout: String::from_utf8_lossy(&output.stdout).to_string(),
254                stderr: String::from_utf8_lossy(&output.stderr).to_string(),
255                duration_ms: duration.as_millis() as u64,
256            });
257        }
258
259        cmd.stdout(std::process::Stdio::piped())
260            .stderr(std::process::Stdio::piped());
261
262        let output = if let Some(timeout_secs) = request.timeout_secs {
263            tokio::time::timeout(
264                std::time::Duration::from_secs(timeout_secs as u64),
265                cmd.output(),
266            )
267            .await
268            .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
269            .map_err(|e| CiabError::ExecFailed(e.to_string()))?
270        } else {
271            cmd.output()
272                .await
273                .map_err(|e| CiabError::ExecFailed(e.to_string()))?
274        };
275
276        let duration = start.elapsed();
277        Ok(ExecResult {
278            exit_code: output.status.code().unwrap_or(-1),
279            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
280            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
281            duration_ms: duration.as_millis() as u64,
282        })
283    }
284
285    async fn exec_streaming(
286        &self,
287        id: &Uuid,
288        request: &ExecRequest,
289    ) -> CiabResult<(
290        mpsc::Receiver<String>,
291        tokio::task::JoinHandle<CiabResult<ExecResult>>,
292    )> {
293        let sb = self.get_sandbox_ref(id)?;
294        if sb.state != SandboxState::Running {
295            return Err(CiabError::SandboxInvalidState {
296                current: sb.state.to_string(),
297                expected: "running".to_string(),
298            });
299        }
300
301        let workdir = request
302            .workdir
303            .as_ref()
304            .map(PathBuf::from)
305            .filter(|p| p.exists())
306            .unwrap_or_else(|| sb.workdir.clone());
307
308        if request.command.is_empty() {
309            return Err(CiabError::ExecFailed("empty command".to_string()));
310        }
311
312        let program = request.command[0].clone();
313        let args: Vec<String> = request.command[1..].to_vec();
314        let env_vars: HashMap<String, String> = request.env.clone();
315        let sandbox_env: HashMap<String, String> = sb.spec.env_vars.clone();
316        let timeout_secs = request.timeout_secs;
317
318        let (tx, rx) = mpsc::channel::<String>(256);
319
320        let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
321        self.active_processes.insert(*id, cancel_tx);
322
323        let handle = tokio::spawn(async move {
324            use tokio::io::{AsyncBufReadExt, BufReader};
325
326            let start = std::time::Instant::now();
327            let mut cmd = Command::new(&program);
328            cmd.args(&args)
329                .current_dir(&workdir)
330                .envs(&env_vars)
331                .envs(&sandbox_env)
332                // Remove CLAUDECODE env var so agent CLIs (e.g. Claude Code)
333                // don't think they're running in a nested session.
334                .env_remove("CLAUDECODE")
335                .stdout(std::process::Stdio::piped())
336                .stderr(std::process::Stdio::piped());
337
338            let mut child = cmd
339                .spawn()
340                .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
341
342            let stdout = child.stdout.take();
343            let stderr = child.stderr.take();
344
345            let tx_out = tx.clone();
346            let stdout_handle = tokio::spawn(async move {
347                let mut all = String::new();
348                if let Some(stdout) = stdout {
349                    let mut reader = BufReader::new(stdout);
350                    let mut line = String::new();
351                    loop {
352                        line.clear();
353                        match reader.read_line(&mut line).await {
354                            Ok(0) => break,
355                            Ok(_) => {
356                                let trimmed = line.trim_end_matches('\n').to_string();
357                                all.push_str(&trimmed);
358                                all.push('\n');
359                                let _ = tx_out.send(trimmed).await;
360                            }
361                            Err(_) => break,
362                        }
363                    }
364                }
365                all
366            });
367
368            let stderr_handle = tokio::spawn(async move {
369                let mut all = String::new();
370                if let Some(stderr) = stderr {
371                    let mut reader = BufReader::new(stderr);
372                    let mut line = String::new();
373                    loop {
374                        line.clear();
375                        match reader.read_line(&mut line).await {
376                            Ok(0) => break,
377                            Ok(_) => {
378                                all.push_str(&line);
379                            }
380                            Err(_) => break,
381                        }
382                    }
383                }
384                all
385            });
386
387            let wait_result = tokio::select! {
388                result = async {
389                    if let Some(secs) = timeout_secs {
390                        tokio::time::timeout(
391                            std::time::Duration::from_secs(secs as u64),
392                            child.wait(),
393                        )
394                        .await
395                        .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
396                        .map_err(|e| CiabError::ExecFailed(e.to_string()))
397                    } else {
398                        child.wait().await.map_err(|e| CiabError::ExecFailed(e.to_string()))
399                    }
400                } => result?,
401                _ = async {
402                    loop {
403                        if cancel_rx.changed().await.is_err() {
404                            // Sender dropped — no cancellation.
405                            futures::future::pending::<()>().await;
406                        }
407                        if *cancel_rx.borrow() {
408                            break;
409                        }
410                    }
411                } => {
412                    let _ = child.kill().await;
413                    return Err(CiabError::ExecFailed("process cancelled".to_string()));
414                }
415            };
416
417            let stdout_text = stdout_handle.await.unwrap_or_default();
418            let stderr_text = stderr_handle.await.unwrap_or_default();
419            let duration = start.elapsed();
420
421            Ok(ExecResult {
422                exit_code: wait_result.code().unwrap_or(-1),
423                stdout: stdout_text,
424                stderr: stderr_text,
425                duration_ms: duration.as_millis() as u64,
426            })
427        });
428
429        Ok((rx, handle))
430    }
431
432    async fn exec_streaming_interactive(
433        &self,
434        id: &Uuid,
435        request: &ExecRequest,
436    ) -> CiabResult<(
437        mpsc::Receiver<String>,
438        mpsc::Sender<String>,
439        tokio::task::JoinHandle<CiabResult<ExecResult>>,
440    )> {
441        let sb = self.get_sandbox_ref(id)?;
442        if sb.state != SandboxState::Running {
443            return Err(CiabError::SandboxInvalidState {
444                current: sb.state.to_string(),
445                expected: "running".to_string(),
446            });
447        }
448
449        let workdir = request
450            .workdir
451            .as_ref()
452            .map(PathBuf::from)
453            .filter(|p| p.exists())
454            .unwrap_or_else(|| sb.workdir.clone());
455
456        if request.command.is_empty() {
457            return Err(CiabError::ExecFailed("empty command".to_string()));
458        }
459
460        let program = request.command[0].clone();
461        let args: Vec<String> = request.command[1..].to_vec();
462        let env_vars: HashMap<String, String> = request.env.clone();
463        let sandbox_env: HashMap<String, String> = sb.spec.env_vars.clone();
464        let timeout_secs = request.timeout_secs;
465
466        let (stdout_tx, stdout_rx) = mpsc::channel::<String>(256);
467        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(64);
468
469        let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
470        self.active_processes.insert(*id, cancel_tx);
471
472        let handle = tokio::spawn(async move {
473            use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
474
475            let start = std::time::Instant::now();
476            let mut cmd = Command::new(&program);
477            cmd.args(&args)
478                .current_dir(&workdir)
479                .envs(&env_vars)
480                .envs(&sandbox_env)
481                // Remove CLAUDECODE env var so agent CLIs (e.g. Claude Code)
482                // don't think they're running in a nested session.
483                .env_remove("CLAUDECODE")
484                .stdin(std::process::Stdio::piped())
485                .stdout(std::process::Stdio::piped())
486                .stderr(std::process::Stdio::piped());
487
488            let mut child = cmd
489                .spawn()
490                .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
491
492            let child_stdin = child.stdin.take();
493            let stdout = child.stdout.take();
494            let stderr = child.stderr.take();
495
496            // Stdin writer task: reads from stdin_rx and writes to child's stdin.
497            let stdin_handle = tokio::spawn(async move {
498                if let Some(mut stdin) = child_stdin {
499                    while let Some(line) = stdin_rx.recv().await {
500                        let data = format!("{}\n", line);
501                        if stdin.write_all(data.as_bytes()).await.is_err() {
502                            break;
503                        }
504                        if stdin.flush().await.is_err() {
505                            break;
506                        }
507                    }
508                }
509            });
510
511            // Stdout reader task
512            let tx_out = stdout_tx.clone();
513            let stdout_handle = tokio::spawn(async move {
514                let mut all = String::new();
515                if let Some(stdout) = stdout {
516                    let mut reader = BufReader::new(stdout);
517                    let mut line = String::new();
518                    loop {
519                        line.clear();
520                        match reader.read_line(&mut line).await {
521                            Ok(0) => break,
522                            Ok(_) => {
523                                let trimmed = line.trim_end_matches('\n').to_string();
524                                all.push_str(&trimmed);
525                                all.push('\n');
526                                let _ = tx_out.send(trimmed).await;
527                            }
528                            Err(_) => break,
529                        }
530                    }
531                }
532                all
533            });
534
535            // Stderr reader task
536            let stderr_handle = tokio::spawn(async move {
537                let mut all = String::new();
538                if let Some(stderr) = stderr {
539                    let mut reader = BufReader::new(stderr);
540                    let mut line = String::new();
541                    loop {
542                        line.clear();
543                        match reader.read_line(&mut line).await {
544                            Ok(0) => break,
545                            Ok(_) => {
546                                all.push_str(&line);
547                            }
548                            Err(_) => break,
549                        }
550                    }
551                }
552                all
553            });
554
555            let wait_result = tokio::select! {
556                result = async {
557                    if let Some(secs) = timeout_secs {
558                        tokio::time::timeout(
559                            std::time::Duration::from_secs(secs as u64),
560                            child.wait(),
561                        )
562                        .await
563                        .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
564                        .map_err(|e| CiabError::ExecFailed(e.to_string()))
565                    } else {
566                        child.wait().await.map_err(|e| CiabError::ExecFailed(e.to_string()))
567                    }
568                } => result?,
569                _ = async {
570                    loop {
571                        if cancel_rx.changed().await.is_err() {
572                            futures::future::pending::<()>().await;
573                        }
574                        if *cancel_rx.borrow() {
575                            break;
576                        }
577                    }
578                } => {
579                    let _ = child.kill().await;
580                    stdin_handle.abort();
581                    return Err(CiabError::ExecFailed("process cancelled".to_string()));
582                }
583            };
584
585            stdin_handle.abort(); // Clean up stdin writer
586            let stdout_text = stdout_handle.await.unwrap_or_default();
587            let stderr_text = stderr_handle.await.unwrap_or_default();
588            let duration = start.elapsed();
589
590            Ok(ExecResult {
591                exit_code: wait_result.code().unwrap_or(-1),
592                stdout: stdout_text,
593                stderr: stderr_text,
594                duration_ms: duration.as_millis() as u64,
595            })
596        });
597
598        Ok((stdout_rx, stdin_tx, handle))
599    }
600
601    async fn read_file(&self, id: &Uuid, path: &str) -> CiabResult<Vec<u8>> {
602        let sb = self.get_sandbox_ref(id)?;
603        let file_path = resolve_path(&sb.workdir, path);
604        tokio::fs::read(&file_path)
605            .await
606            .map_err(|e| CiabError::FileNotFound(format!("{}: {}", path, e)))
607    }
608
609    async fn write_file(&self, id: &Uuid, path: &str, content: &[u8]) -> CiabResult<()> {
610        let sb = self.get_sandbox_ref(id)?;
611        let file_path = resolve_path(&sb.workdir, path);
612        if let Some(parent) = file_path.parent() {
613            tokio::fs::create_dir_all(parent)
614                .await
615                .map_err(|e| CiabError::Internal(e.to_string()))?;
616        }
617        tokio::fs::write(&file_path, content)
618            .await
619            .map_err(|e| CiabError::Internal(format!("write file {}: {}", path, e)))
620    }
621
622    async fn list_files(&self, id: &Uuid, path: &str) -> CiabResult<Vec<FileInfo>> {
623        let sb = self.get_sandbox_ref(id)?;
624        let dir_path = resolve_path(&sb.workdir, path);
625
626        let mut entries = tokio::fs::read_dir(&dir_path)
627            .await
628            .map_err(|e| CiabError::FileNotFound(format!("{}: {}", path, e)))?;
629
630        let mut files = Vec::new();
631        while let Some(entry) = entries
632            .next_entry()
633            .await
634            .map_err(|e| CiabError::Internal(e.to_string()))?
635        {
636            let metadata = entry
637                .metadata()
638                .await
639                .map_err(|e| CiabError::Internal(e.to_string()))?;
640
641            let modified_at = metadata
642                .modified()
643                .ok()
644                .and_then(|t| {
645                    t.duration_since(std::time::UNIX_EPOCH)
646                        .ok()
647                        .map(|d| chrono::DateTime::from_timestamp(d.as_secs() as i64, 0))
648                })
649                .flatten();
650
651            files.push(FileInfo {
652                path: entry
653                    .path()
654                    .strip_prefix(&sb.workdir)
655                    .unwrap_or(entry.path().as_path())
656                    .to_string_lossy()
657                    .to_string(),
658                size: metadata.len(),
659                is_dir: metadata.is_dir(),
660                mode: 0o644,
661                modified_at,
662            });
663        }
664
665        Ok(files)
666    }
667
668    async fn get_stats(&self, id: &Uuid) -> CiabResult<ResourceStats> {
669        let _sb = self.get_sandbox_ref(id)?;
670        // For local processes, return approximate stats
671        Ok(ResourceStats {
672            cpu_usage_percent: 0.0,
673            memory_used_mb: 0,
674            memory_limit_mb: 0,
675            disk_used_mb: 0,
676            disk_limit_mb: 0,
677            network_rx_bytes: 0,
678            network_tx_bytes: 0,
679        })
680    }
681
682    async fn stream_logs(
683        &self,
684        _id: &Uuid,
685        _options: &LogOptions,
686    ) -> CiabResult<mpsc::Receiver<String>> {
687        let (tx, rx) = mpsc::channel(256);
688        // Local processes don't have a unified log stream;
689        // send a placeholder message
690        tokio::spawn(async move {
691            let _ = tx.send("[local runtime] Log streaming not available for local processes. Use exec to run log commands.".to_string()).await;
692        });
693        Ok(rx)
694    }
695
696    async fn kill_exec(&self, id: &Uuid) -> CiabResult<()> {
697        if let Some((_, tx)) = self.active_processes.remove(id) {
698            let _ = tx.send(true);
699        }
700        Ok(())
701    }
702}
703
/// Resolve a path relative to the sandbox workdir, preventing path traversal.
///
/// `path` is rebuilt component by component: root and prefix components are
/// dropped (so absolute paths are treated as relative to `workdir`), `.` is
/// skipped, and `..` removes the previous component but never climbs above
/// `workdir`. The result is therefore always `workdir` or a path beneath it.
///
/// A plain `Path::starts_with` check is not enough here because it compares
/// components without normalizing, so `workdir/../x` "starts with" `workdir`.
///
/// This is purely lexical: symlinks inside the sandbox are not resolved.
fn resolve_path(workdir: &Path, path: &str) -> PathBuf {
    let mut relative = PathBuf::new();
    for component in Path::new(path).components() {
        match component {
            std::path::Component::Normal(part) => relative.push(part),
            std::path::Component::ParentDir => {
                // `pop` on an empty path is a no-op, which clamps at `workdir`.
                relative.pop();
            }
            std::path::Component::CurDir
            | std::path::Component::RootDir
            | std::path::Component::Prefix(_) => {}
        }
    }
    workdir.join(relative)
}