use std::collections::HashMap;
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tracing;
use uuid::Uuid;
use ciab_core::error::{CiabError, CiabResult};
use ciab_core::traits::runtime::SandboxRuntime;
use ciab_core::types::sandbox::{
ExecRequest, ExecResult, FileInfo, LogOptions, ResourceStats, SandboxInfo, SandboxPersistence,
SandboxSpec, SandboxState,
};
use crate::client::{CreateSandboxRequest, OpenSandboxClient};
use crate::execd::ExecdClient;
/// Sandbox runtime that drives an OpenSandbox service through
/// [`OpenSandboxClient`] and keeps one [`ExecdClient`] per sandbox in a
/// local cache.
pub struct OpenSandboxRuntime {
/// Client used for the sandbox lifecycle calls (create, get, list, pause,
/// resume, delete).
client: OpenSandboxClient,
/// Execd clients that have already been built, keyed by sandbox id string.
execd_clients: DashMap<String, ExecdClient>,
}
impl OpenSandboxRuntime {
pub fn new(opensandbox_url: String, api_key: Option<String>) -> Self {
Self {
client: OpenSandboxClient::new(opensandbox_url, api_key),
execd_clients: DashMap::new(),
}
}
fn get_or_create_execd(&self, sandbox_id: &str, endpoint_url: &str) -> ExecdClient {
if let Some(client) = self.execd_clients.get(sandbox_id) {
return client.clone();
}
let client = ExecdClient::new(endpoint_url.to_string());
self.execd_clients
.insert(sandbox_id.to_string(), client.clone());
client
}
async fn resolve_execd(&self, id: &Uuid) -> CiabResult<ExecdClient> {
let sandbox_id = id.to_string();
if let Some(client) = self.execd_clients.get(&sandbox_id) {
return Ok(client.clone());
}
let info = self.client.get_sandbox(&sandbox_id).await?;
let endpoint_url = info.endpoint_url.ok_or_else(|| {
CiabError::OpenSandboxError(format!("sandbox {} has no endpoint URL", sandbox_id))
})?;
Ok(self.get_or_create_execd(&sandbox_id, &endpoint_url))
}
fn map_status_to_state(status: &str) -> SandboxState {
match status {
"running" => SandboxState::Running,
"paused" => SandboxState::Paused,
"stopped" => SandboxState::Stopped,
"creating" => SandboxState::Creating,
"pending" => SandboxState::Pending,
"failed" => SandboxState::Failed,
"terminated" => SandboxState::Terminated,
"pausing" => SandboxState::Pausing,
"stopping" => SandboxState::Stopping,
_ => SandboxState::Running,
}
}
}
/// Parses the RFC 3339 `created_at` string reported by the service into UTC.
///
/// Falls back to the current time when `raw` cannot be parsed, so a malformed
/// timestamp never fails the surrounding call.
fn parse_created_at_or_now(raw: &str) -> chrono::DateTime<chrono::Utc> {
    chrono::DateTime::parse_from_rfc3339(raw)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(|_| chrono::Utc::now())
}

/// Builds the stand-in [`SandboxSpec`] used for sandboxes looked up from the
/// service, where the spec that originally created them is not available.
fn opensandbox_placeholder_spec() -> SandboxSpec {
    SandboxSpec {
        name: None,
        agent_provider: "opensandbox".to_string(),
        image: None,
        resource_limits: None,
        persistence: SandboxPersistence::Ephemeral,
        network: None,
        env_vars: HashMap::new(),
        volumes: vec![],
        ports: vec![],
        git_repos: vec![],
        credentials: vec![],
        provisioning_scripts: vec![],
        labels: HashMap::new(),
        timeout_secs: None,
        agent_config: None,
        local_mounts: vec![],
        runtime_backend: None,
    }
}

