Skip to main content

ciab_sandbox/
runtime.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use dashmap::DashMap;
5use tokio::sync::mpsc;
6use tracing;
7use uuid::Uuid;
8
9use ciab_core::error::{CiabError, CiabResult};
10use ciab_core::traits::runtime::SandboxRuntime;
11use ciab_core::types::sandbox::{
12    ExecRequest, ExecResult, FileInfo, LogOptions, ResourceStats, SandboxInfo, SandboxPersistence,
13    SandboxSpec, SandboxState,
14};
15
16use crate::client::{CreateSandboxRequest, OpenSandboxClient};
17use crate::execd::ExecdClient;
18
19pub struct OpenSandboxRuntime {
20    client: OpenSandboxClient,
21    execd_clients: DashMap<String, ExecdClient>,
22}
23
24impl OpenSandboxRuntime {
25    pub fn new(opensandbox_url: String, api_key: Option<String>) -> Self {
26        Self {
27            client: OpenSandboxClient::new(opensandbox_url, api_key),
28            execd_clients: DashMap::new(),
29        }
30    }
31
32    fn get_or_create_execd(&self, sandbox_id: &str, endpoint_url: &str) -> ExecdClient {
33        if let Some(client) = self.execd_clients.get(sandbox_id) {
34            return client.clone();
35        }
36        let client = ExecdClient::new(endpoint_url.to_string());
37        self.execd_clients
38            .insert(sandbox_id.to_string(), client.clone());
39        client
40    }
41
42    async fn resolve_execd(&self, id: &Uuid) -> CiabResult<ExecdClient> {
43        let sandbox_id = id.to_string();
44        if let Some(client) = self.execd_clients.get(&sandbox_id) {
45            return Ok(client.clone());
46        }
47        let info = self.client.get_sandbox(&sandbox_id).await?;
48        let endpoint_url = info.endpoint_url.ok_or_else(|| {
49            CiabError::OpenSandboxError(format!("sandbox {} has no endpoint URL", sandbox_id))
50        })?;
51        Ok(self.get_or_create_execd(&sandbox_id, &endpoint_url))
52    }
53
54    fn map_status_to_state(status: &str) -> SandboxState {
55        match status {
56            "running" => SandboxState::Running,
57            "paused" => SandboxState::Paused,
58            "stopped" => SandboxState::Stopped,
59            "creating" => SandboxState::Creating,
60            "pending" => SandboxState::Pending,
61            "failed" => SandboxState::Failed,
62            "terminated" => SandboxState::Terminated,
63            "pausing" => SandboxState::Pausing,
64            "stopping" => SandboxState::Stopping,
65            _ => SandboxState::Running,
66        }
67    }
68}
69
70#[async_trait]
71impl SandboxRuntime for OpenSandboxRuntime {
72    async fn create_sandbox(&self, spec: &SandboxSpec) -> CiabResult<SandboxInfo> {
73        let request = CreateSandboxRequest {
74            image: spec
75                .image
76                .clone()
77                .unwrap_or_else(|| "ubuntu:latest".to_string()),
78            cpu: spec.resource_limits.as_ref().map(|r| r.cpu_cores),
79            memory_mb: spec.resource_limits.as_ref().map(|r| r.memory_mb),
80            disk_mb: spec.resource_limits.as_ref().map(|r| r.disk_mb),
81            env: spec.env_vars.clone(),
82            ports: spec.ports.clone(),
83            timeout_secs: spec.timeout_secs.map(|t| t as u64),
84            labels: spec.labels.clone(),
85        };
86
87        let resp = self.client.create_sandbox(&request).await?;
88
89        // Cache execd client if endpoint available
90        if let Some(ref url) = resp.endpoint_url {
91            self.get_or_create_execd(&resp.id, url);
92        }
93
94        let sandbox_id = Uuid::parse_str(&resp.id).unwrap_or_else(|_| Uuid::new_v4());
95        let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
96            .map(|dt| dt.with_timezone(&chrono::Utc))
97            .unwrap_or_else(|_| chrono::Utc::now());
98
99        Ok(SandboxInfo {
100            id: sandbox_id,
101            name: spec.name.clone(),
102            state: Self::map_status_to_state(&resp.status),
103            persistence: spec.persistence.clone(),
104            agent_provider: spec.agent_provider.clone(),
105            endpoint_url: resp.endpoint_url,
106            resource_stats: None,
107            labels: resp.labels,
108            created_at,
109            updated_at: created_at,
110            spec: spec.clone(),
111        })
112    }
113
114    async fn get_sandbox(&self, id: &Uuid) -> CiabResult<SandboxInfo> {
115        let resp = self.client.get_sandbox(&id.to_string()).await?;
116        let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
117            .map(|dt| dt.with_timezone(&chrono::Utc))
118            .unwrap_or_else(|_| chrono::Utc::now());
119
120        Ok(SandboxInfo {
121            id: *id,
122            name: None,
123            state: Self::map_status_to_state(&resp.status),
124            persistence: SandboxPersistence::Ephemeral,
125            agent_provider: "opensandbox".to_string(),
126            endpoint_url: resp.endpoint_url,
127            resource_stats: None,
128            labels: resp.labels,
129            created_at,
130            updated_at: created_at,
131            spec: SandboxSpec {
132                name: None,
133                agent_provider: "opensandbox".to_string(),
134                image: None,
135                resource_limits: None,
136                persistence: SandboxPersistence::Ephemeral,
137                network: None,
138                env_vars: HashMap::new(),
139                volumes: vec![],
140                ports: vec![],
141                git_repos: vec![],
142                credentials: vec![],
143                provisioning_scripts: vec![],
144                labels: HashMap::new(),
145                timeout_secs: None,
146                agent_config: None,
147                local_mounts: vec![],
148                runtime_backend: None,
149            },
150        })
151    }
152
153    async fn list_sandboxes(
154        &self,
155        state: Option<SandboxState>,
156        provider: Option<&str>,
157        labels: &HashMap<String, String>,
158    ) -> CiabResult<Vec<SandboxInfo>> {
159        let sandboxes = self.client.list_sandboxes().await?;
160        let mut results: Vec<SandboxInfo> = sandboxes
161            .into_iter()
162            .map(|resp| {
163                let sandbox_id = Uuid::parse_str(&resp.id).unwrap_or_else(|_| Uuid::new_v4());
164                let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
165                    .map(|dt| dt.with_timezone(&chrono::Utc))
166                    .unwrap_or_else(|_| chrono::Utc::now());
167
168                SandboxInfo {
169                    id: sandbox_id,
170                    name: None,
171                    state: Self::map_status_to_state(&resp.status),
172                    persistence: SandboxPersistence::Ephemeral,
173                    agent_provider: "opensandbox".to_string(),
174                    endpoint_url: resp.endpoint_url,
175                    resource_stats: None,
176                    labels: resp.labels,
177                    created_at,
178                    updated_at: created_at,
179                    spec: SandboxSpec {
180                        name: None,
181                        agent_provider: "opensandbox".to_string(),
182                        image: None,
183                        resource_limits: None,
184                        persistence: SandboxPersistence::Ephemeral,
185                        network: None,
186                        env_vars: HashMap::new(),
187                        volumes: vec![],
188                        ports: vec![],
189                        git_repos: vec![],
190                        credentials: vec![],
191                        provisioning_scripts: vec![],
192                        labels: HashMap::new(),
193                        timeout_secs: None,
194                        agent_config: None,
195                        local_mounts: vec![],
196                        runtime_backend: None,
197                    },
198                }
199            })
200            .collect();
201
202        // Apply filters
203        if let Some(ref filter_state) = state {
204            results.retain(|s| &s.state == filter_state);
205        }
206        if let Some(filter_provider) = provider {
207            results.retain(|s| s.agent_provider == filter_provider);
208        }
209        if !labels.is_empty() {
210            results.retain(|s| {
211                labels
212                    .iter()
213                    .all(|(k, v)| s.labels.get(k).map(|sv| sv == v).unwrap_or(false))
214            });
215        }
216
217        Ok(results)
218    }
219
220    async fn start_sandbox(&self, id: &Uuid) -> CiabResult<()> {
221        // OpenSandbox auto-starts on create; resume if paused
222        tracing::debug!("start_sandbox called for {}, attempting resume", id);
223        self.client.resume_sandbox(&id.to_string()).await
224    }
225
226    async fn stop_sandbox(&self, id: &Uuid) -> CiabResult<()> {
227        // OpenSandbox doesn't have a stop concept; map to pause
228        tracing::debug!("stop_sandbox called for {}, mapping to pause", id);
229        self.client.pause_sandbox(&id.to_string()).await
230    }
231
232    async fn pause_sandbox(&self, id: &Uuid) -> CiabResult<()> {
233        self.client.pause_sandbox(&id.to_string()).await
234    }
235
236    async fn resume_sandbox(&self, id: &Uuid) -> CiabResult<()> {
237        self.client.resume_sandbox(&id.to_string()).await
238    }
239
240    async fn terminate_sandbox(&self, id: &Uuid) -> CiabResult<()> {
241        let sandbox_id = id.to_string();
242        self.client.delete_sandbox(&sandbox_id).await?;
243        self.execd_clients.remove(&sandbox_id);
244        Ok(())
245    }
246
247    async fn exec(&self, id: &Uuid, request: &ExecRequest) -> CiabResult<ExecResult> {
248        let execd = self.resolve_execd(id).await?;
249        execd.run_command(request).await
250    }
251
252    async fn read_file(&self, id: &Uuid, path: &str) -> CiabResult<Vec<u8>> {
253        let execd = self.resolve_execd(id).await?;
254        execd.download_file(path).await
255    }
256
257    async fn write_file(&self, id: &Uuid, path: &str, content: &[u8]) -> CiabResult<()> {
258        let execd = self.resolve_execd(id).await?;
259        execd.upload_file(path, content, 0o644).await
260    }
261
262    async fn list_files(&self, id: &Uuid, path: &str) -> CiabResult<Vec<FileInfo>> {
263        let execd = self.resolve_execd(id).await?;
264        execd.list_files(path).await
265    }
266
267    async fn get_stats(&self, id: &Uuid) -> CiabResult<ResourceStats> {
268        let execd = self.resolve_execd(id).await?;
269        execd.get_metrics().await
270    }
271
272    async fn stream_logs(
273        &self,
274        id: &Uuid,
275        options: &LogOptions,
276    ) -> CiabResult<mpsc::Receiver<String>> {
277        let (tx, rx) = mpsc::channel(256);
278        let execd = self.resolve_execd(id).await?;
279
280        let mut cmd = vec!["tail".to_string()];
281        if options.follow {
282            cmd.push("-f".to_string());
283        }
284        if let Some(tail_lines) = options.tail {
285            cmd.push("-n".to_string());
286            cmd.push(tail_lines.to_string());
287        }
288        cmd.push("/var/log/syslog".to_string());
289
290        let request = ExecRequest {
291            command: cmd,
292            workdir: None,
293            env: HashMap::new(),
294            stdin: None,
295            timeout_secs: None,
296            tty: false,
297        };
298
299        let sandbox_id = *id;
300        tokio::spawn(async move {
301            let (stream_tx, mut stream_rx) =
302                mpsc::channel::<ciab_core::types::stream::StreamEvent>(256);
303
304            let stream_handle = {
305                let execd = execd.clone();
306                let request = request.clone();
307                tokio::spawn(async move {
308                    let _ = execd
309                        .run_command_stream(&request, stream_tx, sandbox_id)
310                        .await;
311                })
312            };
313
314            while let Some(event) = stream_rx.recv().await {
315                if let Some(text) = event.data.as_str() {
316                    if tx.send(text.to_string()).await.is_err() {
317                        break;
318                    }
319                } else {
320                    let text = event.data.to_string();
321                    if tx.send(text).await.is_err() {
322                        break;
323                    }
324                }
325            }
326
327            let _ = stream_handle.await;
328        });
329
330        Ok(rx)
331    }
332}