use std::sync::Arc;
use sea_orm::EntityTrait;
use crate::{
MicrosandboxResult,
backend::{
Backend, CloudSandbox, SandboxHandleCloudState, SandboxHandleInner, SandboxHandleLocalState,
},
db::entity::sandbox as sandbox_entity,
};
use super::{Sandbox, SandboxConfig, SandboxStatus, SandboxStopResult};
/// Timeout applied by [`SandboxHandle::connect`] when the caller does not supply one.
pub const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Graceful-stop window applied by [`SandboxHandle::stop`] before it escalates to a kill.
pub const DEFAULT_STOP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Time allowed to observe a stopped state after a kill request; used by
/// [`SandboxHandle::kill`] and by the kill paths of [`SandboxHandle::stop_with_timeout`].
pub const DEFAULT_KILL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// Handle to a named sandbox: a state snapshot plus the backend used to act on it.
///
/// The snapshot stored in `inner` is captured when the handle is built and is not
/// updated in place; [`SandboxHandle::refresh`] returns a new handle with fresh state.
pub struct SandboxHandle {
// Backend through which lookups and lifecycle requests are dispatched.
backend: Arc<dyn Backend>,
// Backend-specific snapshot: either the local or the cloud variant.
inner: SandboxHandleInner,
// Sandbox name; passed to the backend as the lookup key.
name: String,
}
impl SandboxHandle {
/// Builds a handle from a locally persisted sandbox row.
///
/// `model` is consumed and its fields are moved into the local state snapshot;
/// the naive `created_at` / `updated_at` timestamps are converted with `and_utc()`.
/// `pid` is stored unchanged.
pub(crate) fn from_local_model(
    backend: Arc<dyn Backend>,
    model: sandbox_entity::Model,
    pid: Option<i32>,
) -> Self {
    Self {
        backend,
        // `model` is owned, so the name is moved out rather than cloned.
        name: model.name,
        inner: SandboxHandleInner::Local(SandboxHandleLocalState {
            db_id: model.id,
            status: model.status,
            config_json: model.config,
            created_at: model.created_at.map(|dt| dt.and_utc()),
            updated_at: model.updated_at.map(|dt| dt.and_utc()),
            pid,
        }),
    }
}
/// Builds a handle from a cloud sandbox record.
///
/// The cloud status is mapped through `cloud_status_to_sandbox_status` and the
/// cloud config is serialized to JSON so that [`SandboxHandle::config_json`]
/// works for both variants.
///
/// # Errors
///
/// Returns an error if `cloud.config` cannot be serialized to JSON.
pub(crate) fn from_cloud(
    backend: Arc<dyn Backend>,
    cloud: CloudSandbox,
) -> MicrosandboxResult<Self> {
    let status = crate::backend::sandbox::cloud_status_to_sandbox_status(cloud.status);
    // Serialize while `cloud.config` is still borrowable, before fields are moved out.
    let config_json = serde_json::to_string(&cloud.config)?;
    Ok(Self {
        backend,
        inner: SandboxHandleInner::Cloud(SandboxHandleCloudState {
            id: cloud.id,
            org_id: cloud.org_id,
            status,
            config_json,
            created_at: Some(cloud.created_at),
            started_at: cloud.started_at,
            stopped_at: cloud.stopped_at,
            last_error: cloud.last_error,
        }),
        // `cloud` is owned, so the name is moved out rather than cloned.
        name: cloud.name,
    })
}
/// Returns the name of the sandbox this handle refers to.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Reports the kind of backend this handle is bound to, as given by `Backend::kind`.
pub fn backend_kind(&self) -> crate::backend::BackendKind {
    let backend = &self.backend;
    backend.kind()
}
/// Returns the local state snapshot, or `None` when this handle wraps a cloud sandbox.
pub fn local(&self) -> Option<&SandboxHandleLocalState> {
    if let SandboxHandleInner::Local(state) = &self.inner {
        Some(state)
    } else {
        None
    }
}
/// Returns the cloud state snapshot, or `None` when this handle wraps a local sandbox.
pub fn cloud(&self) -> Option<&SandboxHandleCloudState> {
    if let SandboxHandleInner::Cloud(state) = &self.inner {
        Some(state)
    } else {
        None
    }
}
/// Returns the status recorded in this handle's snapshot.
///
/// The backend is not queried; call [`SandboxHandle::refresh`] first for a current value.
pub fn status_snapshot(&self) -> SandboxStatus {
    let inner = &self.inner;
    match inner {
        SandboxHandleInner::Cloud(cloud) => cloud.status,
        SandboxHandleInner::Local(local) => local.status,
    }
}
pub fn last_error_snapshot(&self) -> Option<String> {
match &self.inner {
SandboxHandleInner::Cloud(s) => s.last_error.clone(),
SandboxHandleInner::Local(_) => None,
}
}
/// Returns the sandbox configuration as the JSON text stored in the snapshot.
pub fn config_json(&self) -> &str {
    let inner = &self.inner;
    match inner {
        SandboxHandleInner::Cloud(cloud) => &cloud.config_json,
        SandboxHandleInner::Local(local) => &local.config_json,
    }
}
/// Parses the stored configuration JSON into a [`SandboxConfig`].
///
/// # Errors
///
/// Returns a deserialization error if the local JSON is invalid, and
/// `MicrosandboxError::Unsupported` for cloud handles.
pub fn config(&self) -> MicrosandboxResult<SandboxConfig> {
    if let SandboxHandleInner::Local(state) = &self.inner {
        let parsed: SandboxConfig = serde_json::from_str(&state.config_json)?;
        return Ok(parsed);
    }
    Err(crate::MicrosandboxError::Unsupported {
        feature: "SandboxHandle::config on cloud".into(),
        available_when: "when SandboxConfig is the cloud wire shape".into(),
    })
}
pub async fn refresh(&self) -> MicrosandboxResult<SandboxHandle> {
self.backend
.sandboxes()
.get(self.backend.clone(), &self.name)
.await
}
/// Returns the creation timestamp recorded in the snapshot, if any.
pub fn created_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
    let inner = &self.inner;
    match inner {
        SandboxHandleInner::Cloud(cloud) => cloud.created_at,
        SandboxHandleInner::Local(local) => local.created_at,
    }
}
/// Returns a "last updated" timestamp for the snapshot, if any.
///
/// Local handles return the stored `updated_at`. Cloud handles return the first
/// timestamp that is present among `stopped_at`, `started_at` and `created_at`,
/// in that order.
// NOTE(review): the cloud branch is a fixed priority order, not "most recent";
// confirm that is intended if a sandbox can be restarted after being stopped.
pub fn updated_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
    match &self.inner {
        SandboxHandleInner::Cloud(cloud) => {
            let fallback = cloud.started_at.or(cloud.created_at);
            cloud.stopped_at.or(fallback)
        }
        SandboxHandleInner::Local(local) => local.updated_at,
    }
}
pub async fn logs(
&self,
opts: &crate::logs::LogOptions,
) -> MicrosandboxResult<Vec<crate::logs::LogEntry>> {
self.backend
.sandboxes()
.logs(self.backend.clone(), &self.name, opts)
.await
}
pub async fn log_stream(
&self,
opts: &crate::logs::LogStreamOptions,
) -> MicrosandboxResult<crate::backend::sandbox::LogStream> {
self.backend
.sandboxes()
.log_stream(self.backend.clone(), &self.name, opts)
.await
}
/// Reads metrics for a local sandbox.
///
/// The status check uses this handle's snapshot, not a fresh lookup.
///
/// # Errors
///
/// - `Unsupported` when the handle or its backend is not local.
/// - `Custom` when the snapshot status is neither `Running` nor `Draining`.
/// - `MetricsDisabled` when the config has no effective metrics interval.
/// - Any error from parsing the config, opening the database, or reading metrics.
pub async fn metrics(&self) -> MicrosandboxResult<super::SandboxMetrics> {
    // Both "not a local handle" and "not a local backend" report the same error.
    let unsupported = || crate::MicrosandboxError::Unsupported {
        feature: "SandboxHandle::metrics on cloud".into(),
        available_when: "when cloud metrics land".into(),
    };

    let state = self.local().ok_or_else(unsupported)?;

    let is_live = matches!(
        state.status,
        SandboxStatus::Running | SandboxStatus::Draining
    );
    if !is_live {
        return Err(crate::MicrosandboxError::Custom(format!(
            "sandbox '{}' is not running (status: {:?})",
            self.name, state.status
        )));
    }

    let config = self.config()?;
    if config.effective_metrics_interval().is_none() {
        return Err(crate::MicrosandboxError::MetricsDisabled(self.name.clone()));
    }

    let local_backend = self.backend.as_local().ok_or_else(unsupported)?;
    let db = local_backend.db().await?.read();
    super::metrics::metrics_for_sandbox(db, local_backend, state.db_id, &config).await
}
pub async fn start(&self) -> MicrosandboxResult<Sandbox> {
self.backend
.sandboxes()
.start(self.backend.clone(), &self.name)
.await
}
pub async fn start_detached(&self) -> MicrosandboxResult<Sandbox> {
self.backend
.sandboxes()
.start_detached(self.backend.clone(), &self.name)
.await
}
/// Connects to the sandbox using [`DEFAULT_CONNECT_TIMEOUT`].
///
/// See [`SandboxHandle::connect_with_timeout`] for the conditions and errors.
pub async fn connect(&self) -> MicrosandboxResult<Sandbox> {
    let timeout = DEFAULT_CONNECT_TIMEOUT;
    self.connect_with_timeout(timeout).await
}
/// Connects to the agent of a local sandbox and wraps it in a [`Sandbox`].
///
/// The status check uses this handle's snapshot, not a fresh lookup. The returned
/// sandbox is built with `handle: None` in its local state.
///
/// # Errors
///
/// - `Unsupported` when the handle or its backend is not local.
/// - `Custom` when the snapshot status is neither `Running` nor `Draining`.
/// - Any error from the agent connection (bounded by `timeout`) or from parsing
///   the stored configuration JSON.
pub async fn connect_with_timeout(
    &self,
    timeout: std::time::Duration,
) -> MicrosandboxResult<Sandbox> {
    // Both "not a local handle" and "not a local backend" report the same error.
    let unsupported = || crate::MicrosandboxError::Unsupported {
        feature: "SandboxHandle::connect on cloud".into(),
        available_when: "when cloud attach lands".into(),
    };

    let state = self.local().ok_or_else(unsupported)?;

    let is_live = matches!(
        state.status,
        SandboxStatus::Running | SandboxStatus::Draining
    );
    if !is_live {
        return Err(crate::MicrosandboxError::Custom(format!(
            "sandbox '{}' is not running (status: {:?})",
            self.name, state.status
        )));
    }

    let local_backend = self.backend.as_local().ok_or_else(unsupported)?;
    let agent = crate::sandbox::fs::local::connect_agent_with_timeout(
        local_backend,
        &self.name,
        timeout,
    )
    .await?;

    // The config is parsed only after the connection succeeded.
    let config: SandboxConfig = serde_json::from_str(&state.config_json)?;
    let local_state = crate::backend::SandboxLocalState {
        db_id: state.db_id,
        handle: None,
        client: Arc::new(agent),
    };
    Ok(Sandbox::from_local(self.backend.clone(), local_state, config))
}
/// Creates a snapshot of this local sandbox under the destination name `name`.
///
/// # Errors
///
/// Returns `Unsupported` for cloud handles, or any error from snapshot creation.
pub async fn snapshot(
    &self,
    name: &str,
) -> MicrosandboxResult<super::super::snapshot::Snapshot> {
    use super::super::snapshot::{Snapshot, SnapshotDestination};

    if self.cloud().is_some() {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "SandboxHandle::snapshot on cloud".into(),
            available_when: "when cloud snapshots land".into(),
        });
    }

    let destination = SnapshotDestination::Name(name.to_string());
    Snapshot::builder(&self.name)
        .destination(destination)
        .create()
        .await
}
/// Creates a snapshot of this local sandbox with `path` as its destination.
///
/// # Errors
///
/// Returns `Unsupported` for cloud handles, or any error from snapshot creation.
pub async fn snapshot_to(
    &self,
    path: impl AsRef<std::path::Path>,
) -> MicrosandboxResult<super::super::snapshot::Snapshot> {
    use super::super::snapshot::{Snapshot, SnapshotDestination};

    if self.cloud().is_some() {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "SandboxHandle::snapshot_to on cloud".into(),
            available_when: "when cloud snapshots land".into(),
        });
    }

    let destination = SnapshotDestination::Path(path.as_ref().to_path_buf());
    Snapshot::builder(&self.name)
        .destination(destination)
        .create()
        .await
}
/// Stops the sandbox, allowing [`DEFAULT_STOP_TIMEOUT`] for a graceful shutdown.
///
/// See [`SandboxHandle::stop_with_timeout`] for the escalation behaviour.
pub async fn stop(&self) -> MicrosandboxResult<()> {
    let grace = DEFAULT_STOP_TIMEOUT;
    self.stop_with_timeout(grace).await
}
/// Stops the sandbox, escalating to a kill if it does not stop within `timeout`.
///
/// Steps:
/// 1. Refresh state; return immediately if the sandbox is already terminal.
/// 2. If `timeout` is zero, skip the graceful phase and kill with
///    [`DEFAULT_KILL_TIMEOUT`].
/// 3. Otherwise request a stop and wait up to `timeout` for a terminal state.
/// 4. On timeout, log a warning, request a kill, and wait up to
///    [`DEFAULT_KILL_TIMEOUT`] for a terminal state.
///
/// # Errors
///
/// Propagates backend errors from any step. Returns `Runtime` if no terminal
/// state is observed after the kill escalation.
pub async fn stop_with_timeout(&self, timeout: std::time::Duration) -> MicrosandboxResult<()> {
    let current = self.refresh().await?;
    if sandbox_status_is_terminal(current.status_snapshot()) {
        return Ok(());
    }

    // A zero grace period means there is no graceful phase at all.
    if timeout.is_zero() {
        return current.kill_with_timeout(DEFAULT_KILL_TIMEOUT).await;
    }

    current.request_stop().await?;
    // `Ok` here means the wait finished in time; its inner result may still be an error.
    if let Ok(graceful) = tokio::time::timeout(timeout, current.wait_until_stopped()).await {
        return graceful.map(|_| ());
    }

    tracing::warn!(
        sandbox = %current.name,
        timeout_secs = timeout.as_secs(),
        "graceful stop exceeded timeout, escalating to kill"
    );

    current.request_kill().await?;
    let forced = tokio::time::timeout(DEFAULT_KILL_TIMEOUT, current.wait_until_stopped())
        .await
        .map_err(|_| {
            crate::MicrosandboxError::Runtime(format!(
                "timed out observing stopped state for sandbox '{}'",
                current.name
            ))
        })?;
    forced.map(|_| ())
}
/// Sends a stop request to the backend without waiting for the sandbox to stop.
///
/// State is refreshed first; if the sandbox is already `Stopped` or `Crashed`
/// this is a no-op. Use [`SandboxHandle::wait_until_stopped`] to observe the
/// terminal state afterwards.
///
/// # Errors
///
/// Propagates errors from the refresh or from the backend stop call.
pub async fn request_stop(&self) -> MicrosandboxResult<()> {
    let current = self.refresh().await?;
    if sandbox_status_is_terminal(current.status_snapshot()) {
        return Ok(());
    }
    current
        .backend
        .sandboxes()
        .stop(current.backend.clone(), &current.name)
        .await
}
/// Kills the sandbox and waits up to [`DEFAULT_KILL_TIMEOUT`] to observe it stopped.
///
/// See [`SandboxHandle::kill_with_timeout`] for the errors.
pub async fn kill(&self) -> MicrosandboxResult<()> {
    let timeout = DEFAULT_KILL_TIMEOUT;
    self.kill_with_timeout(timeout).await
}
/// Sends a kill request to the backend without waiting for the sandbox to stop.
///
/// State is refreshed first; if the sandbox is already `Stopped` or `Crashed`
/// this is a no-op.
///
/// # Errors
///
/// Propagates errors from the refresh or from the backend kill call.
pub async fn request_kill(&self) -> MicrosandboxResult<()> {
    let current = self.refresh().await?;
    if sandbox_status_is_terminal(current.status_snapshot()) {
        return Ok(());
    }
    current
        .backend
        .sandboxes()
        .kill(current.backend.clone(), &current.name)
        .await
}
/// Kills the sandbox and waits up to `timeout` to observe a terminal state.
///
/// State is refreshed first; an already terminal sandbox returns `Ok(())` at once.
///
/// # Errors
///
/// Propagates backend errors, and returns `Runtime` when no terminal state is
/// observed within `timeout`.
pub async fn kill_with_timeout(&self, timeout: std::time::Duration) -> MicrosandboxResult<()> {
    let current = self.refresh().await?;
    if sandbox_status_is_terminal(current.status_snapshot()) {
        return Ok(());
    }

    current.request_kill().await?;
    let observed = tokio::time::timeout(timeout, current.wait_until_stopped())
        .await
        .map_err(|_| {
            crate::MicrosandboxError::Runtime(format!(
                "timed out observing stopped state for sandbox '{}'",
                current.name
            ))
        })?;
    observed.map(|_| ())
}
/// Sends a drain request to the backend without waiting for any state change.
///
/// State is refreshed first; if the sandbox is already `Stopped` or `Crashed`
/// this is a no-op.
///
/// # Errors
///
/// Propagates errors from the refresh or from the backend drain call.
pub async fn request_drain(&self) -> MicrosandboxResult<()> {
    let current = self.refresh().await?;
    if sandbox_status_is_terminal(current.status_snapshot()) {
        return Ok(());
    }
    current
        .backend
        .sandboxes()
        .drain(current.backend.clone(), &current.name)
        .await
}
/// Polls the backend until the sandbox reaches a terminal status.
///
/// State is refreshed every 100 ms. There is no built-in deadline, so callers
/// that need one should wrap this future in a timeout. The returned result has
/// `exit_code` and `signal` set to `None`, because only the refreshed status is
/// inspected here.
///
/// # Errors
///
/// Propagates the first error returned by a refresh.
pub async fn wait_until_stopped(&self) -> MicrosandboxResult<SandboxStopResult> {
    let poll_interval = std::time::Duration::from_millis(100);
    loop {
        let latest = self.refresh().await?;
        let status = latest.status_snapshot();
        if !sandbox_status_is_terminal(status) {
            tokio::time::sleep(poll_interval).await;
            continue;
        }
        return Ok(SandboxStopResult {
            name: latest.name,
            status,
            exit_code: None,
            signal: None,
            observed_at: chrono::Utc::now(),
            source: Some("refreshed backend state".to_string()),
        });
    }
}
/// Removes the sandbox.
///
/// Cloud handles delegate to the backend's `remove`. Local handles refresh their
/// state, refuse to proceed while the sandbox is `Running`, `Draining` or
/// `Paused`, then delete the sandbox directory followed by its database row.
///
/// # Errors
///
/// - `SandboxStillRunning` when a local sandbox is still active.
/// - `Unsupported` when a local handle refreshes to, or is backed by, a
///   non-local backend.
/// - Any error from the refresh, the database, or the directory removal.
pub async fn remove(&self) -> MicrosandboxResult<()> {
    if self.cloud().is_some() {
        return self
            .backend
            .sandboxes()
            .remove(self.backend.clone(), &self.name)
            .await;
    }

    // Shared error for the two "expected local, got something else" checks below.
    let unsupported = || crate::MicrosandboxError::Unsupported {
        feature: "SandboxHandle::remove on cloud".into(),
        available_when: "wired via Cloud variant".into(),
    };

    let refreshed = self.refresh().await?;
    let state = refreshed.local().ok_or_else(unsupported)?;

    let still_active = matches!(
        state.status,
        SandboxStatus::Running | SandboxStatus::Draining | SandboxStatus::Paused
    );
    if still_active {
        return Err(crate::MicrosandboxError::SandboxStillRunning(format!(
            "cannot remove sandbox '{}': still running",
            self.name
        )));
    }

    let local_backend = self.backend.as_local().ok_or_else(unsupported)?;
    let pools = local_backend.db().await?;

    // The on-disk directory goes first, then the database row.
    let sandbox_dir = local_backend.sandboxes_dir().join(&self.name);
    super::remove_dir_if_exists(&sandbox_dir)?;
    sandbox_entity::Entity::delete_by_id(state.db_id)
        .exec(pools.write())
        .await?;
    Ok(())
}
}
/// Returns `true` for statuses treated as terminal here: `Stopped` and `Crashed`.
fn sandbox_status_is_terminal(status: SandboxStatus) -> bool {
    match status {
        SandboxStatus::Stopped | SandboxStatus::Crashed => true,
        _ => false,
    }
}
/// Compact debug output: name, backend kind and snapshot status only.
impl std::fmt::Debug for SandboxHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SandboxHandle");
        out.field("name", &self.name);
        out.field("backend_kind", &self.backend.kind());
        out.field("status", &self.status_snapshot());
        out.finish()
    }
}