#[async_trait]
impl SandboxRuntime for OpenSandboxRuntime {
    /// Creates a sandbox from `spec` and returns its description.
    ///
    /// The image defaults to `"ubuntu:latest"` when the spec names none. When
    /// the service reports an endpoint URL, an execd client is cached right
    /// away under the canonical UUID string, which is the key every later
    /// lookup uses.
    ///
    /// # Errors
    ///
    /// Propagates errors from the create call. Returns
    /// [`CiabError::OpenSandboxError`] when the service answers with an id
    /// that is not a UUID: such a sandbox could never be addressed through
    /// this runtime again, so it is deleted on a best-effort basis instead of
    /// being handed back under a made-up id.
    async fn create_sandbox(&self, spec: &SandboxSpec) -> CiabResult<SandboxInfo> {
        let limits = spec.resource_limits.as_ref();
        let request = CreateSandboxRequest {
            image: spec
                .image
                .clone()
                .unwrap_or_else(|| "ubuntu:latest".to_string()),
            cpu: limits.map(|r| r.cpu_cores),
            memory_mb: limits.map(|r| r.memory_mb),
            disk_mb: limits.map(|r| r.disk_mb),
            env: spec.env_vars.clone(),
            ports: spec.ports.clone(),
            timeout_secs: spec.timeout_secs.map(|t| t as u64),
            labels: spec.labels.clone(),
        };
        let resp = self.client.create_sandbox(&request).await?;

        let sandbox_id = match Uuid::parse_str(&resp.id) {
            Ok(parsed) => parsed,
            Err(parse_err) => {
                // Do not leave an unreachable sandbox running on the service.
                if let Err(cleanup_err) = self.client.delete_sandbox(&resp.id).await {
                    tracing::warn!(
                        "failed to delete sandbox {:?} after rejecting its id: {}",
                        resp.id,
                        cleanup_err
                    );
                }
                return Err(CiabError::OpenSandboxError(format!(
                    "sandbox id {:?} returned by OpenSandbox is not a valid UUID: {}",
                    resp.id, parse_err
                )));
            }
        };

        // Key the cache by the canonical UUID string rather than the raw
        // response id, so `resolve_execd` and `terminate_sandbox` find (and
        // evict) exactly this entry even if the service formats ids differently.
        if let Some(url) = &resp.endpoint_url {
            self.get_or_create_execd(&sandbox_id.to_string(), url);
        }

        let created_at = parse_created_at_or_now(&resp.created_at);
        Ok(SandboxInfo {
            id: sandbox_id,
            name: spec.name.clone(),
            state: Self::map_status_to_state(&resp.status),
            persistence: spec.persistence.clone(),
            agent_provider: spec.agent_provider.clone(),
            endpoint_url: resp.endpoint_url,
            resource_stats: None,
            labels: resp.labels,
            created_at,
            updated_at: created_at,
            spec: spec.clone(),
        })
    }

    /// Fetches the current description of sandbox `id` from the service.
    ///
    /// Only status, endpoint URL, labels and creation time come from the
    /// response; name, persistence, provider and spec are filled with fixed
    /// placeholder values.
    async fn get_sandbox(&self, id: &Uuid) -> CiabResult<SandboxInfo> {
        let resp = self.client.get_sandbox(&id.to_string()).await?;
        let created_at = parse_created_at_or_now(&resp.created_at);
        Ok(SandboxInfo {
            id: *id,
            name: None,
            state: Self::map_status_to_state(&resp.status),
            persistence: SandboxPersistence::Ephemeral,
            agent_provider: "opensandbox".to_string(),
            endpoint_url: resp.endpoint_url,
            resource_stats: None,
            labels: resp.labels,
            created_at,
            updated_at: created_at,
            spec: opensandbox_placeholder_spec(),
        })
    }

    /// Lists sandboxes known to the service, filtered client-side.
    ///
    /// `state` and `provider` keep only exact matches when given; `labels`
    /// keeps sandboxes that carry every listed key with an equal value.
    /// Entries whose id is not a UUID are skipped with a warning, because no
    /// other method of this runtime could address them.
    async fn list_sandboxes(
        &self,
        state: Option<SandboxState>,
        provider: Option<&str>,
        labels: &HashMap<String, String>,
    ) -> CiabResult<Vec<SandboxInfo>> {
        let sandboxes = self.client.list_sandboxes().await?;
        let mut results: Vec<SandboxInfo> = sandboxes
            .into_iter()
            .filter_map(|resp| {
                let sandbox_id = match Uuid::parse_str(&resp.id) {
                    Ok(parsed) => parsed,
                    Err(parse_err) => {
                        tracing::warn!(
                            "skipping sandbox with non-UUID id {:?}: {}",
                            resp.id,
                            parse_err
                        );
                        return None;
                    }
                };
                let created_at = parse_created_at_or_now(&resp.created_at);
                Some(SandboxInfo {
                    id: sandbox_id,
                    name: None,
                    state: Self::map_status_to_state(&resp.status),
                    persistence: SandboxPersistence::Ephemeral,
                    agent_provider: "opensandbox".to_string(),
                    endpoint_url: resp.endpoint_url,
                    resource_stats: None,
                    labels: resp.labels,
                    created_at,
                    updated_at: created_at,
                    spec: opensandbox_placeholder_spec(),
                })
            })
            .collect();

        if let Some(ref filter_state) = state {
            results.retain(|s| &s.state == filter_state);
        }
        if let Some(filter_provider) = provider {
            results.retain(|s| s.agent_provider == filter_provider);
        }
        if !labels.is_empty() {
            results.retain(|s| {
                labels
                    .iter()
                    .all(|(k, v)| s.labels.get(k).map(|sv| sv == v).unwrap_or(false))
            });
        }
        Ok(results)
    }

    /// Starts sandbox `id` by issuing a resume call.
    async fn start_sandbox(&self, id: &Uuid) -> CiabResult<()> {
        tracing::debug!("start_sandbox called for {}, attempting resume", id);
        self.client.resume_sandbox(&id.to_string()).await
    }

