1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use async_trait::async_trait;
6use dashmap::DashMap;
7use tokio::process::Command;
8use tokio::sync::mpsc;
9use uuid::Uuid;
10
11use ciab_core::error::{CiabError, CiabResult};
12use ciab_core::traits::runtime::SandboxRuntime;
13use ciab_core::types::sandbox::{
14 ExecRequest, ExecResult, FileInfo, LogOptions, ResourceStats, SandboxInfo, SandboxSpec,
15 SandboxState,
16};
17
/// Bookkeeping record for one sandbox managed by the local process runtime.
struct LocalSandbox {
    /// Identifier generated when the sandbox is created; also the key in the runtime's map.
    id: Uuid,
    /// Directory created for this sandbox (`<base_workdir>/<id>`); default cwd for exec calls.
    workdir: PathBuf,
    /// Current lifecycle state; set to `Running` on creation and rewritten by
    /// start/stop/pause/resume.
    state: SandboxState,
    /// Copy of the spec the sandbox was created from.
    spec: SandboxSpec,
    /// Timestamp taken when the sandbox was created.
    created_at: chrono::DateTime<chrono::Utc>,
}
26
/// Sandbox runtime that runs commands as local child processes, giving each sandbox its
/// own working directory under a shared base directory.
pub struct LocalProcessRuntime {
    /// Parent directory under which per-sandbox directories are created.
    base_workdir: PathBuf,
    /// All sandboxes currently known to this runtime, keyed by sandbox id.
    sandboxes: DashMap<Uuid, LocalSandbox>,
    /// Upper bound compared against `process_count` when creating a sandbox (default 10).
    max_processes: u32,
    /// Incremented for every created sandbox and decremented when one is terminated.
    process_count: AtomicU64,
    /// Cancellation senders for streamed exec calls, keyed by sandbox id; sending `true`
    /// asks the corresponding exec task to kill its child process.
    active_processes: DashMap<Uuid, tokio::sync::watch::Sender<bool>>,
}
35
36impl LocalProcessRuntime {
37 pub fn new(base_workdir: Option<String>, max_processes: Option<u32>) -> Self {
38 let base_workdir = base_workdir.map(PathBuf::from).unwrap_or_else(|| {
39 let tmp = std::env::temp_dir().join("ciab-sandboxes");
40 let _ = std::fs::create_dir_all(&tmp);
41 tmp
42 });
43
44 Self {
45 base_workdir,
46 sandboxes: DashMap::new(),
47 max_processes: max_processes.unwrap_or(10),
48 process_count: AtomicU64::new(0),
49 active_processes: DashMap::new(),
50 }
51 }
52
53 fn sandbox_dir(&self, id: &Uuid) -> PathBuf {
54 self.base_workdir.join(id.to_string())
55 }
56
57 fn get_sandbox_ref(
58 &self,
59 id: &Uuid,
60 ) -> CiabResult<dashmap::mapref::one::Ref<'_, Uuid, LocalSandbox>> {
61 self.sandboxes
62 .get(id)
63 .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))
64 }
65
66 fn to_info(sb: &LocalSandbox) -> SandboxInfo {
67 SandboxInfo {
68 id: sb.id,
69 name: sb.spec.name.clone(),
70 state: sb.state.clone(),
71 persistence: sb.spec.persistence.clone(),
72 agent_provider: sb.spec.agent_provider.clone(),
73 endpoint_url: None,
74 resource_stats: None,
75 labels: sb.spec.labels.clone(),
76 created_at: sb.created_at,
77 updated_at: chrono::Utc::now(),
78 spec: sb.spec.clone(),
79 }
80 }
81}
82
83#[async_trait]
84impl SandboxRuntime for LocalProcessRuntime {
85 async fn create_sandbox(&self, spec: &SandboxSpec) -> CiabResult<SandboxInfo> {
86 let count = self.process_count.load(Ordering::Relaxed);
87 if count >= self.max_processes as u64 {
88 return Err(CiabError::SandboxCreationFailed(format!(
89 "max local process limit reached ({})",
90 self.max_processes
91 )));
92 }
93
94 let id = Uuid::new_v4();
95 let workdir = self.sandbox_dir(&id);
96 tokio::fs::create_dir_all(&workdir)
97 .await
98 .map_err(|e| CiabError::SandboxCreationFailed(e.to_string()))?;
99
100 let now = chrono::Utc::now();
101 let sandbox = LocalSandbox {
102 id,
103 workdir,
104 state: SandboxState::Running,
105 spec: spec.clone(),
106 created_at: now,
107 };
108
109 let info = Self::to_info(&sandbox);
110 self.sandboxes.insert(id, sandbox);
111 self.process_count.fetch_add(1, Ordering::Relaxed);
112
113 Ok(info)
114 }
115
116 async fn get_sandbox(&self, id: &Uuid) -> CiabResult<SandboxInfo> {
117 let sb = self.get_sandbox_ref(id)?;
118 Ok(Self::to_info(&sb))
119 }
120
121 async fn list_sandboxes(
122 &self,
123 state: Option<SandboxState>,
124 provider: Option<&str>,
125 labels: &HashMap<String, String>,
126 ) -> CiabResult<Vec<SandboxInfo>> {
127 let mut results: Vec<SandboxInfo> = self
128 .sandboxes
129 .iter()
130 .map(|entry| Self::to_info(entry.value()))
131 .collect();
132
133 if let Some(ref filter_state) = state {
134 results.retain(|s| &s.state == filter_state);
135 }
136 if let Some(filter_provider) = provider {
137 results.retain(|s| s.agent_provider == filter_provider);
138 }
139 if !labels.is_empty() {
140 results.retain(|s| {
141 labels
142 .iter()
143 .all(|(k, v)| s.labels.get(k).map(|sv| sv == v).unwrap_or(false))
144 });
145 }
146
147 Ok(results)
148 }
149
150 async fn start_sandbox(&self, id: &Uuid) -> CiabResult<()> {
151 let mut sb = self
152 .sandboxes
153 .get_mut(id)
154 .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
155 sb.state = SandboxState::Running;
156 Ok(())
157 }
158
159 async fn stop_sandbox(&self, id: &Uuid) -> CiabResult<()> {
160 let mut sb = self
161 .sandboxes
162 .get_mut(id)
163 .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
164 sb.state = SandboxState::Stopped;
165 Ok(())
166 }
167
168 async fn pause_sandbox(&self, id: &Uuid) -> CiabResult<()> {
169 let mut sb = self
170 .sandboxes
171 .get_mut(id)
172 .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
173 sb.state = SandboxState::Paused;
174 Ok(())
175 }
176
177 async fn resume_sandbox(&self, id: &Uuid) -> CiabResult<()> {
178 let mut sb = self
179 .sandboxes
180 .get_mut(id)
181 .ok_or_else(|| CiabError::SandboxNotFound(id.to_string()))?;
182 sb.state = SandboxState::Running;
183 Ok(())
184 }
185
186 async fn terminate_sandbox(&self, id: &Uuid) -> CiabResult<()> {
187 if let Some((_, _sb)) = self.sandboxes.remove(id) {
188 self.process_count.fetch_sub(1, Ordering::Relaxed);
189 let workdir = self.sandbox_dir(id);
191 if workdir.exists() {
192 let _ = tokio::fs::remove_dir_all(&workdir).await;
193 }
194 }
195 Ok(())
196 }
197
198 async fn exec(&self, id: &Uuid, request: &ExecRequest) -> CiabResult<ExecResult> {
199 let sb = self.get_sandbox_ref(id)?;
200 if sb.state != SandboxState::Running {
201 return Err(CiabError::SandboxInvalidState {
202 current: sb.state.to_string(),
203 expected: "running".to_string(),
204 });
205 }
206
207 let workdir = request
208 .workdir
209 .as_ref()
210 .map(PathBuf::from)
211 .filter(|p| p.exists())
212 .unwrap_or_else(|| sb.workdir.clone());
213
214 if request.command.is_empty() {
215 return Err(CiabError::ExecFailed("empty command".to_string()));
216 }
217
218 let program = &request.command[0];
219 let args = &request.command[1..];
220
221 let start = std::time::Instant::now();
222 let mut cmd = Command::new(program);
223 cmd.args(args)
224 .current_dir(&workdir)
225 .envs(&request.env)
226 .envs(&sb.spec.env_vars)
227 .env_remove("CLAUDECODE");
228
229 if let Some(ref stdin_data) = request.stdin {
230 let mut child = cmd
231 .stdin(std::process::Stdio::piped())
232 .stdout(std::process::Stdio::piped())
233 .stderr(std::process::Stdio::piped())
234 .spawn()
235 .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
236
237 if let Some(ref mut child_stdin) = child.stdin {
238 use tokio::io::AsyncWriteExt;
239 let _ = child_stdin.write_all(stdin_data.as_bytes()).await;
240 let _ = child_stdin.shutdown().await;
241 }
242 child.stdin.take();
244
245 let output = child
246 .wait_with_output()
247 .await
248 .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
249
250 let duration = start.elapsed();
251 return Ok(ExecResult {
252 exit_code: output.status.code().unwrap_or(-1),
253 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
254 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
255 duration_ms: duration.as_millis() as u64,
256 });
257 }
258
259 cmd.stdout(std::process::Stdio::piped())
260 .stderr(std::process::Stdio::piped());
261
262 let output = if let Some(timeout_secs) = request.timeout_secs {
263 tokio::time::timeout(
264 std::time::Duration::from_secs(timeout_secs as u64),
265 cmd.output(),
266 )
267 .await
268 .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
269 .map_err(|e| CiabError::ExecFailed(e.to_string()))?
270 } else {
271 cmd.output()
272 .await
273 .map_err(|e| CiabError::ExecFailed(e.to_string()))?
274 };
275
276 let duration = start.elapsed();
277 Ok(ExecResult {
278 exit_code: output.status.code().unwrap_or(-1),
279 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
280 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
281 duration_ms: duration.as_millis() as u64,
282 })
283 }
284
285 async fn exec_streaming(
286 &self,
287 id: &Uuid,
288 request: &ExecRequest,
289 ) -> CiabResult<(
290 mpsc::Receiver<String>,
291 tokio::task::JoinHandle<CiabResult<ExecResult>>,
292 )> {
293 let sb = self.get_sandbox_ref(id)?;
294 if sb.state != SandboxState::Running {
295 return Err(CiabError::SandboxInvalidState {
296 current: sb.state.to_string(),
297 expected: "running".to_string(),
298 });
299 }
300
301 let workdir = request
302 .workdir
303 .as_ref()
304 .map(PathBuf::from)
305 .filter(|p| p.exists())
306 .unwrap_or_else(|| sb.workdir.clone());
307
308 if request.command.is_empty() {
309 return Err(CiabError::ExecFailed("empty command".to_string()));
310 }
311
312 let program = request.command[0].clone();
313 let args: Vec<String> = request.command[1..].to_vec();
314 let env_vars: HashMap<String, String> = request.env.clone();
315 let sandbox_env: HashMap<String, String> = sb.spec.env_vars.clone();
316 let timeout_secs = request.timeout_secs;
317
318 let (tx, rx) = mpsc::channel::<String>(256);
319
320 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
321 self.active_processes.insert(*id, cancel_tx);
322
323 let handle = tokio::spawn(async move {
324 use tokio::io::{AsyncBufReadExt, BufReader};
325
326 let start = std::time::Instant::now();
327 let mut cmd = Command::new(&program);
328 cmd.args(&args)
329 .current_dir(&workdir)
330 .envs(&env_vars)
331 .envs(&sandbox_env)
332 .env_remove("CLAUDECODE")
335 .stdout(std::process::Stdio::piped())
336 .stderr(std::process::Stdio::piped());
337
338 let mut child = cmd
339 .spawn()
340 .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
341
342 let stdout = child.stdout.take();
343 let stderr = child.stderr.take();
344
345 let tx_out = tx.clone();
346 let stdout_handle = tokio::spawn(async move {
347 let mut all = String::new();
348 if let Some(stdout) = stdout {
349 let mut reader = BufReader::new(stdout);
350 let mut line = String::new();
351 loop {
352 line.clear();
353 match reader.read_line(&mut line).await {
354 Ok(0) => break,
355 Ok(_) => {
356 let trimmed = line.trim_end_matches('\n').to_string();
357 all.push_str(&trimmed);
358 all.push('\n');
359 let _ = tx_out.send(trimmed).await;
360 }
361 Err(_) => break,
362 }
363 }
364 }
365 all
366 });
367
368 let stderr_handle = tokio::spawn(async move {
369 let mut all = String::new();
370 if let Some(stderr) = stderr {
371 let mut reader = BufReader::new(stderr);
372 let mut line = String::new();
373 loop {
374 line.clear();
375 match reader.read_line(&mut line).await {
376 Ok(0) => break,
377 Ok(_) => {
378 all.push_str(&line);
379 }
380 Err(_) => break,
381 }
382 }
383 }
384 all
385 });
386
387 let wait_result = tokio::select! {
388 result = async {
389 if let Some(secs) = timeout_secs {
390 tokio::time::timeout(
391 std::time::Duration::from_secs(secs as u64),
392 child.wait(),
393 )
394 .await
395 .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
396 .map_err(|e| CiabError::ExecFailed(e.to_string()))
397 } else {
398 child.wait().await.map_err(|e| CiabError::ExecFailed(e.to_string()))
399 }
400 } => result?,
401 _ = async {
402 loop {
403 if cancel_rx.changed().await.is_err() {
404 futures::future::pending::<()>().await;
406 }
407 if *cancel_rx.borrow() {
408 break;
409 }
410 }
411 } => {
412 let _ = child.kill().await;
413 return Err(CiabError::ExecFailed("process cancelled".to_string()));
414 }
415 };
416
417 let stdout_text = stdout_handle.await.unwrap_or_default();
418 let stderr_text = stderr_handle.await.unwrap_or_default();
419 let duration = start.elapsed();
420
421 Ok(ExecResult {
422 exit_code: wait_result.code().unwrap_or(-1),
423 stdout: stdout_text,
424 stderr: stderr_text,
425 duration_ms: duration.as_millis() as u64,
426 })
427 });
428
429 Ok((rx, handle))
430 }
431
432 async fn exec_streaming_interactive(
433 &self,
434 id: &Uuid,
435 request: &ExecRequest,
436 ) -> CiabResult<(
437 mpsc::Receiver<String>,
438 mpsc::Sender<String>,
439 tokio::task::JoinHandle<CiabResult<ExecResult>>,
440 )> {
441 let sb = self.get_sandbox_ref(id)?;
442 if sb.state != SandboxState::Running {
443 return Err(CiabError::SandboxInvalidState {
444 current: sb.state.to_string(),
445 expected: "running".to_string(),
446 });
447 }
448
449 let workdir = request
450 .workdir
451 .as_ref()
452 .map(PathBuf::from)
453 .filter(|p| p.exists())
454 .unwrap_or_else(|| sb.workdir.clone());
455
456 if request.command.is_empty() {
457 return Err(CiabError::ExecFailed("empty command".to_string()));
458 }
459
460 let program = request.command[0].clone();
461 let args: Vec<String> = request.command[1..].to_vec();
462 let env_vars: HashMap<String, String> = request.env.clone();
463 let sandbox_env: HashMap<String, String> = sb.spec.env_vars.clone();
464 let timeout_secs = request.timeout_secs;
465
466 let (stdout_tx, stdout_rx) = mpsc::channel::<String>(256);
467 let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(64);
468
469 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
470 self.active_processes.insert(*id, cancel_tx);
471
472 let handle = tokio::spawn(async move {
473 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
474
475 let start = std::time::Instant::now();
476 let mut cmd = Command::new(&program);
477 cmd.args(&args)
478 .current_dir(&workdir)
479 .envs(&env_vars)
480 .envs(&sandbox_env)
481 .env_remove("CLAUDECODE")
484 .stdin(std::process::Stdio::piped())
485 .stdout(std::process::Stdio::piped())
486 .stderr(std::process::Stdio::piped());
487
488 let mut child = cmd
489 .spawn()
490 .map_err(|e| CiabError::ExecFailed(e.to_string()))?;
491
492 let child_stdin = child.stdin.take();
493 let stdout = child.stdout.take();
494 let stderr = child.stderr.take();
495
496 let stdin_handle = tokio::spawn(async move {
498 if let Some(mut stdin) = child_stdin {
499 while let Some(line) = stdin_rx.recv().await {
500 let data = format!("{}\n", line);
501 if stdin.write_all(data.as_bytes()).await.is_err() {
502 break;
503 }
504 if stdin.flush().await.is_err() {
505 break;
506 }
507 }
508 }
509 });
510
511 let tx_out = stdout_tx.clone();
513 let stdout_handle = tokio::spawn(async move {
514 let mut all = String::new();
515 if let Some(stdout) = stdout {
516 let mut reader = BufReader::new(stdout);
517 let mut line = String::new();
518 loop {
519 line.clear();
520 match reader.read_line(&mut line).await {
521 Ok(0) => break,
522 Ok(_) => {
523 let trimmed = line.trim_end_matches('\n').to_string();
524 all.push_str(&trimmed);
525 all.push('\n');
526 let _ = tx_out.send(trimmed).await;
527 }
528 Err(_) => break,
529 }
530 }
531 }
532 all
533 });
534
535 let stderr_handle = tokio::spawn(async move {
537 let mut all = String::new();
538 if let Some(stderr) = stderr {
539 let mut reader = BufReader::new(stderr);
540 let mut line = String::new();
541 loop {
542 line.clear();
543 match reader.read_line(&mut line).await {
544 Ok(0) => break,
545 Ok(_) => {
546 all.push_str(&line);
547 }
548 Err(_) => break,
549 }
550 }
551 }
552 all
553 });
554
555 let wait_result = tokio::select! {
556 result = async {
557 if let Some(secs) = timeout_secs {
558 tokio::time::timeout(
559 std::time::Duration::from_secs(secs as u64),
560 child.wait(),
561 )
562 .await
563 .map_err(|_| CiabError::Timeout("exec command timed out".to_string()))?
564 .map_err(|e| CiabError::ExecFailed(e.to_string()))
565 } else {
566 child.wait().await.map_err(|e| CiabError::ExecFailed(e.to_string()))
567 }
568 } => result?,
569 _ = async {
570 loop {
571 if cancel_rx.changed().await.is_err() {
572 futures::future::pending::<()>().await;
573 }
574 if *cancel_rx.borrow() {
575 break;
576 }
577 }
578 } => {
579 let _ = child.kill().await;
580 stdin_handle.abort();
581 return Err(CiabError::ExecFailed("process cancelled".to_string()));
582 }
583 };
584
585 stdin_handle.abort(); let stdout_text = stdout_handle.await.unwrap_or_default();
587 let stderr_text = stderr_handle.await.unwrap_or_default();
588 let duration = start.elapsed();
589
590 Ok(ExecResult {
591 exit_code: wait_result.code().unwrap_or(-1),
592 stdout: stdout_text,
593 stderr: stderr_text,
594 duration_ms: duration.as_millis() as u64,
595 })
596 });
597
598 Ok((stdout_rx, stdin_tx, handle))
599 }
600
601 async fn read_file(&self, id: &Uuid, path: &str) -> CiabResult<Vec<u8>> {
602 let sb = self.get_sandbox_ref(id)?;
603 let file_path = resolve_path(&sb.workdir, path);
604 tokio::fs::read(&file_path)
605 .await
606 .map_err(|e| CiabError::FileNotFound(format!("{}: {}", path, e)))
607 }
608
609 async fn write_file(&self, id: &Uuid, path: &str, content: &[u8]) -> CiabResult<()> {
610 let sb = self.get_sandbox_ref(id)?;
611 let file_path = resolve_path(&sb.workdir, path);
612 if let Some(parent) = file_path.parent() {
613 tokio::fs::create_dir_all(parent)
614 .await
615 .map_err(|e| CiabError::Internal(e.to_string()))?;
616 }
617 tokio::fs::write(&file_path, content)
618 .await
619 .map_err(|e| CiabError::Internal(format!("write file {}: {}", path, e)))
620 }
621
622 async fn list_files(&self, id: &Uuid, path: &str) -> CiabResult<Vec<FileInfo>> {
623 let sb = self.get_sandbox_ref(id)?;
624 let dir_path = resolve_path(&sb.workdir, path);
625
626 let mut entries = tokio::fs::read_dir(&dir_path)
627 .await
628 .map_err(|e| CiabError::FileNotFound(format!("{}: {}", path, e)))?;
629
630 let mut files = Vec::new();
631 while let Some(entry) = entries
632 .next_entry()
633 .await
634 .map_err(|e| CiabError::Internal(e.to_string()))?
635 {
636 let metadata = entry
637 .metadata()
638 .await
639 .map_err(|e| CiabError::Internal(e.to_string()))?;
640
641 let modified_at = metadata
642 .modified()
643 .ok()
644 .and_then(|t| {
645 t.duration_since(std::time::UNIX_EPOCH)
646 .ok()
647 .map(|d| chrono::DateTime::from_timestamp(d.as_secs() as i64, 0))
648 })
649 .flatten();
650
651 files.push(FileInfo {
652 path: entry
653 .path()
654 .strip_prefix(&sb.workdir)
655 .unwrap_or(entry.path().as_path())
656 .to_string_lossy()
657 .to_string(),
658 size: metadata.len(),
659 is_dir: metadata.is_dir(),
660 mode: 0o644,
661 modified_at,
662 });
663 }
664
665 Ok(files)
666 }
667
668 async fn get_stats(&self, id: &Uuid) -> CiabResult<ResourceStats> {
669 let _sb = self.get_sandbox_ref(id)?;
670 Ok(ResourceStats {
672 cpu_usage_percent: 0.0,
673 memory_used_mb: 0,
674 memory_limit_mb: 0,
675 disk_used_mb: 0,
676 disk_limit_mb: 0,
677 network_rx_bytes: 0,
678 network_tx_bytes: 0,
679 })
680 }
681
682 async fn stream_logs(
683 &self,
684 _id: &Uuid,
685 _options: &LogOptions,
686 ) -> CiabResult<mpsc::Receiver<String>> {
687 let (tx, rx) = mpsc::channel(256);
688 tokio::spawn(async move {
691 let _ = tx.send("[local runtime] Log streaming not available for local processes. Use exec to run log commands.".to_string()).await;
692 });
693 Ok(rx)
694 }
695
696 async fn kill_exec(&self, id: &Uuid) -> CiabResult<()> {
697 if let Some((_, tx)) = self.active_processes.remove(id) {
698 let _ = tx.send(true);
699 }
700 Ok(())
701 }
702}
703
/// Resolves a caller-supplied `path` to a location inside `workdir`.
///
/// The path is always treated as relative to `workdir` (leading `/`, root and prefix
/// components are ignored) and is normalised lexically: `.` is dropped and `..` removes
/// the previously added component. If a `..` would climb above `workdir`, every `..` in
/// the input is replaced by `_` and the sanitised path is resolved instead, so the
/// result never leaves `workdir` lexically.
///
/// The check is purely lexical; symbolic links inside `workdir` are not resolved.
fn resolve_path(workdir: &Path, path: &str) -> PathBuf {
    use std::path::Component;

    // `Path::starts_with` compares components without resolving `..`, so testing
    // `workdir.join(path).starts_with(workdir)` can never detect an escape. Walk the
    // components and track the depth below `workdir` instead.
    let contained = |relative: &str| -> Option<PathBuf> {
        let mut resolved = workdir.to_path_buf();
        let mut depth = 0usize;
        for component in Path::new(relative).components() {
            match component {
                Component::Normal(part) => {
                    resolved.push(part);
                    depth += 1;
                }
                Component::ParentDir => {
                    // `None` here means the path tried to climb above `workdir`.
                    depth = depth.checked_sub(1)?;
                    resolved.pop();
                }
                Component::CurDir | Component::RootDir | Component::Prefix(_) => {}
            }
        }
        Some(resolved)
    };

    contained(path).unwrap_or_else(|| {
        // After the replacement no `..` component can remain, so this cannot escape.
        let sanitized = path.replace("..", "_");
        contained(&sanitized).unwrap_or_else(|| workdir.to_path_buf())
    })
}