1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use dashmap::DashMap;
5use tokio::sync::mpsc;
6use tracing;
7use uuid::Uuid;
8
9use ciab_core::error::{CiabError, CiabResult};
10use ciab_core::traits::runtime::SandboxRuntime;
11use ciab_core::types::sandbox::{
12 ExecRequest, ExecResult, FileInfo, LogOptions, ResourceStats, SandboxInfo, SandboxPersistence,
13 SandboxSpec, SandboxState,
14};
15
16use crate::client::{CreateSandboxRequest, OpenSandboxClient};
17use crate::execd::ExecdClient;
18
19pub struct OpenSandboxRuntime {
20 client: OpenSandboxClient,
21 execd_clients: DashMap<String, ExecdClient>,
22}
23
24impl OpenSandboxRuntime {
25 pub fn new(opensandbox_url: String, api_key: Option<String>) -> Self {
26 Self {
27 client: OpenSandboxClient::new(opensandbox_url, api_key),
28 execd_clients: DashMap::new(),
29 }
30 }
31
32 fn get_or_create_execd(&self, sandbox_id: &str, endpoint_url: &str) -> ExecdClient {
33 if let Some(client) = self.execd_clients.get(sandbox_id) {
34 return client.clone();
35 }
36 let client = ExecdClient::new(endpoint_url.to_string());
37 self.execd_clients
38 .insert(sandbox_id.to_string(), client.clone());
39 client
40 }
41
42 async fn resolve_execd(&self, id: &Uuid) -> CiabResult<ExecdClient> {
43 let sandbox_id = id.to_string();
44 if let Some(client) = self.execd_clients.get(&sandbox_id) {
45 return Ok(client.clone());
46 }
47 let info = self.client.get_sandbox(&sandbox_id).await?;
48 let endpoint_url = info.endpoint_url.ok_or_else(|| {
49 CiabError::OpenSandboxError(format!("sandbox {} has no endpoint URL", sandbox_id))
50 })?;
51 Ok(self.get_or_create_execd(&sandbox_id, &endpoint_url))
52 }
53
54 fn map_status_to_state(status: &str) -> SandboxState {
55 match status {
56 "running" => SandboxState::Running,
57 "paused" => SandboxState::Paused,
58 "stopped" => SandboxState::Stopped,
59 "creating" => SandboxState::Creating,
60 "pending" => SandboxState::Pending,
61 "failed" => SandboxState::Failed,
62 "terminated" => SandboxState::Terminated,
63 "pausing" => SandboxState::Pausing,
64 "stopping" => SandboxState::Stopping,
65 _ => SandboxState::Running,
66 }
67 }
68}
69
70#[async_trait]
71impl SandboxRuntime for OpenSandboxRuntime {
72 async fn create_sandbox(&self, spec: &SandboxSpec) -> CiabResult<SandboxInfo> {
73 let request = CreateSandboxRequest {
74 image: spec
75 .image
76 .clone()
77 .unwrap_or_else(|| "ubuntu:latest".to_string()),
78 cpu: spec.resource_limits.as_ref().map(|r| r.cpu_cores),
79 memory_mb: spec.resource_limits.as_ref().map(|r| r.memory_mb),
80 disk_mb: spec.resource_limits.as_ref().map(|r| r.disk_mb),
81 env: spec.env_vars.clone(),
82 ports: spec.ports.clone(),
83 timeout_secs: spec.timeout_secs.map(|t| t as u64),
84 labels: spec.labels.clone(),
85 };
86
87 let resp = self.client.create_sandbox(&request).await?;
88
89 if let Some(ref url) = resp.endpoint_url {
91 self.get_or_create_execd(&resp.id, url);
92 }
93
94 let sandbox_id = Uuid::parse_str(&resp.id).unwrap_or_else(|_| Uuid::new_v4());
95 let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
96 .map(|dt| dt.with_timezone(&chrono::Utc))
97 .unwrap_or_else(|_| chrono::Utc::now());
98
99 Ok(SandboxInfo {
100 id: sandbox_id,
101 name: spec.name.clone(),
102 state: Self::map_status_to_state(&resp.status),
103 persistence: spec.persistence.clone(),
104 agent_provider: spec.agent_provider.clone(),
105 endpoint_url: resp.endpoint_url,
106 resource_stats: None,
107 labels: resp.labels,
108 created_at,
109 updated_at: created_at,
110 spec: spec.clone(),
111 })
112 }
113
114 async fn get_sandbox(&self, id: &Uuid) -> CiabResult<SandboxInfo> {
115 let resp = self.client.get_sandbox(&id.to_string()).await?;
116 let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
117 .map(|dt| dt.with_timezone(&chrono::Utc))
118 .unwrap_or_else(|_| chrono::Utc::now());
119
120 Ok(SandboxInfo {
121 id: *id,
122 name: None,
123 state: Self::map_status_to_state(&resp.status),
124 persistence: SandboxPersistence::Ephemeral,
125 agent_provider: "opensandbox".to_string(),
126 endpoint_url: resp.endpoint_url,
127 resource_stats: None,
128 labels: resp.labels,
129 created_at,
130 updated_at: created_at,
131 spec: SandboxSpec {
132 name: None,
133 agent_provider: "opensandbox".to_string(),
134 image: None,
135 resource_limits: None,
136 persistence: SandboxPersistence::Ephemeral,
137 network: None,
138 env_vars: HashMap::new(),
139 volumes: vec![],
140 ports: vec![],
141 git_repos: vec![],
142 credentials: vec![],
143 provisioning_scripts: vec![],
144 labels: HashMap::new(),
145 timeout_secs: None,
146 agent_config: None,
147 local_mounts: vec![],
148 runtime_backend: None,
149 },
150 })
151 }
152
153 async fn list_sandboxes(
154 &self,
155 state: Option<SandboxState>,
156 provider: Option<&str>,
157 labels: &HashMap<String, String>,
158 ) -> CiabResult<Vec<SandboxInfo>> {
159 let sandboxes = self.client.list_sandboxes().await?;
160 let mut results: Vec<SandboxInfo> = sandboxes
161 .into_iter()
162 .map(|resp| {
163 let sandbox_id = Uuid::parse_str(&resp.id).unwrap_or_else(|_| Uuid::new_v4());
164 let created_at = chrono::DateTime::parse_from_rfc3339(&resp.created_at)
165 .map(|dt| dt.with_timezone(&chrono::Utc))
166 .unwrap_or_else(|_| chrono::Utc::now());
167
168 SandboxInfo {
169 id: sandbox_id,
170 name: None,
171 state: Self::map_status_to_state(&resp.status),
172 persistence: SandboxPersistence::Ephemeral,
173 agent_provider: "opensandbox".to_string(),
174 endpoint_url: resp.endpoint_url,
175 resource_stats: None,
176 labels: resp.labels,
177 created_at,
178 updated_at: created_at,
179 spec: SandboxSpec {
180 name: None,
181 agent_provider: "opensandbox".to_string(),
182 image: None,
183 resource_limits: None,
184 persistence: SandboxPersistence::Ephemeral,
185 network: None,
186 env_vars: HashMap::new(),
187 volumes: vec![],
188 ports: vec![],
189 git_repos: vec![],
190 credentials: vec![],
191 provisioning_scripts: vec![],
192 labels: HashMap::new(),
193 timeout_secs: None,
194 agent_config: None,
195 local_mounts: vec![],
196 runtime_backend: None,
197 },
198 }
199 })
200 .collect();
201
202 if let Some(ref filter_state) = state {
204 results.retain(|s| &s.state == filter_state);
205 }
206 if let Some(filter_provider) = provider {
207 results.retain(|s| s.agent_provider == filter_provider);
208 }
209 if !labels.is_empty() {
210 results.retain(|s| {
211 labels
212 .iter()
213 .all(|(k, v)| s.labels.get(k).map(|sv| sv == v).unwrap_or(false))
214 });
215 }
216
217 Ok(results)
218 }
219
220 async fn start_sandbox(&self, id: &Uuid) -> CiabResult<()> {
221 tracing::debug!("start_sandbox called for {}, attempting resume", id);
223 self.client.resume_sandbox(&id.to_string()).await
224 }
225
226 async fn stop_sandbox(&self, id: &Uuid) -> CiabResult<()> {
227 tracing::debug!("stop_sandbox called for {}, mapping to pause", id);
229 self.client.pause_sandbox(&id.to_string()).await
230 }
231
232 async fn pause_sandbox(&self, id: &Uuid) -> CiabResult<()> {
233 self.client.pause_sandbox(&id.to_string()).await
234 }
235
236 async fn resume_sandbox(&self, id: &Uuid) -> CiabResult<()> {
237 self.client.resume_sandbox(&id.to_string()).await
238 }
239
240 async fn terminate_sandbox(&self, id: &Uuid) -> CiabResult<()> {
241 let sandbox_id = id.to_string();
242 self.client.delete_sandbox(&sandbox_id).await?;
243 self.execd_clients.remove(&sandbox_id);
244 Ok(())
245 }
246
247 async fn exec(&self, id: &Uuid, request: &ExecRequest) -> CiabResult<ExecResult> {
248 let execd = self.resolve_execd(id).await?;
249 execd.run_command(request).await
250 }
251
252 async fn read_file(&self, id: &Uuid, path: &str) -> CiabResult<Vec<u8>> {
253 let execd = self.resolve_execd(id).await?;
254 execd.download_file(path).await
255 }
256
257 async fn write_file(&self, id: &Uuid, path: &str, content: &[u8]) -> CiabResult<()> {
258 let execd = self.resolve_execd(id).await?;
259 execd.upload_file(path, content, 0o644).await
260 }
261
262 async fn list_files(&self, id: &Uuid, path: &str) -> CiabResult<Vec<FileInfo>> {
263 let execd = self.resolve_execd(id).await?;
264 execd.list_files(path).await
265 }
266
267 async fn get_stats(&self, id: &Uuid) -> CiabResult<ResourceStats> {
268 let execd = self.resolve_execd(id).await?;
269 execd.get_metrics().await
270 }
271
272 async fn stream_logs(
273 &self,
274 id: &Uuid,
275 options: &LogOptions,
276 ) -> CiabResult<mpsc::Receiver<String>> {
277 let (tx, rx) = mpsc::channel(256);
278 let execd = self.resolve_execd(id).await?;
279
280 let mut cmd = vec!["tail".to_string()];
281 if options.follow {
282 cmd.push("-f".to_string());
283 }
284 if let Some(tail_lines) = options.tail {
285 cmd.push("-n".to_string());
286 cmd.push(tail_lines.to_string());
287 }
288 cmd.push("/var/log/syslog".to_string());
289
290 let request = ExecRequest {
291 command: cmd,
292 workdir: None,
293 env: HashMap::new(),
294 stdin: None,
295 timeout_secs: None,
296 tty: false,
297 };
298
299 let sandbox_id = *id;
300 tokio::spawn(async move {
301 let (stream_tx, mut stream_rx) =
302 mpsc::channel::<ciab_core::types::stream::StreamEvent>(256);
303
304 let stream_handle = {
305 let execd = execd.clone();
306 let request = request.clone();
307 tokio::spawn(async move {
308 let _ = execd
309 .run_command_stream(&request, stream_tx, sandbox_id)
310 .await;
311 })
312 };
313
314 while let Some(event) = stream_rx.recv().await {
315 if let Some(text) = event.data.as_str() {
316 if tx.send(text.to_string()).await.is_err() {
317 break;
318 }
319 } else {
320 let text = event.data.to_string();
321 if tx.send(text).await.is_err() {
322 break;
323 }
324 }
325 }
326
327 let _ = stream_handle.await;
328 });
329
330 Ok(rx)
331 }
332}