    /// Stops sandbox `id`; this backend maps the operation to a pause call.
    async fn stop_sandbox(&self, id: &Uuid) -> CiabResult<()> {
        tracing::debug!("stop_sandbox called for {}, mapping to pause", id);
        self.client.pause_sandbox(&id.to_string()).await
    }

    /// Pauses sandbox `id`.
    async fn pause_sandbox(&self, id: &Uuid) -> CiabResult<()> {
        self.client.pause_sandbox(&id.to_string()).await
    }

    /// Resumes sandbox `id`.
    async fn resume_sandbox(&self, id: &Uuid) -> CiabResult<()> {
        self.client.resume_sandbox(&id.to_string()).await
    }

    /// Deletes sandbox `id` on the service and evicts its cached execd client.
    ///
    /// The cache entry is kept when the delete call fails.
    async fn terminate_sandbox(&self, id: &Uuid) -> CiabResult<()> {
        let sandbox_id = id.to_string();
        self.client.delete_sandbox(&sandbox_id).await?;
        self.execd_clients.remove(&sandbox_id);
        Ok(())
    }

    /// Runs `request` inside sandbox `id` through its execd client.
    async fn exec(&self, id: &Uuid, request: &ExecRequest) -> CiabResult<ExecResult> {
        let execd = self.resolve_execd(id).await?;
        execd.run_command(request).await
    }

    /// Downloads the file at `path` from sandbox `id`.
    async fn read_file(&self, id: &Uuid, path: &str) -> CiabResult<Vec<u8>> {
        let execd = self.resolve_execd(id).await?;
        execd.download_file(path).await
    }

    /// Uploads `content` to `path` in sandbox `id`, passing mode `0o644`.
    async fn write_file(&self, id: &Uuid, path: &str, content: &[u8]) -> CiabResult<()> {
        let execd = self.resolve_execd(id).await?;
        execd.upload_file(path, content, 0o644).await
    }

    /// Lists the files under `path` in sandbox `id`.
    async fn list_files(&self, id: &Uuid, path: &str) -> CiabResult<Vec<FileInfo>> {
        let execd = self.resolve_execd(id).await?;
        execd.list_files(path).await
    }

    /// Fetches resource metrics for sandbox `id` from its execd client.
    async fn get_stats(&self, id: &Uuid) -> CiabResult<ResourceStats> {
        let execd = self.resolve_execd(id).await?;
        execd.get_metrics().await
    }

    /// Streams log text from sandbox `id` by running `tail` on
    /// `/var/log/syslog` inside it.
    ///
    /// `options.follow` adds `-f` and `options.tail` adds `-n <count>`. Each
    /// stream event is forwarded as one `String`: string payloads verbatim,
    /// any other payload through its `to_string()` rendering.
    ///
    /// When the caller drops the returned receiver, the background task that
    /// drives the command stream is aborted so a followed log does not keep
    /// running with nobody listening.
    ///
    /// # Errors
    ///
    /// Fails only when the execd client cannot be resolved; errors from the
    /// command stream itself are not reported and simply end the stream.
    async fn stream_logs(
        &self,
        id: &Uuid,
        options: &LogOptions,
    ) -> CiabResult<mpsc::Receiver<String>> {
        // Resolve first: no channel is needed if this fails.
        let execd = self.resolve_execd(id).await?;
        let (tx, rx) = mpsc::channel(256);

        let mut cmd = vec!["tail".to_string()];
        if options.follow {
            cmd.push("-f".to_string());
        }
        if let Some(tail_lines) = options.tail {
            cmd.push("-n".to_string());
            cmd.push(tail_lines.to_string());
        }
        cmd.push("/var/log/syslog".to_string());

        let request = ExecRequest {
            command: cmd,
            workdir: None,
            env: HashMap::new(),
            stdin: None,
            timeout_secs: None,
            tty: false,
        };
        let sandbox_id = *id;

        tokio::spawn(async move {
            let (stream_tx, mut stream_rx) =
                mpsc::channel::<ciab_core::types::stream::StreamEvent>(256);

            // Producer: drives the command stream. `execd` and `request` are
            // moved in directly; nothing else uses them afterwards.
            let producer = tokio::spawn(async move {
                // The outcome is intentionally ignored: when the stream ends,
                // `stream_tx` is dropped and the forwarding loop below stops.
                let _ = execd
                    .run_command_stream(&request, stream_tx, sandbox_id)
                    .await;
            });

            // Forwarder: converts events to text for the caller.
            while let Some(event) = stream_rx.recv().await {
                let text = match event.data.as_str() {
                    Some(plain) => plain.to_string(),
                    None => event.data.to_string(),
                };
                if tx.send(text).await.is_err() {
                    // The receiver is gone. Without the abort the producer
                    // could run indefinitely (e.g. `tail -f`) and this task
                    // would wait on it forever.
                    producer.abort();
                    break;
                }
            }
            let _ = producer.await;
        });
        Ok(rx)
    }
}