pub(crate) mod attach;
mod builder;
mod config;
pub mod exec;
pub mod fs;
mod handle;
pub mod init;
pub(crate) mod metrics;
mod patch;
#[cfg(feature = "ssh")]
pub mod ssh;
mod types;
use std::{
collections::{BTreeMap, HashMap},
path::Path,
process::ExitStatus,
sync::Arc,
};
use microsandbox_db::pool::DbPools;
use microsandbox_db::{DbReadConnection, DbWriteConnection};
use microsandbox_image::Registry;
use microsandbox_protocol::{
exec::{ExecRequest, ExecRlimit},
message::MessageType,
};
use microsandbox_types::hostname_from_sandbox_name as derive_hostname;
use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder, Set, sea_query::Expr,
};
use tokio::sync::Mutex;
use microsandbox_image::{
Digest, GlobalCache, PullOptions, PullProgressSender, PullResult, Reference, ext4,
progress_channel, tree,
};
use crate::{
MicrosandboxResult,
agent::AgentClient,
db::entity::{
run as run_entity, sandbox as sandbox_entity, sandbox_rootfs as sandbox_rootfs_entity,
},
runtime::{ProcessHandle, SpawnMode, spawn_sandbox},
};
use self::exec::{ExecHandle, ExecOptions};
/// Label-key prefixes treated as reserved. [`reserved_label_prefix`] reports
/// which of these (if any) a given key starts with.
pub(crate) const RESERVED_LABEL_PREFIXES: [&str; 3] = ["sandbox.", "microsandbox.", "service."];
/// Overall deadline used by `wait_for_relay` for the agent socket to accept a
/// connection (180 seconds, measured from the start of the wait).
const AGENT_RELAY_READY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180);
/// Checks `name` against the sandbox naming rules defined in `microsandbox_types`.
///
/// # Errors
///
/// Returns the validation failure from `microsandbox_types`, converted into
/// this crate's error type.
pub fn validate_sandbox_name(name: &str) -> MicrosandboxResult<()> {
    match microsandbox_types::validate_sandbox_name(name) {
        Ok(()) => Ok(()),
        Err(rejected) => Err(rejected.into()),
    }
}
/// Validates `name` and additionally requires that the runtime can resolve an
/// agent socket path for it.
///
/// # Errors
///
/// Fails if the name itself is invalid, or if
/// `crate::runtime::resolve_sandbox_agent_socket_path` rejects it.
pub(super) fn validate_sandbox_name_for_runtime(name: &str) -> MicrosandboxResult<()> {
    validate_sandbox_name(name)?;
    // Only resolvability matters here; the resolved path itself is discarded.
    let _socket_path = crate::runtime::resolve_sandbox_agent_socket_path(name)?;
    Ok(())
}
/// Checks an optional hostname against the rules defined in `microsandbox_types`.
///
/// The `Option` is forwarded untouched; how `None` is treated is decided by
/// `microsandbox_types::validate_hostname`.
///
/// # Errors
///
/// Returns the validation failure converted into this crate's error type.
pub(super) fn validate_hostname(hostname: Option<&str>) -> MicrosandboxResult<()> {
    match microsandbox_types::validate_hostname(hostname) {
        Ok(()) => Ok(()),
        Err(rejected) => Err(rejected.into()),
    }
}
/// Returns the human-readable reason `name` is not a valid sandbox name, or
/// `None` when the name passes [`validate_sandbox_name`].
pub(crate) fn sandbox_name_validation_message(name: &str) -> Option<String> {
    match validate_sandbox_name(name) {
        Ok(()) => None,
        Err(reason) => Some(reason.to_string()),
    }
}
/// Returns the first entry of `RESERVED_LABEL_PREFIXES` that `key` starts
/// with, or `None` when the key uses no reserved prefix.
///
/// Prefixes are checked in declaration order and the comparison is
/// case-sensitive (plain `str::starts_with`).
pub(crate) fn reserved_label_prefix(key: &str) -> Option<&'static str> {
    for &candidate in RESERVED_LABEL_PREFIXES.iter() {
        if key.starts_with(candidate) {
            return Some(candidate);
        }
    }
    None
}
pub use crate::db::entity::sandbox::SandboxStatus;
pub use crate::logs::{LogEntry, LogOptions, LogSource, LogStreamOptions};
pub use attach::AttachOptionsBuilder;
pub use builder::{RegistryConfigBuilder, SandboxBuilder};
pub use config::SandboxConfig;
pub use exec::{ExecOptionsBuilder, ExecOutput, Rlimit, RlimitResource};
pub use fs::{FsEntry, FsEntryKind, FsMetadata, FsReadStream, FsSetAttrs, FsWriteSink, SandboxFs};
pub use handle::SandboxHandle;
pub use init::{HandoffInit, InitOptionsBuilder};
pub use metrics::{SandboxMetrics, all_sandbox_metrics};
pub use microsandbox_image::{PullProgress, PullProgressHandle};
#[cfg(feature = "net")]
pub use microsandbox_network::builder::SecretBuilder;
#[cfg(feature = "net")]
pub use microsandbox_network::config::NetworkConfig;
#[cfg(feature = "net")]
pub use microsandbox_network::policy::NetworkPolicy;
pub use microsandbox_runtime::logging::LogLevel;
pub use microsandbox_types::PullPolicy;
pub use microsandbox_types::{
EnvVar, MAX_HOSTNAME_BYTES, MAX_SANDBOX_NAME_BYTES, NetworkSpec, PortProtocol,
PublishedPortSpec, SandboxLogLevel, SandboxResources, SandboxRuntimeOptions, SandboxSpec,
};
#[cfg(feature = "ssh")]
pub use ssh::{
DEFAULT_SSH_HOST, DEFAULT_SSH_PORT, SandboxSsh, SftpClient, SshAttachOptionsBuilder, SshClient,
SshClientOptionsBuilder, SshExecOptionsBuilder, SshOutput, SshServer, SshServerOptionsBuilder,
SshStdioStream,
};
pub use types::{
DiskImageFormat, HostPermissions, ImageBuilder, ImageSource, IntoImage, MountBuilder,
MountOptions, NamedVolumeMode, OciRootfsSource, Patch, PatchBuilder, RootfsSource,
SecurityProfile, StatVirtualization, VolumeMount,
};
/// Registry connection overrides handed to the OCI pull step.
///
/// `create_local` fills this from the sandbox configuration
/// (`registry_auth`, `insecure`, `ca_certs`) before calling `pull_oci_image`.
pub(crate) struct RegistryOverrides {
/// Credentials for the registry, if any were configured.
pub auth: Option<microsandbox_image::RegistryAuth>,
/// Copied from the sandbox config's `insecure` flag.
/// NOTE(review): presumably relaxes transport security for the registry —
/// confirm against `pull_oci_image`.
pub insecure: bool,
/// Additional CA certificate material as raw bytes.
/// NOTE(review): presumably one entry per certificate; encoding (PEM vs DER)
/// is not visible here — confirm.
pub ca_certs: Vec<Vec<u8>>,
}
/// Criteria used by [`Sandbox::list_with`] to narrow the listed sandboxes.
///
/// Built fluently via `SandboxFilter::new().label(..)`; a filter with no
/// labels is "empty" and makes `list_with` return every sandbox.
#[derive(Debug, Default, Clone)]
pub struct SandboxFilter {
/// `(key, value)` label pairs accumulated by `label` / `labels`, in insertion
/// order. Matching itself is done by `sandbox_handle_matches_filter`
/// (presumably every pair must match — confirm there).
labels: Vec<(String, String)>,
}
/// Handle to a sandbox created or started through a backend.
///
/// Cloning is cheap for the shared parts: `backend` and `inner` are
/// reference-counted, so clones refer to the same backend and the same
/// local/cloud state.
#[derive(Clone)]
pub struct Sandbox {
/// Backend every operation on this sandbox is dispatched through.
backend: Arc<dyn crate::backend::Backend>,
/// Backend-specific state: either local (DB id, process handle, agent client)
/// or cloud (id, org id, creation time).
inner: Arc<crate::backend::SandboxInner>,
/// Sandbox name; taken from `config.spec.name` for local sandboxes and from
/// the cloud record for cloud sandboxes.
name: String,
/// Configuration this sandbox was created or started with.
config: SandboxConfig,
}
/// Outcome record describing a stopped sandbox.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type; the field notes below are inferred from names and types — confirm
/// against the producer.
#[derive(Debug, Clone)]
pub struct SandboxStopResult {
/// Name of the sandbox the result refers to.
pub name: String,
/// Sandbox status associated with this result.
pub status: SandboxStatus,
/// Process exit code, when one is available.
pub exit_code: Option<i32>,
/// Signal number, when one is available (presumably the terminating signal).
pub signal: Option<i32>,
/// UTC timestamp attached to this observation.
pub observed_at: chrono::DateTime<chrono::Utc>,
/// Optional free-form origin of the observation — TODO confirm allowed values.
pub source: Option<String>,
}
impl Sandbox {
    /// Starts a [`SandboxBuilder`] for a sandbox called `name`.
    pub fn builder(name: impl Into<String>) -> SandboxBuilder {
        SandboxBuilder::new(name)
    }

    /// Creates a sandbox via the default backend's `create` entry point.
    ///
    /// NOTE(review): the trailing `true` flag is defined by the backend trait,
    /// which is not visible in this file.
    pub async fn create(config: SandboxConfig) -> MicrosandboxResult<Self> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().create(owner, config, true).await
    }

    /// Creates a sandbox via the default backend's `create_detached` entry point.
    pub async fn create_detached(config: SandboxConfig) -> MicrosandboxResult<Self> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().create_detached(owner, config).await
    }

    /// Like [`Sandbox::create`], but runs on a spawned Tokio task and exposes
    /// image-pull progress. Uses `SpawnMode::Attached`.
    ///
    /// Returns the progress handle together with the task's `JoinHandle`.
    pub fn create_with_pull_progress(
        config: SandboxConfig,
    ) -> (
        PullProgressHandle,
        tokio::task::JoinHandle<MicrosandboxResult<Self>>,
    ) {
        let mode = SpawnMode::Attached;
        Self::create_with_pull_progress_and_mode(config, mode)
    }

    /// Like [`Sandbox::create_with_pull_progress`], but uses `SpawnMode::Detached`.
    pub fn create_detached_with_pull_progress(
        config: SandboxConfig,
    ) -> (
        PullProgressHandle,
        tokio::task::JoinHandle<MicrosandboxResult<Self>>,
    ) {
        let mode = SpawnMode::Detached;
        Self::create_with_pull_progress_and_mode(config, mode)
    }

    /// Shared implementation of the `*_with_pull_progress` constructors.
    ///
    /// The creation runs inside `tokio::spawn`, so this must be called from
    /// within a Tokio runtime. On the local backend the progress sender is
    /// handed to `create_local`; on the cloud backend it is dropped right away
    /// and the regular `create` path is used (`mode` is not consulted there).
    fn create_with_pull_progress_and_mode(
        config: SandboxConfig,
        mode: SpawnMode,
    ) -> (
        PullProgressHandle,
        tokio::task::JoinHandle<MicrosandboxResult<Self>>,
    ) {
        let (handle, sender) = progress_channel();
        let creation = async move {
            let backend = crate::backend::default_backend();
            match backend.kind() {
                crate::backend::BackendKind::Local => {
                    create_local(backend, config, mode, Some(sender)).await
                }
                crate::backend::BackendKind::Cloud => {
                    // No host-side pull to report on; release the sender first.
                    drop(sender);
                    let owner = Arc::clone(&backend);
                    backend.sandboxes().create(owner, config, true).await
                }
            }
        };
        let task = tokio::spawn(creation);
        (handle, task)
    }

    /// Starts the existing sandbox `name` via the default backend.
    pub async fn start(name: &str) -> MicrosandboxResult<Self> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().start(owner, name).await
    }

    /// Starts the existing sandbox `name` via the backend's `start_detached`.
    pub async fn start_detached(name: &str) -> MicrosandboxResult<Self> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().start_detached(owner, name).await
    }

    /// Looks up the sandbox `name` and returns a [`SandboxHandle`] for it.
    pub async fn get(name: &str) -> MicrosandboxResult<SandboxHandle> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().get(owner, name).await
    }

    /// Lists sandboxes known to the default backend.
    ///
    /// Calls the backend's `list` with both optional arguments set to `None`
    /// and returns the `sandboxes` of the resulting page.
    pub async fn list() -> MicrosandboxResult<Vec<SandboxHandle>> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        let listing = backend.sandboxes().list(owner, None, None).await?;
        Ok(listing.sandboxes)
    }

    /// Lists sandboxes and keeps only those accepted by `filter`.
    ///
    /// An empty filter returns the full listing unchanged. Filtering happens on
    /// the client side, after [`Sandbox::list`] has returned.
    pub async fn list_with(filter: SandboxFilter) -> MicrosandboxResult<Vec<SandboxHandle>> {
        let mut handles = Self::list().await?;
        if !filter.is_empty() {
            handles.retain(|handle| sandbox_handle_matches_filter(handle, &filter));
        }
        Ok(handles)
    }

    /// Removes the sandbox `name` via the default backend.
    pub async fn remove(name: &str) -> MicrosandboxResult<()> {
        let backend = crate::backend::default_backend();
        let owner = Arc::clone(&backend);
        backend.sandboxes().remove(owner, name).await
    }
}
impl SandboxFilter {
    /// Creates a filter with no criteria (see [`SandboxFilter::is_empty`]).
    pub fn new() -> Self {
        Self { labels: Vec::new() }
    }

    /// Adds a single `key = value` label criterion and returns the filter.
    pub fn label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let pair = (key.into(), value.into());
        self.labels.push(pair);
        self
    }

    /// Adds every `(key, value)` pair from `labels`, preserving their order.
    pub fn labels(
        mut self,
        labels: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        for (key, value) in labels {
            self.labels.push((key.into(), value.into()));
        }
        self
    }

    /// Returns `true` when no label criteria have been added.
    pub fn is_empty(&self) -> bool {
        self.labels.is_empty()
    }
}
impl Sandbox {
    /// Wraps local runtime state into a [`Sandbox`].
    ///
    /// The sandbox name is copied from `config.spec.name`.
    pub(crate) fn from_local(
        backend: Arc<dyn crate::backend::Backend>,
        local: crate::backend::SandboxLocalState,
        config: SandboxConfig,
    ) -> Self {
        let name = config.spec.name.clone();
        let inner = Arc::new(crate::backend::SandboxInner::Local(local));
        Self {
            backend,
            inner,
            name,
            config,
        }
    }

    /// Wraps a cloud sandbox record into a [`Sandbox`].
    ///
    /// Only `id`, `org_id` and `created_at` are retained as cloud state; the
    /// sandbox name comes from the cloud record rather than from `config`.
    pub(crate) fn from_cloud(
        backend: Arc<dyn crate::backend::Backend>,
        cloud: crate::backend::CloudSandbox,
        config: SandboxConfig,
    ) -> Self {
        let crate::backend::CloudSandbox {
            id,
            org_id,
            created_at,
            name,
            ..
        } = cloud;
        let state = crate::backend::SandboxCloudState {
            id,
            org_id,
            created_at,
        };
        Self {
            backend,
            inner: Arc::new(crate::backend::SandboxInner::Cloud(state)),
            name,
            config,
        }
    }
}
/// Creates and boots a new sandbox on the local backend.
///
/// Steps, in order:
/// 1. Apply rootfs and runtime defaults, then validate the name, hostname,
///    rootfs source, environment, labels and (if present) the init spec.
/// 2. Run `prepare_create_target` for the sandbox directory.
/// 3. For OCI images: pull the image, enforce the snapshot digest pin, merge
///    image defaults into the config, require the cached VMDK, prepare
///    `upper.ext4` (copied from a snapshot or freshly created), and persist
///    image metadata on a best-effort basis.
/// 4. For non-OCI images with patches: apply the patches to the rootfs.
/// 5. Insert the database record, spawn the sandbox, persist the manifest pin
///    and verify that the configured workdir exists in the guest.
///
/// # Parameters
///
/// * `backend` – must be a local backend (`as_local()` returns `Some`).
/// * `config` – sandbox configuration; it is mutated with defaults before use.
/// * `mode` – spawn mode forwarded to `create_inner_local`.
/// * `progress` – optional sender receiving image-pull progress.
///
/// # Errors
///
/// * `Unsupported` when `backend` is not local.
/// * `SnapshotIntegrity` when a snapshot pins a manifest digest that differs
///   from the pulled image.
/// * `Custom` when the cached VMDK is missing or the snapshot copy task fails.
/// * `InvalidConfig` when patches are combined with a snapshot source, or the
///   configured workdir does not exist in the guest.
/// * Any error from validation, pulling, the database, or spawning.
///
/// After the database record exists, failures mark the record `Stopped`
/// (ignoring errors from that update) before the error is returned.
pub(crate) async fn create_local(
    backend: Arc<dyn crate::backend::Backend>,
    mut config: SandboxConfig,
    mode: SpawnMode,
    progress: Option<PullProgressSender>,
) -> MicrosandboxResult<Sandbox> {
    tracing::debug!(
        sandbox = %config.spec.name,
        image = ?config.spec.image,
        mode = ?mode,
        cpus = config.spec.resources.cpus,
        memory_mib = config.spec.resources.memory_mib,
        "create_local: starting"
    );
    let local_backend =
        backend
            .as_local()
            .ok_or_else(|| crate::MicrosandboxError::Unsupported {
                feature: "create_local".into(),
                available_when: "with a LocalBackend".into(),
            })?;
    config.apply_rootfs_defaults(local_backend.config().sandbox_defaults.oci.upper_size_mib);
    // Filled in only on the OCI path; used after boot to persist the pin.
    let mut pinned_manifest_digest: Option<String> = None;
    let mut pinned_reference: Option<String> = None;
    config.apply_runtime_defaults();
    validate_sandbox_name_for_runtime(&config.spec.name)?;
    validate_hostname(config.spec.runtime.hostname.as_deref())?;
    validate_rootfs_source(&config.spec.image)?;
    validate_env(&config.spec.env)?;
    validate_labels(&config.spec.labels)?;
    if let Some(init) = &config.spec.init {
        init::validate(init)?;
    }
    let db = local_backend.db().await?;
    let sandbox_dir = local_backend.sandboxes_dir().join(&config.spec.name);
    prepare_create_target(db, &config, &sandbox_dir).await?;
    if let RootfsSource::Oci(oci) = config.spec.image.clone() {
        let reference = oci.reference;
        // The digest pin is only enforced when booting from a snapshot upper
        // layer; the digest is cloned lazily, only in that case.
        let expected_snapshot_manifest_digest = config
            .snapshot_upper_source
            .as_ref()
            .and_then(|_| config.manifest_digest.clone());
        let upper_size_mib = oci
            .upper_size_mib
            .unwrap_or(config::DEFAULT_OCI_UPPER_SIZE_MIB);
        let overrides = RegistryOverrides {
            auth: config.registry_auth.clone(),
            insecure: config.insecure,
            ca_certs: config.ca_certs.clone(),
        };
        let pull_result = pull_oci_image(
            local_backend,
            &reference,
            config.spec.pull_policy,
            overrides,
            progress,
        )
        .await?;
        if let Some(expected) = expected_snapshot_manifest_digest.as_deref()
            && pull_result.manifest_digest.to_string() != expected
        {
            return Err(crate::MicrosandboxError::SnapshotIntegrity(format!(
                "snapshot image digest mismatch: manifest pinned {}, resolved {}",
                expected, pull_result.manifest_digest
            )));
        }
        config.merge_image_defaults(&pull_result.config);
        // Merging image defaults may have changed the init spec; validate again.
        if let Some(init) = &config.spec.init {
            init::validate(init)?;
        }
        pinned_manifest_digest = Some(pull_result.manifest_digest.to_string());
        pinned_reference = Some(reference.clone());
        let cache_dir = local_backend.cache_dir();
        let cache = GlobalCache::new_async(&cache_dir).await?;
        let vmdk_path = cache.vmdk_path(&pull_result.manifest_digest);
        if tokio::fs::metadata(&vmdk_path).await.is_err() {
            return Err(crate::MicrosandboxError::Custom(format!(
                "VMDK not materialized: {}",
                vmdk_path.display()
            )));
        }
        let layer_erofs_paths: Vec<std::path::PathBuf> = pull_result
            .layer_diff_ids
            .iter()
            .map(|d| cache.layer_erofs_path(d))
            .collect();
        let upper_tree = if !config.spec.patches.is_empty() {
            Some(patch::build_upper_tree(&config.spec.patches, &layer_erofs_paths).await?)
        } else {
            None
        };
        tokio::fs::create_dir_all(&sandbox_dir).await?;
        let upper_path = sandbox_dir.join("upper.ext4");
        if let Some(snap_upper) = config.snapshot_upper_source.take() {
            if upper_tree.is_some() {
                return Err(crate::MicrosandboxError::InvalidConfig(
                    "patches cannot be combined with from_snapshot".into(),
                ));
            }
            let dst = upper_path.clone();
            // The copy is blocking I/O, so it runs off the async executor.
            tokio::task::spawn_blocking(move || {
                microsandbox_utils::copy::fast_copy(&snap_upper, &dst)
            })
            .await
            .map_err(|e| crate::MicrosandboxError::Custom(format!("snapshot copy task: {e}")))??;
        } else if upper_tree.is_some() || tokio::fs::metadata(&upper_path).await.is_err() {
            // Same outcome as `!upper_path.exists() || upper_tree.is_some()`,
            // but the existence probe no longer blocks the executor thread and
            // is skipped entirely when a patched tree forces re-creation.
            create_upper_ext4(&upper_path, upper_size_mib, upper_tree).await?;
        }
        config.manifest_digest = Some(pull_result.manifest_digest.to_string());
        // Persisting image metadata is best-effort: failures are only logged.
        if let Ok(image_ref) = reference.parse::<Reference>() {
            match cache.read_image_metadata_async(&image_ref).await {
                Ok(Some(metadata)) => {
                    if let Err(e) =
                        crate::image::Image::persist(local_backend, &reference, metadata).await
                    {
                        tracing::warn!(
                            error = %e,
                            "failed to persist image metadata to database"
                        );
                    }
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::warn!(error = %e, "failed to read cached image metadata");
                }
            }
        }
    }
    // OCI patches were folded into the upper layer above; other rootfs kinds
    // are patched directly.
    if !config.spec.patches.is_empty() && !matches!(config.spec.image, RootfsSource::Oci(_)) {
        patch::apply_patches(&config.spec.image, &config.spec.patches).await?;
    }
    let write_db = db.write();
    let persisted_config = config.clone_for_persistence();
    let sandbox_id = insert_sandbox_record(write_db, &persisted_config).await?;
    tracing::debug!(sandbox_id, sandbox = %config.spec.name, "create_local: db record inserted");
    let (local_state, returned_config) =
        match create_inner_local(local_backend, config, sandbox_id, mode).await {
            Ok(pair) => pair,
            Err(e) => {
                // Best-effort rollback of the record's status.
                let _ = update_sandbox_status(write_db, sandbox_id, SandboxStatus::Stopped).await;
                return Err(e);
            }
        };
    let sandbox = Sandbox::from_local(backend.clone(), local_state, returned_config);
    if let (Some(_reference), Some(manifest_digest)) = (
        pinned_reference.as_deref(),
        pinned_manifest_digest.as_deref(),
    ) && let Err(err) = persist_oci_manifest_pin(write_db, sandbox_id, manifest_digest).await
    {
        let _ = sandbox.stop().await;
        let _ = update_sandbox_status(write_db, sandbox_id, SandboxStatus::Stopped).await;
        return Err(err);
    }
    // A failed existence probe is treated the same as a missing workdir.
    if let Some(ref workdir) = sandbox.config.spec.runtime.workdir
        && !sandbox.fs().exists(workdir).await.unwrap_or(false)
    {
        let _ = sandbox.stop().await;
        let _ = update_sandbox_status(write_db, sandbox_id, SandboxStatus::Stopped).await;
        return Err(crate::MicrosandboxError::InvalidConfig(format!(
            "workdir does not exist in guest: {workdir}"
        )));
    }
    Ok(sandbox)
}
/// Boots an existing, persisted sandbox on the local backend.
///
/// The stored record must be `Stopped` or `Crashed`. Its JSON configuration is
/// deserialized, defaulted and re-validated; the record is then marked
/// `Running` before the sandbox is spawned. If spawning fails the record is
/// set back to `Stopped` (errors from that rollback are ignored).
///
/// # Errors
///
/// * `Unsupported` when `backend` is not local.
/// * `SandboxStillRunning` when the record is `Running` or `Draining`.
/// * `Custom` for any other status that is not `Stopped`/`Crashed`.
/// * Any error from loading, deserializing, validating or spawning.
pub(crate) async fn start_local(
    backend: Arc<dyn crate::backend::Backend>,
    name: &str,
    mode: SpawnMode,
) -> MicrosandboxResult<Sandbox> {
    tracing::debug!(sandbox = name, ?mode, "start_local: loading record");
    let Some(local_backend) = backend.as_local() else {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "start_local".into(),
            available_when: "with a LocalBackend".into(),
        });
    };
    let pools = local_backend.db().await?;
    let write_db = pools.write();
    let model = load_sandbox_record_reconciled(pools, name).await?;
    tracing::debug!(sandbox = name, status = ?model.status, "start_local: current status");

    if matches!(
        model.status,
        SandboxStatus::Running | SandboxStatus::Draining
    ) {
        return Err(crate::MicrosandboxError::SandboxStillRunning(format!(
            "cannot start sandbox '{name}': already running"
        )));
    }
    let startable = matches!(
        model.status,
        SandboxStatus::Stopped | SandboxStatus::Crashed
    );
    if !startable {
        return Err(crate::MicrosandboxError::Custom(format!(
            "cannot start sandbox '{name}': status is {:?} (expected Stopped or Crashed)",
            model.status
        )));
    }

    let mut config: SandboxConfig = serde_json::from_str(&model.config)?;
    config.apply_runtime_defaults();
    validate_sandbox_name_for_runtime(&config.spec.name)?;
    validate_hostname(config.spec.runtime.hostname.as_deref())?;
    validate_rootfs_source(&config.spec.image)?;
    validate_env(&config.spec.env)?;
    validate_labels(&config.spec.labels)?;
    let sandbox_dir = local_backend.sandboxes_dir().join(name);
    validate_start_state(local_backend, &config, &sandbox_dir)?;

    // The record is flipped to Running up front and reverted on spawn failure.
    update_sandbox_status(write_db, model.id, SandboxStatus::Running).await?;
    let spawned = create_inner_local(local_backend, config, model.id, mode).await;
    let (local_state, returned_config) = match spawned {
        Ok(pair) => pair,
        Err(err) => {
            let _ = update_sandbox_status(write_db, model.id, SandboxStatus::Stopped).await;
            return Err(err);
        }
    };
    Ok(Sandbox::from_local(backend, local_state, returned_config))
}
/// Spawns the sandbox process and waits until its agent relay accepts a
/// connection.
///
/// Returns the local runtime state together with the (unchanged) `config`.
/// In `SpawnMode::Detached` the process handle is disarmed and dropped, so the
/// returned state carries no handle; otherwise the handle is kept behind an
/// `Arc<Mutex<_>>`.
///
/// # Errors
///
/// Propagates failures from `spawn_sandbox` and `wait_for_relay`.
async fn create_inner_local(
    local: &crate::backend::LocalBackend,
    config: SandboxConfig,
    sandbox_id: i32,
    mode: SpawnMode,
) -> MicrosandboxResult<(crate::backend::SandboxLocalState, SandboxConfig)> {
    let (mut process, agent_sock_path) = spawn_sandbox(local, &config, sandbox_id, mode).await?;
    let log_dir = local.sandboxes_dir().join(&config.spec.name).join("logs");
    let client =
        wait_for_relay(&agent_sock_path, &log_dir, &mut process, &config.spec.name).await?;

    // Timing info is purely informational; skip it when unavailable.
    if let Ok(ready) = client.ready() {
        tracing::info!(
            boot_time_ms = ready.boot_time_ns / 1_000_000,
            init_time_ms = ready.init_time_ns / 1_000_000,
            ready_time_ms = ready.ready_time_ns / 1_000_000,
            "sandbox ready",
        );
    }

    let handle = match mode {
        SpawnMode::Detached => {
            process.disarm();
            None
        }
        _ => Some(Arc::new(Mutex::new(process))),
    };
    let state = crate::backend::SandboxLocalState {
        db_id: sandbox_id,
        handle,
        client: Arc::new(client),
    };
    Ok((state, config))
}
/// Loads the database record for sandbox `name`, reconciles it with the
/// observed runtime state, and looks up the PID of its active run.
///
/// # Errors
///
/// Returns `SandboxNotFound` when no record has that name, plus any database
/// or reconciliation error.
pub(crate) async fn get_local_handle_state(
    local_backend: &crate::backend::LocalBackend,
    name: &str,
) -> MicrosandboxResult<(sandbox_entity::Model, Option<i32>)> {
    let pools = local_backend.db().await?;
    let found = sandbox_entity::Entity::find()
        .filter(sandbox_entity::Column::Name.eq(name))
        .one(pools.read())
        .await?;
    let Some(stored) = found else {
        return Err(crate::MicrosandboxError::SandboxNotFound(name.into()));
    };
    let model = reconcile_sandbox_runtime_state(pools, stored).await?;
    let active_run = load_active_run(pools.read(), model.id).await?;
    let pid = pid_from_run(active_run.as_ref());
    Ok((model, pid))
}
/// Loads every sandbox record (newest `created_at` first), reconciles each one
/// with the observed runtime state, and pairs it with its active PID, if any.
///
/// Records are reconciled one at a time, in listing order; the first failure
/// aborts the whole call.
pub(crate) async fn list_local_handle_state(
    local_backend: &crate::backend::LocalBackend,
) -> MicrosandboxResult<Vec<(sandbox_entity::Model, Option<i32>)>> {
    let pools = local_backend.db().await?;
    let stored = sandbox_entity::Entity::find()
        .order_by_desc(sandbox_entity::Column::CreatedAt)
        .all(pools.read())
        .await?;

    let mut reconciled = Vec::with_capacity(stored.len());
    for record in stored {
        reconciled.push(reconcile_sandbox_runtime_state(pools, record).await?);
    }

    // One batched lookup for all PIDs instead of a query per sandbox.
    let sandbox_ids: Vec<i32> = reconciled.iter().map(|record| record.id).collect();
    let active_pids = load_active_pids(pools.read(), &sandbox_ids).await?;

    let paired = reconciled
        .into_iter()
        .map(|record| {
            let pid = active_pids.get(&record.id).copied();
            (record, pid)
        })
        .collect();
    Ok(paired)
}
/// Removes the local sandbox `name` by building a [`SandboxHandle`] from its
/// reconciled record and delegating to `SandboxHandle::remove`.
///
/// # Errors
///
/// `Unsupported` when `backend` is not local; otherwise whatever the lookup or
/// the handle's `remove` returns.
pub(crate) async fn remove_local(
    backend: Arc<dyn crate::backend::Backend>,
    name: &str,
) -> MicrosandboxResult<()> {
    let Some(local_backend) = backend.as_local() else {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "remove_local".into(),
            available_when: "with a LocalBackend".into(),
        });
    };
    let (model, pid) = get_local_handle_state(local_backend, name).await?;
    SandboxHandle::from_local_model(backend, model, pid)
        .remove()
        .await
}
/// Requests a graceful stop of the local sandbox `name`.
///
/// Does nothing unless the record is `Running` or `Draining`. Otherwise it
/// tries to reach the agent (5 second connect timeout) and sends a `Shutdown`
/// message. If the agent cannot be reached, it falls back to sending `SIGTERM`
/// to the recorded PID, provided that PID is still alive.
///
/// This only issues the request; it does not wait for the sandbox to exit.
///
/// # Errors
///
/// `Unsupported` when `backend` is not local, plus lookup, send, or signal
/// failures. An unreachable agent is logged, not returned.
pub(crate) async fn stop_local(
    backend: Arc<dyn crate::backend::Backend>,
    name: &str,
) -> MicrosandboxResult<()> {
    let Some(local_backend) = backend.as_local() else {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "stop_local".into(),
            available_when: "with a LocalBackend".into(),
        });
    };
    let (model, pid) = get_local_handle_state(local_backend, name).await?;
    let active = matches!(
        model.status,
        SandboxStatus::Running | SandboxStatus::Draining
    );
    if !active {
        return Ok(());
    }

    let connect_timeout = std::time::Duration::from_secs(5);
    let connection =
        fs::local::connect_agent_with_timeout(local_backend, name, connect_timeout).await;
    let e = match connection {
        Ok(client) => {
            client.send(0, MessageType::Shutdown, &()).await?;
            return Ok(());
        }
        Err(unreachable) => unreachable,
    };

    tracing::warn!(
        sandbox = %name,
        error = %e,
        "stop_local: agent UDS unreachable; falling back to SIGTERM",
    );
    if let Some(live_pid) = pid.filter(|p| pid_is_alive(*p)) {
        let target = nix::unistd::Pid::from_raw(live_pid);
        nix::sys::signal::kill(target, nix::sys::signal::Signal::SIGTERM)?;
    }
    Ok(())
}
/// Forcefully terminates the local sandbox `name` with `SIGKILL`.
///
/// Does nothing unless the record is `Running` or `Draining`. After signalling
/// a live PID it polls every 50 ms, for up to 5 seconds, until the process is
/// dead or reaped. The record is marked `Stopped` only when no process was
/// signalled or the signalled process is confirmed gone; a failure of that
/// status update is logged rather than returned.
///
/// # Errors
///
/// `Unsupported` when `backend` is not local, plus lookup or signal failures.
pub(crate) async fn kill_local(
    backend: Arc<dyn crate::backend::Backend>,
    name: &str,
) -> MicrosandboxResult<()> {
    let Some(local_backend) = backend.as_local() else {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "kill_local".into(),
            available_when: "with a LocalBackend".into(),
        });
    };
    let (model, pid) = get_local_handle_state(local_backend, name).await?;
    let active = matches!(
        model.status,
        SandboxStatus::Running | SandboxStatus::Draining
    );
    if !active {
        return Ok(());
    }

    let signalled = pid.filter(|p| pid_is_alive(*p));
    if let Some(target) = signalled {
        nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(target),
            nix::sys::signal::Signal::SIGKILL,
        )?;
        let timeout = std::time::Duration::from_secs(5);
        let started = std::time::Instant::now();
        let poll_interval = std::time::Duration::from_millis(50);
        while started.elapsed() < timeout && !pid_is_dead_or_reaped(target) {
            tokio::time::sleep(poll_interval).await;
        }
    }

    // Re-check after the wait: the loop may have ended on the timeout.
    let all_dead = match signalled {
        Some(target) => pid_is_dead_or_reaped(target),
        None => true,
    };
    if all_dead {
        let db = local_backend.db().await?.write();
        if let Err(e) = update_sandbox_status(db, model.id, SandboxStatus::Stopped).await {
            tracing::warn!(sandbox = %name, error = %e, "failed to update sandbox status after kill");
        }
    }
    Ok(())
}
/// Sends `SIGUSR1` to the process of the local sandbox `name`, if its recorded
/// PID is still alive.
///
/// Unlike `stop_local`/`kill_local`, the record's status is not consulted.
/// NOTE(review): how the sandbox process reacts to `SIGUSR1` is defined
/// elsewhere; presumably it begins draining.
///
/// # Errors
///
/// `Unsupported` when `backend` is not local, plus lookup or signal failures.
pub(crate) async fn drain_local(
    backend: Arc<dyn crate::backend::Backend>,
    name: &str,
) -> MicrosandboxResult<()> {
    let Some(local_backend) = backend.as_local() else {
        return Err(crate::MicrosandboxError::Unsupported {
            feature: "drain_local".into(),
            available_when: "with a LocalBackend".into(),
        });
    };
    let (_, pid) = get_local_handle_state(local_backend, name).await?;
    let Some(live_pid) = pid.filter(|p| pid_is_alive(*p)) else {
        return Ok(());
    };
    nix::sys::signal::kill(
        nix::unistd::Pid::from_raw(live_pid),
        nix::sys::signal::Signal::SIGUSR1,
    )?;
    Ok(())
}
impl Sandbox {
    /// Deletes this local sandbox's on-disk directory and its database record.
    ///
    /// The directory is removed first, then the record.
    ///
    /// # Errors
    ///
    /// `Unsupported` for cloud sandboxes; otherwise filesystem or database
    /// errors.
    pub async fn remove_persisted(&self) -> MicrosandboxResult<()> {
        let local = self.require_local("remove_persisted")?;
        let Some(local_backend) = self.backend.as_local() else {
            return Err(crate::MicrosandboxError::Unsupported {
                feature: "Sandbox::remove_persisted on cloud".into(),
                available_when: "never — cloud sandboxes are removed via the API".into(),
            });
        };
        let pools = local_backend.db().await?;
        let sandbox_dir = local_backend.sandboxes_dir().join(&self.name);
        remove_dir_if_exists(&sandbox_dir)?;
        sandbox_entity::Entity::delete_by_id(local.db_id)
            .exec(pools.write())
            .await?;
        Ok(())
    }

    /// The sandbox's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The configuration this sandbox was created or started with.
    pub fn config(&self) -> &SandboxConfig {
        &self.config
    }

    /// Which kind of backend this sandbox belongs to.
    pub fn backend_kind(&self) -> crate::backend::BackendKind {
        self.backend.kind()
    }

    /// The backend this sandbox dispatches its operations through.
    pub fn backend(&self) -> &Arc<dyn crate::backend::Backend> {
        &self.backend
    }

    /// Local runtime state, or `None` for a cloud sandbox.
    pub fn local(&self) -> Option<&crate::backend::SandboxLocalState> {
        if let crate::backend::SandboxInner::Local(state) = &*self.inner {
            Some(state)
        } else {
            None
        }
    }

    /// Cloud state, or `None` for a local sandbox.
    pub fn cloud(&self) -> Option<&crate::backend::SandboxCloudState> {
        if let crate::backend::SandboxInner::Cloud(state) = &*self.inner {
            Some(state)
        } else {
            None
        }
    }

    /// Returns the local state or an `Unsupported` error naming `method`.
    fn require_local(
        &self,
        method: &'static str,
    ) -> MicrosandboxResult<&crate::backend::SandboxLocalState> {
        match self.local() {
            Some(state) => Ok(state),
            None => Err(crate::MicrosandboxError::Unsupported {
                feature: format!("Sandbox::{method}"),
                available_when: "when cloud exec/fs/logs/metrics land".into(),
            }),
        }
    }

    /// Fetches a fresh handle from the backend and returns its status snapshot.
    pub async fn status(&self) -> MicrosandboxResult<SandboxStatus> {
        let backend = Arc::clone(&self.backend);
        let handle = self.backend.sandboxes().get(backend, &self.name).await?;
        Ok(handle.status_snapshot())
    }

    /// Fetches a fresh handle from the backend and returns its last-error
    /// snapshot, if any.
    pub async fn last_error(&self) -> MicrosandboxResult<Option<String>> {
        let backend = Arc::clone(&self.backend);
        let handle = self.backend.sandboxes().get(backend, &self.name).await?;
        Ok(handle.last_error_snapshot())
    }

    /// Reads log entries for this sandbox according to `opts`.
    pub async fn logs(&self, opts: &LogOptions) -> MicrosandboxResult<Vec<LogEntry>> {
        let backend = Arc::clone(&self.backend);
        self.backend.sandboxes().logs(backend, &self.name, opts).await
    }

    /// Opens a log stream for this sandbox according to `opts`.
    pub async fn log_stream(
        &self,
        opts: &LogStreamOptions,
    ) -> MicrosandboxResult<crate::backend::sandbox::LogStream> {
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .log_stream(backend, &self.name, opts)
            .await
    }

    /// Borrows the agent client of a local sandbox.
    ///
    /// # Panics
    ///
    /// Panics when called on a cloud sandbox; check [`Sandbox::local`] first.
    pub fn client(&self) -> &AgentClient {
        let Some(local) = self.local() else {
            panic!("Sandbox::client called on cloud sandbox — use sb.local() to check first")
        };
        &local.client
    }

    /// Returns a shared pointer to the agent client of a local sandbox.
    ///
    /// # Panics
    ///
    /// Panics when called on a cloud sandbox; check [`Sandbox::local`] first.
    pub fn client_arc(&self) -> Arc<AgentClient> {
        let Some(local) = self.local() else {
            panic!("Sandbox::client_arc called on cloud sandbox — use sb.local() to check first")
        };
        Arc::clone(&local.client)
    }

    /// `true` when this is a local sandbox that still holds its process handle.
    pub fn owns_lifecycle(&self) -> bool {
        match self.local() {
            Some(state) => state.handle.is_some(),
            None => false,
        }
    }

    /// Filesystem accessor bound to this sandbox's backend and name.
    pub fn fs(&self) -> fs::SandboxFs<'_> {
        let backend = Arc::clone(&self.backend);
        fs::SandboxFs::new(backend, &self.name)
    }

    /// Asks the backend to stop this sandbox.
    pub async fn stop(&self) -> MicrosandboxResult<()> {
        tracing::debug!(sandbox = %self.name, "stop: dispatching");
        let backend = Arc::clone(&self.backend);
        self.backend.sandboxes().stop(backend, &self.name).await
    }

    /// Stops the sandbox and, when this handle owns the process, waits for it
    /// to exit.
    ///
    /// Without a process handle the default `ExitStatus` is returned after a
    /// successful stop. With a handle, the wait is performed even if the stop
    /// request failed; a stop error then takes precedence over the wait result.
    ///
    /// # Errors
    ///
    /// `Unsupported` for cloud sandboxes, plus stop/wait failures.
    pub async fn stop_and_wait(&self) -> MicrosandboxResult<ExitStatus> {
        let local = self.require_local("stop_and_wait")?;
        let stop_outcome = self.stop().await;
        match local.handle {
            None => stop_outcome.map(|()| std::process::ExitStatus::default()),
            Some(_) => {
                let exit = self.wait().await;
                stop_outcome.and(exit)
            }
        }
    }

    /// Asks the backend to kill this sandbox.
    pub async fn kill(&self) -> MicrosandboxResult<()> {
        let backend = Arc::clone(&self.backend);
        self.backend.sandboxes().kill(backend, &self.name).await
    }

    /// Asks the backend to drain this sandbox.
    pub async fn drain(&self) -> MicrosandboxResult<()> {
        let backend = Arc::clone(&self.backend);
        self.backend.sandboxes().drain(backend, &self.name).await
    }

    /// Waits for the sandbox process to exit.
    ///
    /// # Errors
    ///
    /// `Unsupported` for cloud sandboxes; `Runtime` when this handle does not
    /// hold the process handle.
    pub async fn wait(&self) -> MicrosandboxResult<ExitStatus> {
        let local = self.require_local("wait")?;
        let Some(process) = &local.handle else {
            return Err(crate::MicrosandboxError::Runtime(
                "cannot wait: not the lifecycle owner".into(),
            ));
        };
        process.lock().await.wait().await
    }

    /// Consumes this handle after disarming the process handle, if one is held.
    ///
    /// No-op for cloud sandboxes and for local sandboxes without a handle.
    pub async fn detach(self) {
        let held = self.local().and_then(|state| state.handle.as_ref());
        if let Some(process) = held {
            process.lock().await.disarm();
        }
    }
}
impl Sandbox {
    /// Shell used by the `shell*` helpers: `spec.runtime.shell` when set,
    /// otherwise `/bin/sh`.
    fn resolve_shell_program(&self) -> String {
        match self.config.spec.runtime.shell.as_deref() {
            Some(shell) => shell.to_string(),
            None => "/bin/sh".to_string(),
        }
    }

    /// Runs `cmd` with `args` and returns a streaming handle to the execution.
    pub async fn exec_stream(
        &self,
        cmd: impl Into<String>,
        args: impl IntoIterator<Item = impl Into<String>>,
    ) -> MicrosandboxResult<ExecHandle> {
        let args: Vec<String> = args.into_iter().map(Into::into).collect();
        let opts = ExecOptions {
            args,
            ..Default::default()
        };
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec_stream(backend, &self.name, &self.config, cmd.into(), opts)
            .await
    }

    /// Runs `cmd` with options produced by `f` and returns a streaming handle.
    ///
    /// # Errors
    ///
    /// Fails if the options built by `f` are rejected by `build()`.
    pub async fn exec_stream_with(
        &self,
        cmd: impl Into<String>,
        f: impl FnOnce(ExecOptionsBuilder) -> ExecOptionsBuilder,
    ) -> MicrosandboxResult<ExecHandle> {
        let configured = f(ExecOptionsBuilder::default());
        let opts = configured.build()?;
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec_stream(backend, &self.name, &self.config, cmd.into(), opts)
            .await
    }

    /// Runs `cmd` with `args` and returns its collected output.
    pub async fn exec(
        &self,
        cmd: impl Into<String>,
        args: impl IntoIterator<Item = impl Into<String>>,
    ) -> MicrosandboxResult<ExecOutput> {
        let args: Vec<String> = args.into_iter().map(Into::into).collect();
        let opts = ExecOptions {
            args,
            ..Default::default()
        };
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec(backend, &self.name, &self.config, cmd.into(), opts)
            .await
    }

    /// Runs `cmd` with options produced by `f` and returns its collected output.
    ///
    /// # Errors
    ///
    /// Fails if the options built by `f` are rejected by `build()`.
    pub async fn exec_with(
        &self,
        cmd: impl Into<String>,
        f: impl FnOnce(ExecOptionsBuilder) -> ExecOptionsBuilder,
    ) -> MicrosandboxResult<ExecOutput> {
        let configured = f(ExecOptionsBuilder::default());
        let opts = configured.build()?;
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec(backend, &self.name, &self.config, cmd.into(), opts)
            .await
    }

    /// Runs `script` as `<shell> -c <script>` and returns its collected output.
    pub async fn shell(&self, script: impl Into<String>) -> MicrosandboxResult<ExecOutput> {
        let shell = self.resolve_shell_program();
        let opts = ExecOptions {
            args: vec!["-c".to_string(), script.into()],
            ..Default::default()
        };
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec(backend, &self.name, &self.config, shell, opts)
            .await
    }

    /// Like [`Sandbox::shell`], with extra options from `f`.
    ///
    /// `-c <script>` is placed in front of any arguments set through the
    /// builder.
    pub async fn shell_with(
        &self,
        script: impl Into<String>,
        f: impl FnOnce(ExecOptionsBuilder) -> ExecOptionsBuilder,
    ) -> MicrosandboxResult<ExecOutput> {
        let shell = self.resolve_shell_program();
        let mut opts = f(ExecOptionsBuilder::default()).build()?;
        let mut argv = vec!["-c".to_string(), script.into()];
        argv.append(&mut opts.args);
        opts.args = argv;
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec(backend, &self.name, &self.config, shell, opts)
            .await
    }

    /// Runs `script` as `<shell> -c <script>` and returns a streaming handle.
    pub async fn shell_stream(&self, script: impl Into<String>) -> MicrosandboxResult<ExecHandle> {
        let shell = self.resolve_shell_program();
        let opts = ExecOptions {
            args: vec!["-c".to_string(), script.into()],
            ..Default::default()
        };
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec_stream(backend, &self.name, &self.config, shell, opts)
            .await
    }

    /// Like [`Sandbox::shell_stream`], with extra options from `f`.
    ///
    /// `-c <script>` is placed in front of any arguments set through the
    /// builder.
    pub async fn shell_stream_with(
        &self,
        script: impl Into<String>,
        f: impl FnOnce(ExecOptionsBuilder) -> ExecOptionsBuilder,
    ) -> MicrosandboxResult<ExecHandle> {
        let shell = self.resolve_shell_program();
        let mut opts = f(ExecOptionsBuilder::default()).build()?;
        let mut argv = vec!["-c".to_string(), script.into()];
        argv.append(&mut opts.args);
        opts.args = argv;
        let backend = Arc::clone(&self.backend);
        self.backend
            .sandboxes()
            .exec_stream(backend, &self.name, &self.config, shell, opts)
            .await
    }
}
impl Sandbox {
pub async fn attach(
&self,
cmd: impl Into<String>,
args: impl IntoIterator<Item = impl Into<String>>,
) -> MicrosandboxResult<i32> {
let mut builder = AttachOptionsBuilder::default();
for arg in args {
builder = builder.arg(arg);
}
self.backend
.sandboxes()
.attach(
self.backend.clone(),
&self.name,
&self.config,
cmd.into(),
builder,
)
.await
}
pub async fn attach_with(
&self,
cmd: impl Into<String>,
f: impl FnOnce(AttachOptionsBuilder) -> AttachOptionsBuilder,
) -> MicrosandboxResult<i32> {
let builder = f(AttachOptionsBuilder::default());
self.backend
.sandboxes()
.attach(
self.backend.clone(),
&self.name,
&self.config,
cmd.into(),
builder,
)
.await
}
pub async fn attach_shell(&self) -> MicrosandboxResult<i32> {
let shell = self
.config
.spec
.runtime
.shell
.as_deref()
.unwrap_or("/bin/sh")
.to_string();
self.backend
.sandboxes()
.attach(
self.backend.clone(),
&self.name,
&self.config,
shell,
AttachOptionsBuilder::default(),
)
.await
}
}
/// Repeatedly tries to connect to the agent socket until it succeeds, the
/// sandbox process exits, or `AGENT_RELAY_READY_TIMEOUT` elapses.
///
/// Between attempts it sleeps with a doubling backoff from 1 ms up to 10 ms.
/// On success any recorded boot error in `log_dir` is deleted (best-effort).
///
/// # Errors
///
/// * `BootStart` carrying the recorded boot error whenever one can be read
///   from `log_dir` at the time of failure.
/// * `BootStart` with a synthetic `Other`-stage error when the process exited
///   without leaving a boot error.
/// * The connect error itself, or `Runtime` on timeout, once the deadline has
///   passed and no boot error is recorded.
/// * Any error from polling the process with `try_wait`.
async fn wait_for_relay(
    sock_path: &std::path::Path,
    log_dir: &std::path::Path,
    handle: &mut ProcessHandle,
    sandbox_name: &str,
) -> MicrosandboxResult<AgentClient> {
    tracing::debug!(
        sock = %sock_path.display(),
        pid = handle.pid(),
        "wait_for_relay: waiting for agent socket"
    );

    // Wraps a boot error into the error variant reported for this sandbox.
    let boot_start =
        |err: microsandbox_runtime::boot_error::BootError| crate::MicrosandboxError::BootStart {
            name: sandbox_name.to_string(),
            err,
        };

    let deadline = tokio::time::Instant::now() + AGENT_RELAY_READY_TIMEOUT;
    let max_backoff = std::time::Duration::from_millis(10);
    let mut backoff = std::time::Duration::from_millis(1);
    let mut attempts = 0u32;
    loop {
        attempts += 1;
        // Each attempt may use at most the time left until the deadline.
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        match tokio::time::timeout(remaining, AgentClient::connect(sock_path)).await {
            Ok(Ok(client)) => {
                tracing::debug!(attempts, "wait_for_relay: connected");
                let _ = microsandbox_runtime::boot_error::BootError::delete(log_dir);
                return Ok(client);
            }
            // Still inside the window: retry unless the process has died.
            Ok(Err(_)) | Err(_) if tokio::time::Instant::now() < deadline => {
                if let Some(status) = handle.try_wait()? {
                    tracing::debug!(attempts, ?status, "wait_for_relay: sandbox process exited");
                    let err = read_boot_error(log_dir).unwrap_or_else(|| {
                        microsandbox_runtime::boot_error::BootError {
                            t: chrono::Utc::now()
                                .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
                            stage: microsandbox_runtime::boot_error::BootErrorStage::Other,
                            errno: None,
                            message: format!(
                                "sandbox process exited ({status}) before agent relay became available"
                            ),
                        }
                    });
                    return Err(boot_start(err));
                }
                tokio::time::sleep(backoff).await;
                backoff = backoff.saturating_mul(2).min(max_backoff);
            }
            // Deadline passed and the last attempt failed to connect.
            Ok(Err(e)) => {
                tracing::debug!(
                    attempts,
                    error = %e,
                    "wait_for_relay: agent connection failed"
                );
                return Err(match read_boot_error(log_dir) {
                    Some(boot_err) => boot_start(boot_err),
                    None => e.into(),
                });
            }
            // Deadline passed while the last attempt was still pending.
            Err(e) => {
                tracing::debug!(
                    attempts,
                    error = %e,
                    "wait_for_relay: timed out"
                );
                return Err(match read_boot_error(log_dir) {
                    Some(boot_err) => boot_start(boot_err),
                    None => crate::MicrosandboxError::Runtime(format!(
                        "timed out waiting for agent relay: {e}"
                    )),
                });
            }
        }
    }
}
/// Reads the boot error recorded in `log_dir`, if any.
///
/// A failure to read is treated the same as "no boot error recorded".
fn read_boot_error(
    log_dir: &std::path::Path,
) -> Option<microsandbox_runtime::boot_error::BootError> {
    match microsandbox_runtime::boot_error::BootError::read(log_dir) {
        Ok(recorded) => recorded,
        Err(_) => None,
    }
}
/// Assembles the wire-level [`ExecRequest`] for running `cmd` in a sandbox.
///
/// * Environment: `config.spec.env` merged with `env` via
///   `config::merge_env_pairs`, rendered as `KEY=VALUE` strings. For TTY
///   sessions without a `TERM` entry, one is appended from `default_tty_term`.
/// * Working directory: `cwd`, else the configured workdir, else `/`.
/// * User: `user`, else the configured runtime user (may remain `None`).
/// * Resource limits: converted one-to-one, using each resource's string name.
#[allow(clippy::too_many_arguments)]
pub(crate) fn build_exec_request(
    config: &SandboxConfig,
    cmd: String,
    args: Vec<String>,
    cwd: Option<String>,
    user: Option<String>,
    env: &[EnvVar],
    rlimits: &[Rlimit],
    tty: bool,
    rows: u16,
    cols: u16,
) -> ExecRequest {
    let merged = config::merge_env_pairs(&config.spec.env, env);
    let mut env: Vec<String> = Vec::new();
    for var in merged.iter() {
        env.push(format!("{}={}", var.key, var.value));
    }
    if tty {
        let has_term = env.iter().any(|entry| entry.starts_with("TERM="));
        if !has_term {
            env.push(format!("TERM={}", default_tty_term()));
        }
    }

    let mut wire_rlimits: Vec<ExecRlimit> = Vec::with_capacity(rlimits.len());
    for limit in rlimits {
        wire_rlimits.push(ExecRlimit {
            resource: limit.resource.as_str().to_string(),
            soft: limit.soft,
            hard: limit.hard,
        });
    }

    // Explicit argument wins, then the sandbox default, then the root directory.
    let cwd = match cwd.or_else(|| config.spec.runtime.workdir.clone()) {
        Some(dir) => Some(dir),
        None => Some("/".to_string()),
    };
    let user = match user {
        Some(explicit) => Some(explicit),
        None => config.spec.runtime.user.clone(),
    };

    ExecRequest {
        cmd,
        args,
        env,
        cwd,
        user,
        tty,
        rows,
        cols,
        rlimits: wire_rlimits,
    }
}
/// Chooses the `TERM` value for a TTY session from this process's `TERM`
/// environment variable, as filtered by `select_tty_term`.
fn default_tty_term() -> String {
    let host_term = std::env::var("TERM").ok();
    select_tty_term(host_term.as_deref())
}
/// Returns `term` unchanged when it is usable, otherwise `"xterm"`.
///
/// A value is unusable when it is absent, empty or whitespace-only, or exactly
/// `"dumb"`.
fn select_tty_term(term: Option<&str>) -> String {
    let usable = term.filter(|candidate| *candidate != "dumb" && !candidate.trim().is_empty());
    usable.unwrap_or("xterm").to_string()
}
/// Resolves the filesystem path of the terminal device referred to by `fd`.
///
/// # Errors
///
/// - The error number returned by `ttyname_r`, converted to an `io::Error`.
/// - An `io::Error` if the buffer filled by `ttyname_r` has no NUL terminator.
///
/// The path is built from the raw bytes, so it is not required to be valid
/// UTF-8 (Unix paths are arbitrary byte strings).
pub(crate) fn terminal_path_for_fd(fd: std::os::fd::RawFd) -> std::io::Result<std::path::PathBuf> {
    use std::os::unix::ffi::OsStrExt;

    let mut buf = [0u8; 1024];
    // SAFETY: `buf` is a live, writable buffer and the length passed is exactly
    // `buf.len()`, so `ttyname_r` cannot write out of bounds.
    let rc = unsafe { libc::ttyname_r(fd, buf.as_mut_ptr().cast(), buf.len()) };
    if rc != 0 {
        // `ttyname_r` reports failure through its return value, not `errno`.
        return Err(std::io::Error::from_raw_os_error(rc));
    }
    let name = std::ffi::CStr::from_bytes_until_nul(&buf)
        .map_err(|_| std::io::Error::other("ttyname_r did not NUL-terminate"))?;
    Ok(std::path::PathBuf::from(std::ffi::OsStr::from_bytes(
        name.to_bytes(),
    )))
}
/// Opens `path` for reading and sets `O_NONBLOCK` on the newly opened descriptor.
///
/// # Errors
///
/// Returns the error from opening the file, or the OS error from either
/// `fcntl` call.
pub(crate) fn open_nonblocking_terminal_input(
    path: &std::path::Path,
) -> std::io::Result<std::fs::File> {
    use std::os::fd::AsRawFd;

    let input = std::fs::File::open(path)?;
    let raw = input.as_raw_fd();

    // SAFETY: `raw` belongs to `input`, which stays open for both calls.
    let current = unsafe { libc::fcntl(raw, libc::F_GETFL) };
    if current == -1 {
        return Err(std::io::Error::last_os_error());
    }
    // SAFETY: as above; only the status flags of this descriptor are changed.
    let updated = unsafe { libc::fcntl(raw, libc::F_SETFL, current | libc::O_NONBLOCK) };
    match updated {
        -1 => Err(std::io::Error::last_os_error()),
        _ => Ok(input),
    }
}
/// Performs a single `read(2)` from `fd` into `buf`.
///
/// Returns the number of bytes read, or the OS error when `read` returns a
/// negative value.
pub(crate) fn read_from_fd(fd: std::os::fd::RawFd, buf: &mut [u8]) -> std::io::Result<usize> {
    // SAFETY: the pointer and length both come from `buf`, a valid writable slice.
    let count = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
    // A negative count fails the conversion; errno is still the one set by `read`.
    match usize::try_from(count) {
        Ok(len) => Ok(len),
        Err(_) => Err(std::io::Error::last_os_error()),
    }
}
/// Sets the status of sandbox `sandbox_id` to `status` and refreshes its
/// `updated_at` timestamp to the current UTC time, inside one transaction.
pub(super) async fn update_sandbox_status(
    db: &DbWriteConnection,
    sandbox_id: i32,
    status: SandboxStatus,
) -> MicrosandboxResult<()> {
    db.transaction(|txn| async move {
        let touched_at = chrono::Utc::now().naive_utc();
        let update = sandbox_entity::Entity::update_many()
            .col_expr(sandbox_entity::Column::Status, Expr::value(status))
            .col_expr(sandbox_entity::Column::UpdatedAt, Expr::value(touched_at))
            .filter(sandbox_entity::Column::Id.eq(sandbox_id));
        update.exec(&txn).await?;
        Ok((txn, ()))
    })
    .await
}
/// Loads the sandbox row named `name` and passes it through
/// `reconcile_sandbox_runtime_state` before returning it.
pub(super) async fn load_sandbox_record_reconciled(
    pools: &DbPools,
    name: &str,
) -> MicrosandboxResult<sandbox_entity::Model> {
    let record = load_sandbox_record(pools.read(), name).await?;
    let reconciled = reconcile_sandbox_runtime_state(pools, record).await?;
    Ok(reconciled)
}
pub(super) async fn reconcile_sandbox_runtime_state(
pools: &DbPools,
sandbox: sandbox_entity::Model,
) -> MicrosandboxResult<sandbox_entity::Model> {
if !matches!(
sandbox.status,
SandboxStatus::Running | SandboxStatus::Draining
) {
return Ok(sandbox);
}
let run = load_active_run(pools.read(), sandbox.id).await?;
let Some(run) = run else {
return Ok(sandbox);
};
if run.pid.is_some_and(pid_is_alive) {
return Ok(sandbox);
}
mark_sandbox_runtime_stale(pools.write(), sandbox.id, Some(run.id)).await?;
sandbox_entity::Entity::find_by_id(sandbox.id)
.one(pools.read())
.await?
.ok_or_else(|| crate::MicrosandboxError::SandboxNotFound(sandbox.name))
}
/// Fetches the most recently started run of `sandbox_id` whose status is
/// `Running`, or `None` when there is no such run.
pub(super) async fn load_active_run(
    db: &DbReadConnection,
    sandbox_id: i32,
) -> MicrosandboxResult<Option<run_entity::Model>> {
    let newest_running = run_entity::Entity::find()
        .filter(run_entity::Column::SandboxId.eq(sandbox_id))
        .filter(run_entity::Column::Status.eq(run_entity::RunStatus::Running))
        .order_by_desc(run_entity::Column::StartedAt)
        .one(db)
        .await?;
    Ok(newest_running)
}
async fn load_active_pids(
db: &DbReadConnection,
sandbox_ids: &[i32],
) -> MicrosandboxResult<HashMap<i32, i32>> {
if sandbox_ids.is_empty() {
return Ok(HashMap::new());
}
let runs = run_entity::Entity::find()
.filter(run_entity::Column::SandboxId.is_in(sandbox_ids.iter().copied()))
.filter(run_entity::Column::Status.eq(run_entity::RunStatus::Running))
.order_by_desc(run_entity::Column::StartedAt)
.all(db)
.await?;
let mut pids = HashMap::with_capacity(sandbox_ids.len());
for run in runs {
if pids.contains_key(&run.sandbox_id) {
continue;
}
if let Some(pid) = pid_from_run(Some(&run)) {
pids.insert(run.sandbox_id, pid);
}
}
Ok(pids)
}
/// Returns the run's PID only when a run is given, it has a PID, and
/// `pid_is_alive` reports that PID as alive.
fn pid_from_run(run: Option<&run_entity::Model>) -> Option<i32> {
    let pid = run?.pid?;
    pid_is_alive(pid).then_some(pid)
}
/// Records, in one transaction, that a sandbox's runtime is gone.
///
/// When `run_id` is given, that run becomes `Terminated` with reason
/// `InternalError` and a termination timestamp. The sandbox becomes `Crashed`,
/// but only if its status is still `Running` or `Draining`.
async fn mark_sandbox_runtime_stale(
    db: &DbWriteConnection,
    sandbox_id: i32,
    run_id: Option<i32>,
) -> MicrosandboxResult<()> {
    db.transaction(|txn| async move {
        let stamped_at = chrono::Utc::now().naive_utc();

        if let Some(stale_run) = run_id {
            let close_run = run_entity::Entity::update_many()
                .col_expr(
                    run_entity::Column::Status,
                    Expr::value(run_entity::RunStatus::Terminated),
                )
                .col_expr(
                    run_entity::Column::TerminationReason,
                    Expr::value(run_entity::TerminationReason::InternalError),
                )
                .col_expr(run_entity::Column::TerminatedAt, Expr::value(stamped_at))
                .filter(run_entity::Column::Id.eq(stale_run));
            close_run.exec(&txn).await?;
        }

        // Leave the row alone if something else already moved it out of an active state.
        let still_active = sandbox_entity::Column::Status
            .is_in([SandboxStatus::Running, SandboxStatus::Draining]);
        let flag_crashed = sandbox_entity::Entity::update_many()
            .col_expr(
                sandbox_entity::Column::Status,
                Expr::value(SandboxStatus::Crashed),
            )
            .col_expr(sandbox_entity::Column::UpdatedAt, Expr::value(stamped_at))
            .filter(sandbox_entity::Column::Id.eq(sandbox_id))
            .filter(still_active);
        flag_crashed.exec(&txn).await?;

        Ok((txn, ()))
    })
    .await
}
pub(super) fn pid_is_alive(pid: i32) -> bool {
let result = unsafe { libc::kill(pid, 0) };
if result == 0 {
return true;
}
matches!(
std::io::Error::last_os_error().raw_os_error(),
Some(code) if code == libc::EPERM
)
}
/// Returns `true` when a non-blocking `waitpid` on `pid` returns `pid`, or
/// failing that, when `pid_is_alive` reports the process as gone.
fn pid_is_dead_or_reaped(pid: i32) -> bool {
    let mut wait_status = 0;
    // SAFETY: `wait_status` is a valid, writable integer for `waitpid` to fill in.
    let reaped = unsafe { libc::waitpid(pid, &mut wait_status, libc::WNOHANG) } == pid;
    reaped || !pid_is_alive(pid)
}
fn image_pull_policy(policy: PullPolicy) -> microsandbox_image::PullPolicy {
match policy {
PullPolicy::IfMissing => microsandbox_image::PullPolicy::IfMissing,
PullPolicy::Always => microsandbox_image::PullPolicy::Always,
PullPolicy::Never => microsandbox_image::PullPolicy::Never,
}
}
/// Derives the hostname for a sandbox from its name.
///
/// Thin wrapper over `microsandbox_types::hostname_from_sandbox_name`, which
/// this file imports as `derive_hostname`.
pub(crate) fn hostname_from_sandbox_name(name: &str) -> String {
derive_hostname(name)
}
/// Pulls the OCI image `reference`, preferring the local cache.
///
/// Steps, in order:
/// 1. Parse `reference`; a parse failure becomes `InvalidConfig`.
/// 2. Ask `Registry::pull_cached` for a cached result. On a hit, emit
///    `Resolving`, `Resolved` and `Complete` progress events (when `progress`
///    is given) and return without building a registry client.
/// 3. Otherwise build a registry client. Auth comes from `registry_overrides`
///    when set, else from the global config. CA certificates and insecure
///    registries are the global values extended with the overrides.
/// 4. Pull, with or without progress reporting depending on `progress`.
async fn pull_oci_image(
    local_backend: &crate::backend::LocalBackend,
    reference: &str,
    pull_policy: PullPolicy,
    registry_overrides: RegistryOverrides,
    progress: Option<PullProgressSender>,
) -> MicrosandboxResult<PullResult> {
    let global = local_backend.config();
    let cache = GlobalCache::new(&local_backend.cache_dir())?;
    let platform = microsandbox_image::Platform::host_linux();

    let image_ref = match reference.parse::<Reference>() {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(crate::MicrosandboxError::InvalidConfig(format!(
                "invalid image reference: {e}"
            )));
        }
    };
    let options = PullOptions {
        pull_policy: image_pull_policy(pull_policy),
        ..Default::default()
    };

    if let Some((cached, metadata)) = Registry::pull_cached(&cache, &image_ref, &options)? {
        if let Some(events) = progress {
            let layer_count = metadata.layers.len();
            // `None` when no layer reports a size.
            let total_download_bytes = metadata
                .layers
                .iter()
                .filter_map(|layer| layer.size_bytes)
                .reduce(|acc, size| acc + size);
            let shared_ref: std::sync::Arc<str> = reference.to_string().into();

            events.send(PullProgress::Resolving {
                reference: shared_ref.clone(),
            });
            events.send(PullProgress::Resolved {
                reference: shared_ref.clone(),
                manifest_digest: metadata.manifest_digest.clone().into(),
                layer_count,
                total_download_bytes,
            });
            events.send(PullProgress::Complete {
                reference: shared_ref,
                layer_count,
            });
        }
        return Ok(cached);
    }

    let auth = if let Some(explicit) = registry_overrides.auth {
        explicit
    } else {
        global.resolve_registry_auth(image_ref.registry())?
    };

    let mut ca_certs = global.resolve_ca_certs().await?;
    ca_certs.extend(registry_overrides.ca_certs);

    let mut insecure_registries = global.insecure_registries();
    if registry_overrides.insecure {
        insecure_registries.push(image_ref.registry().to_string());
    }

    let registry = Registry::builder(platform, cache)
        .auth(auth)
        .extra_ca_certs(ca_certs)
        .add_insecure_registries(insecure_registries)
        .build()?;

    match progress {
        Some(events) => {
            let joined = registry
                .pull_with_sender(&image_ref, &options, events)
                .await;
            // Outer error: the pull task itself failed; inner error: the pull failed.
            let pulled = joined.map_err(|e| {
                crate::MicrosandboxError::Custom(format!("pull task panicked: {e}"))
            })??;
            Ok(pulled)
        }
        None => {
            let pulled = registry.pull(&image_ref, &options).await?;
            Ok(pulled)
        }
    }
}
/// Checks every label key, failing on the first offending one.
///
/// # Errors
///
/// Returns `InvalidConfig` when a key is empty or when
/// `reserved_label_prefix` reports that it uses a reserved prefix.
pub(crate) fn validate_labels(labels: &BTreeMap<String, String>) -> MicrosandboxResult<()> {
    for key in labels.keys() {
        let problem: Option<String> = if key.is_empty() {
            Some("label key must not be empty".into())
        } else {
            reserved_label_prefix(key)
                .map(|prefix| format!("label key '{key}' uses reserved prefix '{prefix}'"))
        };
        if let Some(message) = problem {
            return Err(crate::MicrosandboxError::InvalidConfig(message));
        }
    }
    Ok(())
}
/// Rejects environment variables whose key starts with the reserved `MSB_`
/// prefix, reporting the first such key.
///
/// # Errors
///
/// Returns `InvalidConfig` naming the offending key.
pub(crate) fn validate_env(env: &[EnvVar]) -> MicrosandboxResult<()> {
    match env.iter().find(|var| var.key.starts_with("MSB_")) {
        Some(offender) => Err(crate::MicrosandboxError::InvalidConfig(format!(
            "environment variable {:?} uses the reserved MSB_ prefix",
            offender.key
        ))),
        None => Ok(()),
    }
}
/// Tests whether the labels in a handle's config JSON satisfy `filter`.
///
/// Labels are read from the top-level `labels` key, or from `config.labels`
/// when there is no top-level `labels` key. Every `(key, value)` pair in
/// `filter.labels` must be present as a string with exactly that value.
/// Unparseable config JSON never matches; an empty filter matches any
/// parseable config.
fn sandbox_handle_matches_filter(handle: &SandboxHandle, filter: &SandboxFilter) -> bool {
    let parsed = match serde_json::from_str::<serde_json::Value>(handle.config_json()) {
        Ok(value) => value,
        Err(_) => return false,
    };

    let label_map = parsed
        .get("labels")
        .or_else(|| parsed.get("config").and_then(|config| config.get("labels")))
        .and_then(serde_json::Value::as_object);

    for (key, expected) in filter.labels.iter() {
        let actual = label_map
            .and_then(|map| map.get(key))
            .and_then(serde_json::Value::as_str);
        if !actual.is_some_and(|value| value == expected) {
            return false;
        }
    }
    true
}
/// Checks that a rootfs source refers to something usable on the host.
///
/// - `Bind`: the path must exist and be a directory.
/// - `DiskImage`: the path must exist and be a regular file.
/// - `Oci`: nothing is checked here.
///
/// # Errors
///
/// Returns `InvalidConfig` describing the first failed check.
fn validate_rootfs_source(rootfs: &RootfsSource) -> MicrosandboxResult<()> {
    let complaint = match rootfs {
        RootfsSource::Oci(_) => None,
        RootfsSource::Bind(path) if !path.exists() => Some(format!(
            "rootfs bind path does not exist: {}",
            path.display()
        )),
        RootfsSource::Bind(path) if !path.is_dir() => Some(format!(
            "rootfs bind path is not a directory: {}",
            path.display()
        )),
        RootfsSource::Bind(_) => None,
        RootfsSource::DiskImage { path, .. } if !path.exists() => {
            Some(format!("disk image does not exist: {}", path.display()))
        }
        RootfsSource::DiskImage { path, .. } if !path.is_file() => Some(format!(
            "disk image is not a regular file: {}",
            path.display()
        )),
        RootfsSource::DiskImage { .. } => None,
    };

    match complaint {
        Some(message) => Err(crate::MicrosandboxError::InvalidConfig(message)),
        None => Ok(()),
    }
}
/// Recursively removes `path`, treating an already-missing directory as success.
///
/// # Errors
///
/// Any I/O error other than `NotFound` is propagated.
pub(super) fn remove_dir_if_exists(path: &Path) -> MicrosandboxResult<()> {
    let Err(err) = std::fs::remove_dir_all(path) else {
        return Ok(());
    };
    if err.kind() == std::io::ErrorKind::NotFound {
        Ok(())
    } else {
        Err(err.into())
    }
}
/// Looks up the sandbox row whose name equals `name`.
///
/// # Errors
///
/// Returns `SandboxNotFound` carrying `name` when no row matches, or the
/// database error from the query.
pub(super) async fn load_sandbox_record(
    db: &DbReadConnection,
    name: &str,
) -> MicrosandboxResult<sandbox_entity::Model> {
    let lookup = sandbox_entity::Entity::find()
        .filter(sandbox_entity::Column::Name.eq(name))
        .one(db)
        .await?;
    match lookup {
        Some(record) => Ok(record),
        None => Err(crate::MicrosandboxError::SandboxNotFound(name.into())),
    }
}
/// Makes sure the name and directory for a new sandbox are free to use.
///
/// Without `config.replace_existing`, an existing database row or an existing
/// `sandbox_dir` is an error. With it, any existing row is reconciled, stopped
/// if its status is `Running`, `Draining` or `Paused`, and deleted; then
/// `sandbox_dir` is removed if present.
///
/// # Errors
///
/// Returns `SandboxAlreadyExists` in the non-replace case, or any database,
/// stop, or filesystem error from the replace path.
async fn prepare_create_target(
    pools: &DbPools,
    config: &SandboxConfig,
    sandbox_dir: &Path,
) -> MicrosandboxResult<()> {
    let existing = sandbox_entity::Entity::find()
        .filter(sandbox_entity::Column::Name.eq(&config.spec.name))
        .one(pools.read())
        .await?;
    let dir_exists = sandbox_dir.exists();

    if config.replace_existing {
        if let Some(previous) = existing {
            // Reconcile first so a dead process is not treated as still running.
            let previous = reconcile_sandbox_runtime_state(pools, previous).await?;
            let needs_stop = matches!(
                previous.status,
                SandboxStatus::Running | SandboxStatus::Draining | SandboxStatus::Paused
            );
            if needs_stop {
                stop_sandbox_for_replacement(pools, &previous, config.replace_with_timeout)
                    .await?;
            }
            sandbox_entity::Entity::delete_by_id(previous.id)
                .exec(pools.write())
                .await?;
        }
        remove_dir_if_exists(sandbox_dir)?;
        return Ok(());
    }

    if existing.is_none() && !dir_exists {
        return Ok(());
    }
    Err(crate::MicrosandboxError::SandboxAlreadyExists(format!(
        "sandbox '{}' already exists; remove it, start the stopped sandbox, or recreate with .replace()",
        config.spec.name
    )))
}
/// Stops the process behind a sandbox that is about to be replaced.
///
/// If the active run has a live PID: with a non-zero `grace`, send `SIGTERM`
/// and wait up to `grace` for it to exit; then send `SIGKILL` if it is still
/// alive. Signal delivery errors are ignored. Finally the sandbox and run are
/// recorded as stopped via `mark_sandbox_stopped_for_replacement`, whether or
/// not a process was found.
async fn stop_sandbox_for_replacement(
    pools: &DbPools,
    sandbox: &sandbox_entity::Model,
    grace: std::time::Duration,
) -> MicrosandboxResult<()> {
    use nix::sys::signal::{Signal, kill};

    let active_run = load_active_run(pools.read(), sandbox.id).await?;
    let live_pid = active_run
        .as_ref()
        .and_then(|model| model.pid)
        .filter(|pid| pid_is_alive(*pid));

    if let Some(pid) = live_pid {
        let target = nix::unistd::Pid::from_raw(pid);
        if !grace.is_zero() {
            let _ = kill(target, Signal::SIGTERM);
            wait_for_pids_to_exit(&[pid], grace).await;
        }
        // Escalate only if the graceful attempt was skipped or did not work.
        if pid_is_alive(pid) {
            let _ = kill(target, Signal::SIGKILL);
        }
    }

    let run_id = active_run.as_ref().map(|model| model.id);
    mark_sandbox_stopped_for_replacement(pools.write(), sandbox.id, run_id).await
}
/// Records, in one transaction, that a sandbox was stopped to be replaced.
///
/// When `run_id` is given, that run becomes `Terminated` with reason `Signal`
/// and a termination timestamp. The sandbox row is set to `Stopped`
/// regardless of its current status.
async fn mark_sandbox_stopped_for_replacement(
    db: &DbWriteConnection,
    sandbox_id: i32,
    run_id: Option<i32>,
) -> MicrosandboxResult<()> {
    db.transaction(|txn| async move {
        let stamped_at = chrono::Utc::now().naive_utc();

        if let Some(ended_run) = run_id {
            let close_run = run_entity::Entity::update_many()
                .col_expr(
                    run_entity::Column::Status,
                    Expr::value(run_entity::RunStatus::Terminated),
                )
                .col_expr(
                    run_entity::Column::TerminationReason,
                    Expr::value(run_entity::TerminationReason::Signal),
                )
                .col_expr(run_entity::Column::TerminatedAt, Expr::value(stamped_at))
                .filter(run_entity::Column::Id.eq(ended_run));
            close_run.exec(&txn).await?;
        }

        let flag_stopped = sandbox_entity::Entity::update_many()
            .col_expr(
                sandbox_entity::Column::Status,
                Expr::value(SandboxStatus::Stopped),
            )
            .col_expr(sandbox_entity::Column::UpdatedAt, Expr::value(stamped_at))
            .filter(sandbox_entity::Column::Id.eq(sandbox_id));
        flag_stopped.exec(&txn).await?;

        Ok((txn, ()))
    })
    .await
}
/// Polls every 50 ms until none of `pids` is alive or `timeout` has elapsed,
/// whichever comes first. Returns immediately when `pids` is empty.
async fn wait_for_pids_to_exit(pids: &[i32], timeout: std::time::Duration) {
    let started = std::time::Instant::now();
    let poll_interval = std::time::Duration::from_millis(50);

    while pids.iter().any(|pid| pid_is_alive(*pid)) {
        if started.elapsed() >= timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
    }
}
fn validate_start_state(
local_backend: &crate::backend::LocalBackend,
config: &SandboxConfig,
sandbox_dir: &Path,
) -> MicrosandboxResult<()> {
if !sandbox_dir.exists() {
return Err(crate::MicrosandboxError::Custom(format!(
"sandbox state missing for '{}': {}",
config.spec.name,
sandbox_dir.display()
)));
}
if let RootfsSource::Oci(_) = &config.spec.image
&& let Some(ref digest_str) = config.manifest_digest
{
let cache_dir = local_backend.cache_dir();
if let Ok(cache) = GlobalCache::new(&cache_dir)
&& let Ok(digest) = digest_str.parse::<Digest>()
{
let vmdk_path = cache.vmdk_path(&digest);
if !vmdk_path.exists() {
return Err(crate::MicrosandboxError::Custom(format!(
"sandbox '{}' cannot start: VMDK missing: {}",
config.spec.name,
vmdk_path.display()
)));
}
}
}
Ok(())
}
/// Inserts a new sandbox row for `config` and returns its id.
///
/// The row stores the JSON-serialized config, starts in status `Running`,
/// copies the `ephemeral` flag from the spec, and has both timestamps set to
/// the current UTC time.
///
/// # Errors
///
/// Returns the serialization error or any database error from the insert.
async fn insert_sandbox_record(
    db: &DbWriteConnection,
    config: &SandboxConfig,
) -> MicrosandboxResult<i32> {
    let serialized = serde_json::to_string(config)?;
    db.transaction(|txn| {
        // Clone per invocation so the closure itself keeps only a borrow.
        let config_json = serialized.clone();
        async move {
            let created_at = chrono::Utc::now().naive_utc();
            let record = sandbox_entity::ActiveModel {
                name: Set(config.spec.name.clone()),
                config: Set(config_json),
                status: Set(SandboxStatus::Running),
                ephemeral: Set(config.spec.lifecycle.ephemeral),
                created_at: Set(Some(created_at)),
                updated_at: Set(Some(created_at)),
                ..Default::default()
            };
            let inserted = sandbox_entity::Entity::insert(record).exec(&txn).await?;
            Ok((txn, inserted.last_insert_id))
        }
    })
    .await
}
/// Replaces the sandbox's rootfs pin with one for `manifest_digest`.
///
/// Runs `replace_oci_manifest_pin` inside a single write transaction.
async fn persist_oci_manifest_pin(
db: &DbWriteConnection,
sandbox_id: i32,
manifest_digest: &str,
) -> MicrosandboxResult<()> {
db.transaction(|txn| async move {
replace_oci_manifest_pin(&txn, sandbox_id, manifest_digest).await?;
Ok((txn, ()))
})
.await
}
/// Deletes every rootfs row of `sandbox_id` and inserts one fresh row.
///
/// The new row has mode `erofs` and upper filesystem type `ext4`. Its
/// `manifest_id` is the id of the manifest row whose digest equals
/// `manifest_digest`, or `None` when no such manifest row exists.
async fn replace_oci_manifest_pin<C: ConnectionTrait>(
    db: &C,
    sandbox_id: i32,
    manifest_digest: &str,
) -> MicrosandboxResult<()> {
    use crate::db::entity::manifest as manifest_entity;

    let now = chrono::Utc::now().naive_utc();

    let manifest_id = manifest_entity::Entity::find()
        .filter(manifest_entity::Column::Digest.eq(manifest_digest))
        .one(db)
        .await?
        .map(|row| row.id);

    // Drop any previous pin so the sandbox never has more than one.
    sandbox_rootfs_entity::Entity::delete_many()
        .filter(sandbox_rootfs_entity::Column::SandboxId.eq(sandbox_id))
        .exec(db)
        .await?;

    let pin = sandbox_rootfs_entity::ActiveModel {
        sandbox_id: Set(sandbox_id),
        manifest_id: Set(manifest_id),
        mode: Set("erofs".to_string()),
        upper_fstype: Set(Some("ext4".to_string())),
        created_at: Set(Some(now)),
        ..Default::default()
    };
    sandbox_rootfs_entity::Entity::insert(pin).exec(db).await?;
    Ok(())
}
/// Creates an ext4 image of `size_mib` MiB at `path`, seeded with the tree
/// produced by `build_overlay_upper_tree(tree)`.
///
/// Any existing file at `path` is removed first (errors from that removal are
/// ignored). Formatting runs on the blocking thread pool.
///
/// # Errors
///
/// Returns `Custom` when the blocking task fails to complete or when
/// formatting itself fails.
async fn create_upper_ext4(
    path: &std::path::Path,
    size_mib: u32,
    tree: Option<tree::FileTree>,
) -> MicrosandboxResult<()> {
    let _ = tokio::fs::remove_file(path).await;

    let size_bytes = u64::from(size_mib) * 1024 * 1024;
    let format_options = ext4::Ext4FormatOptions {
        size_bytes,
        ..Default::default()
    };
    let upper_tree = build_overlay_upper_tree(tree);
    let target = path.to_path_buf();

    let format_job = tokio::task::spawn_blocking(move || {
        ext4::format_ext4_with_tree(&target, &format_options, upper_tree)
    });
    let format_outcome = format_job
        .await
        .map_err(|e| crate::MicrosandboxError::Custom(format!("ext4 format task failed: {e}")))?;
    format_outcome
        .map_err(|e| crate::MicrosandboxError::Custom(format!("failed to create upper.ext4: {e}")))?;
    Ok(())
}
/// Builds a file tree whose root holds two directories, `upper` and `work`.
///
/// When `tree` is given, its root entries are moved into `upper`; `work` is
/// always empty.
fn build_overlay_upper_tree(tree: Option<tree::FileTree>) -> tree::FileTree {
    use tree::{DirectoryNode, FileTree, InodeMetadata, TreeNode};

    let mut overlay = FileTree::new();
    let mut upper = DirectoryNode::new(InodeMetadata::default());
    let work = DirectoryNode::new(InodeMetadata::default());

    if let Some(mut seed) = tree {
        upper.entries = std::mem::take(&mut seed.root.entries);
    }

    let root_entries = &mut overlay.root.entries;
    root_entries.insert("upper".into(), TreeNode::Directory(upper));
    root_entries.insert("work".into(), TreeNode::Directory(work));
    overlay
}
#[cfg(test)]
mod tests {
use std::{
fs,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
path::PathBuf,
process::Command,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use microsandbox_db::entity::{run as run_entity, sandbox_rootfs as sandbox_rootfs_entity};
use microsandbox_db::pool::DbPools;
use crate::sandbox::OciRootfsSource;
use microsandbox_migration::{Migrator, MigratorTrait};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set};
use tempfile::tempdir;
use super::{
MAX_HOSTNAME_BYTES, MAX_SANDBOX_NAME_BYTES, RootfsSource, SandboxConfig, SandboxStatus,
hostname_from_sandbox_name, insert_sandbox_record, persist_oci_manifest_pin,
prepare_create_target, reconcile_sandbox_runtime_state, remove_dir_if_exists,
validate_hostname, validate_rootfs_source,
};
/// Opens database pools on `db_path` and applies all migrations, panicking on
/// any failure.
async fn open_test_pools(db_path: &std::path::Path) -> DbPools {
    let opened = DbPools::open(
        db_path,
        1,
        std::time::Duration::from_secs(30),
        std::time::Duration::from_secs(5),
    )
    .await;
    let pools = opened.unwrap();
    Migrator::up(pools.write().inner(), None).await.unwrap();
    pools
}
/// Builds a default `SandboxConfig` whose spec carries the given name.
fn test_config(name: impl Into<String>) -> SandboxConfig {
    let spec = microsandbox_types::SandboxSpec {
        name: name.into(),
        ..Default::default()
    };
    SandboxConfig {
        spec,
        ..Default::default()
    }
}
/// Builds a default `SandboxConfig` whose spec carries the given name and
/// rootfs source.
fn test_config_with_rootfs(name: impl Into<String>, image: RootfsSource) -> SandboxConfig {
    let spec = microsandbox_types::SandboxSpec {
        name: name.into(),
        image,
        ..Default::default()
    };
    SandboxConfig {
        spec,
        ..Default::default()
    }
}
/// Returns a path in the system temp directory named
/// `microsandbox-rootfs-{suffix}-{nanoseconds since the Unix epoch}`.
///
/// Nothing is created on disk.
fn unique_temp_path(suffix: &str) -> PathBuf {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let file_name = format!("microsandbox-rootfs-{suffix}-{}", since_epoch.as_nanos());
    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
/// Returns the first PID at or above 900000 that `pid_is_alive` reports as
/// not alive.
fn dead_pid() -> i32 {
    let mut candidate = 900_000;
    loop {
        if !super::pid_is_alive(candidate) {
            return candidate;
        }
        candidate += 1;
    }
}
/// A usable host `TERM` value is passed through unchanged.
#[test]
fn test_default_tty_term_prefers_host_term() {
    let chosen = super::select_tty_term(Some("wezterm"));
    assert_eq!(chosen, "wezterm");
}
/// A host `TERM` of `dumb` is replaced by `xterm`.
#[test]
fn test_default_tty_term_falls_back_from_dumb() {
    let chosen = super::select_tty_term(Some("dumb"));
    assert_eq!(chosen, "xterm");
}
/// Setting `O_NONBLOCK` through one `dup`'d descriptor is visible through
/// another `dup` of the same descriptor.
#[test]
fn test_shared_tty_fd_flags_are_shared_across_dups() {
    let pty = nix::pty::openpty(None, None).unwrap();
    let first = unsafe { OwnedFd::from_raw_fd(libc::dup(pty.slave.as_raw_fd())) };
    let second = unsafe { OwnedFd::from_raw_fd(libc::dup(first.as_raw_fd())) };

    let before = unsafe { libc::fcntl(first.as_raw_fd(), libc::F_GETFL) };
    assert_ne!(before, -1);

    let set_result =
        unsafe { libc::fcntl(first.as_raw_fd(), libc::F_SETFL, before | libc::O_NONBLOCK) };
    assert_ne!(set_result, -1);

    let seen_by_second = unsafe { libc::fcntl(second.as_raw_fd(), libc::F_GETFL) };
    assert_ne!(seen_by_second, -1);
    assert_ne!(
        seen_by_second & libc::O_NONBLOCK,
        0,
        "dup'd tty fds should share O_NONBLOCK state"
    );
}
/// Re-opening the tty by path yields a non-blocking descriptor while the
/// descriptors that were already open stay blocking.
#[test]
fn test_open_nonblocking_terminal_input_keeps_existing_tty_fds_blocking() {
    let pty = nix::pty::openpty(None, None).unwrap();
    let existing = unsafe { OwnedFd::from_raw_fd(libc::dup(pty.slave.as_raw_fd())) };
    let existing_dup = unsafe { OwnedFd::from_raw_fd(libc::dup(existing.as_raw_fd())) };

    let tty_path = super::terminal_path_for_fd(pty.slave.as_raw_fd()).unwrap();
    let reopened = super::open_nonblocking_terminal_input(&tty_path).unwrap();

    let reopened_flags = unsafe { libc::fcntl(reopened.as_raw_fd(), libc::F_GETFL) };
    assert_ne!(reopened_flags, -1);
    assert_ne!(
        reopened_flags & libc::O_NONBLOCK,
        0,
        "re-opened tty input fd should be non-blocking"
    );

    let existing_flags = unsafe { libc::fcntl(existing.as_raw_fd(), libc::F_GETFL) };
    let existing_dup_flags = unsafe { libc::fcntl(existing_dup.as_raw_fd(), libc::F_GETFL) };
    assert_ne!(existing_flags, -1);
    assert_ne!(existing_dup_flags, -1);
    assert_eq!(
        existing_flags & libc::O_NONBLOCK,
        0,
        "existing tty fd should remain blocking"
    );
    assert_eq!(
        existing_dup_flags & libc::O_NONBLOCK,
        0,
        "dup'd tty fd should remain blocking"
    );
}
/// A bind rootfs pointing at a non-existent path is rejected with a
/// "does not exist" message.
#[test]
fn test_validate_rootfs_source_missing_bind_path() {
    let missing = unique_temp_path("missing");
    let expected = format!(
        "invalid config: rootfs bind path does not exist: {}",
        missing.display()
    );
    let err = validate_rootfs_source(&RootfsSource::Bind(missing)).unwrap_err();
    assert_eq!(err.to_string(), expected);
}
/// A bind rootfs pointing at a regular file is rejected with a
/// "not a directory" message.
#[test]
fn test_validate_rootfs_source_bind_path_must_be_directory() {
    let file_path = unique_temp_path("file");
    fs::write(&file_path, b"not a directory").unwrap();

    let outcome = validate_rootfs_source(&RootfsSource::Bind(file_path.clone()));
    let err = outcome.unwrap_err();
    assert_eq!(
        err.to_string(),
        format!(
            "invalid config: rootfs bind path is not a directory: {}",
            file_path.display()
        )
    );

    fs::remove_file(file_path).unwrap();
}
/// A bind rootfs pointing at an existing directory passes validation.
#[test]
fn test_validate_rootfs_source_existing_bind_directory() {
    let dir_path = unique_temp_path("dir");
    fs::create_dir(&dir_path).unwrap();

    let outcome = validate_rootfs_source(&RootfsSource::Bind(dir_path.clone()));
    outcome.unwrap();

    fs::remove_dir(dir_path).unwrap();
}
/// Names of at most `MAX_HOSTNAME_BYTES` bytes are used as the hostname verbatim.
#[test]
fn test_hostname_from_sandbox_name_passes_short_names_through() {
    let short = "short-name";
    assert_eq!(hostname_from_sandbox_name(short), short);

    let at_limit = "a".repeat(MAX_HOSTNAME_BYTES);
    assert_eq!(hostname_from_sandbox_name(&at_limit), at_limit);
}
/// Over-long names are shortened to exactly `MAX_HOSTNAME_BYTES` bytes, ending
/// in `-` followed by eight hexadecimal digits.
#[test]
fn test_hostname_from_sandbox_name_collapses_long_names_to_64_bytes() {
    let just_over = hostname_from_sandbox_name(&"a".repeat(MAX_HOSTNAME_BYTES + 1));
    assert_eq!(just_over.len(), MAX_HOSTNAME_BYTES);

    let longest = hostname_from_sandbox_name(&"a".repeat(MAX_SANDBOX_NAME_BYTES));
    assert_eq!(longest.len(), MAX_HOSTNAME_BYTES);

    let (head, hex_tail) = longest.as_bytes().split_at(MAX_HOSTNAME_BYTES - 8);
    assert_eq!(head[MAX_HOSTNAME_BYTES - 9], b'-');
    assert!(hex_tail.iter().all(u8::is_ascii_hexdigit));
}
/// The same long name always derives the same hostname, and two long names
/// differing only in their last byte derive different hostnames.
#[test]
fn test_hostname_from_sandbox_name_is_deterministic_and_unique() {
    let first = "a".repeat(MAX_SANDBOX_NAME_BYTES);
    // Same length as `first`, with only the final character changed.
    let second = format!("{}b", &first[..first.len() - 1]);

    let first_host = hostname_from_sandbox_name(&first);
    assert_eq!(first_host, hostname_from_sandbox_name(&first));
    assert_ne!(first_host, hostname_from_sandbox_name(&second));
}
/// A 128-byte name made of two-byte characters derives a hostname within the
/// byte limit that ends on a character boundary.
#[test]
fn test_hostname_from_sandbox_name_respects_utf8_boundaries() {
    let multibyte = "é".repeat(64);
    assert_eq!(multibyte.len(), 128);

    let hostname = hostname_from_sandbox_name(&multibyte);
    assert!(hostname.len() <= MAX_HOSTNAME_BYTES);
    assert!(hostname.is_char_boundary(hostname.len()));
}
/// No hostname and a hostname of exactly `MAX_HOSTNAME_BYTES` bytes are both valid.
#[test]
fn test_validate_hostname_accepts_absent_and_64_byte_hostname() {
    validate_hostname(None).unwrap();

    let at_limit = "y".repeat(MAX_HOSTNAME_BYTES);
    validate_hostname(Some(&at_limit)).unwrap();
}
/// An empty hostname is rejected with the "must not be empty" message.
#[test]
fn test_validate_hostname_rejects_empty_hostname() {
    let message = validate_hostname(Some("")).unwrap_err().to_string();
    assert_eq!(message, "invalid config: hostname must not be empty");
}
/// A hostname one byte over the limit is rejected with the "too long" message.
#[test]
fn test_validate_hostname_rejects_over_64_byte_hostname() {
    let too_long = "y".repeat(MAX_HOSTNAME_BYTES + 1);
    let message = validate_hostname(Some(&too_long)).unwrap_err().to_string();
    assert_eq!(
        message,
        "invalid config: hostname is too long: 65 bytes (max 64)"
    );
}
/// With both an over-long hostname and a missing bind rootfs, `create_local`
/// reports the hostname error.
#[tokio::test]
async fn test_create_local_rejects_invalid_hostname_before_rootfs_validation() {
    let temp = tempdir().unwrap();
    let backend = crate::backend::LocalBackend::builder()
        .home(temp.path())
        .build()
        .await
        .unwrap();
    let backend = Arc::new(backend);

    let mut config =
        test_config_with_rootfs("test", RootfsSource::Bind(unique_temp_path("missing")));
    config.spec.runtime.hostname = Some("y".repeat(MAX_HOSTNAME_BYTES + 1));

    let outcome =
        super::create_local(backend, config, crate::runtime::SpawnMode::Attached, None).await;
    let Err(err) = outcome else {
        panic!("invalid hostname should fail before sandbox creation")
    };

    assert_eq!(
        err.to_string(),
        "invalid config: hostname is too long: 65 bytes (max 64)"
    );
}
/// A populated sandbox directory tree is removed entirely.
#[test]
fn test_remove_dir_if_exists_removes_existing_sandbox_tree() {
    let temp = tempdir().unwrap();
    let sandbox_dir = temp.path().join("sandbox");
    let scripts_dir = sandbox_dir.join("runtime/scripts");

    fs::create_dir_all(&scripts_dir).unwrap();
    fs::write(sandbox_dir.join("runtime/scripts/start.sh"), b"echo hi").unwrap();
    fs::create_dir_all(sandbox_dir.join("rw")).unwrap();

    remove_dir_if_exists(&sandbox_dir).unwrap();

    assert!(!sandbox_dir.exists());
}
/// Removing a directory that does not exist succeeds.
#[test]
fn test_remove_dir_if_exists_ignores_missing_directory() {
    let temp = tempdir().unwrap();
    let absent = temp.path().join("missing");

    let outcome = remove_dir_if_exists(&absent);

    outcome.unwrap();
    assert!(!absent.exists());
}
/// Persisting a pin twice for one sandbox leaves exactly one rootfs row.
#[tokio::test]
async fn test_persist_oci_manifest_pin_upserts_rootfs_record() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;

    let image = RootfsSource::Oci(OciRootfsSource {
        reference: "docker.io/library/alpine".into(),
        upper_size_mib: None,
    });
    let mut config = test_config_with_rootfs("pinned", image);
    config.manifest_digest = Some("sha256:aaaa".into());
    let sandbox_id = insert_sandbox_record(pools.write(), &config).await.unwrap();

    for digest in [
        "sha256:1111111111111111111111111111111111111111111111111111111111111111",
        "sha256:2222222222222222222222222222222222222222222222222222222222222222",
    ] {
        persist_oci_manifest_pin(pools.write(), sandbox_id, digest)
            .await
            .unwrap();
    }

    let pins = sandbox_rootfs_entity::Entity::find()
        .all(pools.write())
        .await
        .unwrap();
    assert_eq!(pins.len(), 1);
    let pin = &pins[0];
    assert_eq!(pin.sandbox_id, sandbox_id);
    assert_eq!(pin.mode, "erofs");
    // No manifest rows were inserted, so the pin has no manifest id.
    assert_eq!(pin.manifest_id, None);
}
/// Persisting a pin for a second, different digest replaces the earlier pin
/// rather than adding another row.
#[tokio::test]
async fn test_persist_oci_manifest_pin_replaces_stale_pin_for_different_digest() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;

    let image = RootfsSource::Oci(OciRootfsSource {
        reference: "docker.io/library/alpine".into(),
        upper_size_mib: None,
    });
    let mut config = test_config_with_rootfs("recreated", image);
    config.manifest_digest = Some("sha256:aaaa".into());
    let sandbox_id = insert_sandbox_record(pools.write(), &config).await.unwrap();

    for digest in [
        "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
    ] {
        persist_oci_manifest_pin(pools.write(), sandbox_id, digest)
            .await
            .unwrap();
    }

    let pins = sandbox_rootfs_entity::Entity::find()
        .all(pools.write())
        .await
        .unwrap();
    assert_eq!(pins.len(), 1);
    let pin = &pins[0];
    assert_eq!(pin.sandbox_id, sandbox_id);
    assert_eq!(pin.mode, "erofs");
    assert_eq!(pin.manifest_id, None);
}
/// The manifest digest set on the config survives a round trip through the
/// JSON stored in the sandbox row.
#[tokio::test]
async fn test_insert_sandbox_record_persists_manifest_digest_in_config_json() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;

    let image = RootfsSource::Oci(OciRootfsSource {
        reference: "docker.io/library/alpine".into(),
        upper_size_mib: None,
    });
    let mut config = test_config_with_rootfs("persisted-digest", image);
    config.manifest_digest = Some("sha256:abc123".into());

    let sandbox_id = insert_sandbox_record(pools.write(), &config).await.unwrap();

    let stored = super::sandbox_entity::Entity::find_by_id(sandbox_id)
        .one(pools.write())
        .await
        .unwrap()
        .unwrap();
    let round_tripped: SandboxConfig = serde_json::from_str(&stored.config).unwrap();
    assert_eq!(round_tripped.manifest_digest, config.manifest_digest);
}
/// Without `replace_existing`, an already-present sandbox directory makes
/// `prepare_create_target` fail with an "already exists" error.
#[tokio::test]
async fn test_prepare_create_target_rejects_existing_state_without_force() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;
    let sandbox_dir = temp.path().join("sandboxes").join("existing");
    fs::create_dir_all(&sandbox_dir).unwrap();

    let config = test_config("existing");
    let outcome = prepare_create_target(&pools, &config, &sandbox_dir).await;

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("already exists"));
}
/// With `replace_existing`, a stopped sandbox's row and directory are removed.
#[tokio::test]
async fn test_prepare_create_target_force_replaces_stopped_sandbox_state() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;
    let sandbox_dir = temp.path().join("sandboxes").join("replaceable");
    fs::create_dir_all(sandbox_dir.join("rw")).unwrap();

    let original = test_config("replaceable");
    let sandbox_id = insert_sandbox_record(pools.write(), &original)
        .await
        .unwrap();
    super::update_sandbox_status(pools.write(), sandbox_id, super::SandboxStatus::Stopped)
        .await
        .unwrap();

    let mut replacement = test_config("replaceable");
    replacement.replace_existing = true;
    prepare_create_target(&pools, &replacement, &sandbox_dir)
        .await
        .unwrap();

    assert!(!sandbox_dir.exists());
    let leftover = super::sandbox_entity::Entity::find_by_id(sandbox_id)
        .one(pools.write())
        .await
        .unwrap();
    assert!(leftover.is_none());
}
/// A sandbox recorded as running whose run PID is dead is reconciled to
/// `Crashed`, and its run is terminated with reason `InternalError`.
#[tokio::test]
async fn test_reconcile_sandbox_runtime_state_marks_dead_processes_crashed() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;

    let config = test_config("stale");
    let sandbox_id = insert_sandbox_record(pools.write(), &config).await.unwrap();

    let stale_run = run_entity::ActiveModel {
        sandbox_id: Set(sandbox_id),
        pid: Set(Some(dead_pid())),
        status: Set(run_entity::RunStatus::Running),
        ..Default::default()
    };
    let run_id = run_entity::Entity::insert(stale_run)
        .exec(pools.write())
        .await
        .unwrap()
        .last_insert_id;

    let before = super::sandbox_entity::Entity::find_by_id(sandbox_id)
        .one(pools.write())
        .await
        .unwrap()
        .unwrap();
    let after = reconcile_sandbox_runtime_state(&pools, before)
        .await
        .unwrap();
    assert_eq!(after.status, SandboxStatus::Crashed);

    let closed_run = run_entity::Entity::find_by_id(run_id)
        .one(pools.write())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(closed_run.status, run_entity::RunStatus::Terminated);
    assert_eq!(
        closed_run.termination_reason,
        Some(run_entity::TerminationReason::InternalError)
    );
    assert!(closed_run.terminated_at.is_some());
}
/// With `replace_existing`, a sandbox recorded as running but whose process is
/// dead is replaced: its row and directory are removed.
#[tokio::test]
async fn test_prepare_create_target_force_replaces_stale_running_sandbox_state() {
    let temp = tempdir().unwrap();
    let pools = open_test_pools(&temp.path().join("test.db")).await;
    let sandbox_dir = temp.path().join("sandboxes").join("stale-running");
    fs::create_dir_all(sandbox_dir.join("rw")).unwrap();

    let original = test_config("stale-running");
    let sandbox_id = insert_sandbox_record(pools.write(), &original)
        .await
        .unwrap();

    let stale_run = run_entity::ActiveModel {
        sandbox_id: Set(sandbox_id),
        pid: Set(Some(dead_pid())),
        status: Set(run_entity::RunStatus::Running),
        ..Default::default()
    };
    run_entity::Entity::insert(stale_run)
        .exec(pools.write())
        .await
        .unwrap();

    let mut replacement = test_config("stale-running");
    replacement.replace_existing = true;
    prepare_create_target(&pools, &replacement, &sandbox_dir)
        .await
        .unwrap();

    assert!(!sandbox_dir.exists());
    let leftover = super::sandbox_entity::Entity::find_by_id(sandbox_id)
        .one(pools.write())
        .await
        .unwrap();
    assert!(leftover.is_none());
}
/// With `replace_existing`, a sandbox backed by a live process is stopped and
/// replaced: the process ends, and the row and directory are removed.
#[tokio::test]
async fn test_prepare_create_target_force_replaces_running_sandbox() {
let temp = tempdir().unwrap();
let db_path = temp.path().join("test.db");
let pools = open_test_pools(&db_path).await;
let sandbox_dir = temp.path().join("sandboxes").join("running");
fs::create_dir_all(&sandbox_dir).unwrap();
let config = test_config("running");
let sandbox_id = insert_sandbox_record(pools.write(), &config).await.unwrap();
// A real child process stands in for the sandbox runtime.
let child = Command::new("sleep").arg("30").spawn().unwrap();
let live_pid = child.id() as i32;
// Wait on the child from another thread so it is reaped once it exits.
let waiter = std::thread::spawn(move || {
let mut child = child;
child.wait().unwrap()
});
let run = run_entity::ActiveModel {
sandbox_id: Set(sandbox_id),
pid: Set(Some(live_pid)),
status: Set(run_entity::RunStatus::Running),
..Default::default()
};
run_entity::Entity::insert(run)
.exec(pools.write())
.await
.unwrap();
let mut forced = test_config("running");
forced.replace_existing = true;
prepare_create_target(&pools, &forced, &sandbox_dir)
.await
.unwrap();
// Joining the waiter blocks until the child has exited and been reaped.
waiter.join().unwrap();
assert!(!super::pid_is_alive(live_pid));
assert!(!sandbox_dir.exists());
assert!(
super::sandbox_entity::Entity::find_by_id(sandbox_id)
.one(pools.write())
.await
.unwrap()
.is_none()
);
}
/// `validate_start_state` must reject a sandbox whose directory does not exist on disk,
/// with an error message containing "sandbox state missing".
#[test]
fn test_validate_start_state_requires_existing_sandbox_dir() {
    let workspace = tempdir().unwrap();
    // Deliberately never created.
    let absent_dir = workspace.path().join("missing");
    let config = test_config("missing");
    let backend = crate::backend::LocalBackend::lazy();

    let outcome = super::validate_start_state(&backend, &config, &absent_dir);

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("sandbox state missing"));
}
/// Runs `validate_start_state` against an existing sandbox directory and an OCI rootfs
/// config that carries a manifest digest.
///
/// NOTE(review): the result is discarded, so this only proves the call does not panic.
/// Presumably validation may still fail for unrelated reasons in a bare test environment;
/// confirm whether a narrower assertion on the error was intended.
#[test]
fn test_validate_start_state_accepts_oci_with_manifest_digest() {
    let workspace = tempdir().unwrap();
    let sandbox_dir = workspace.path().join("persisted");
    fs::create_dir_all(&sandbox_dir).unwrap();

    let rootfs = RootfsSource::Oci(OciRootfsSource {
        reference: "docker.io/library/alpine".into(),
        upper_size_mib: None,
    });
    let mut config = test_config_with_rootfs("persisted", rootfs);
    config.manifest_digest = Some("sha256:aaaa".into());

    let backend = crate::backend::LocalBackend::lazy();
    let _ = super::validate_start_state(&backend, &config, &sandbox_dir);
}
#[tokio::test]
async fn test_reap_marks_only_dead_running_and_draining_sandboxes() {
let temp = tempdir().unwrap();
let db_path = temp.path().join("test.db");
let pools = open_test_pools(&db_path).await;
let dead = dead_pid();
let cfg_a = test_config("running-dead");
let id_a = insert_sandbox_record(pools.write(), &cfg_a).await.unwrap();
run_entity::Entity::insert(run_entity::ActiveModel {
sandbox_id: Set(id_a),
pid: Set(Some(dead)),
status: Set(run_entity::RunStatus::Running),
..Default::default()
})
.exec(pools.write())
.await
.unwrap();
let child = Command::new("sleep").arg("30").spawn().unwrap();
let live_pid = child.id() as i32;
let waiter = std::thread::spawn(move || {
let mut child = child;
child.wait().unwrap()
});
let cfg_b = test_config("running-alive");
let id_b = insert_sandbox_record(pools.write(), &cfg_b).await.unwrap();
run_entity::Entity::insert(run_entity::ActiveModel {
sandbox_id: Set(id_b),
pid: Set(Some(live_pid)),
status: Set(run_entity::RunStatus::Running),
..Default::default()
})
.exec(pools.write())
.await
.unwrap();
let cfg_c = test_config("draining-dead");
let id_c = insert_sandbox_record(pools.write(), &cfg_c).await.unwrap();
super::update_sandbox_status(pools.write(), id_c, SandboxStatus::Draining)
.await
.unwrap();
run_entity::Entity::insert(run_entity::ActiveModel {
sandbox_id: Set(id_c),
pid: Set(Some(dead)),
status: Set(run_entity::RunStatus::Running),
..Default::default()
})
.exec(pools.write())
.await
.unwrap();
let cfg_d = test_config("stopped");
let id_d = insert_sandbox_record(pools.write(), &cfg_d).await.unwrap();
super::update_sandbox_status(pools.write(), id_d, SandboxStatus::Stopped)
.await
.unwrap();
let cfg_e = test_config("starting");
let id_e = insert_sandbox_record(pools.write(), &cfg_e).await.unwrap();
let stale = super::sandbox_entity::Entity::find()
.filter(
super::sandbox_entity::Column::Status
.is_in([SandboxStatus::Running, SandboxStatus::Draining]),
)
.all(pools.write())
.await
.unwrap();
for sandbox in stale {
let _ = reconcile_sandbox_runtime_state(&pools, sandbox).await;
}
let load = |id| {
let read_db = pools.read();
async move {
super::sandbox_entity::Entity::find_by_id(id)
.one(read_db)
.await
.unwrap()
.unwrap()
}
};
assert_eq!(load(id_a).await.status, SandboxStatus::Crashed);
assert_eq!(load(id_b).await.status, SandboxStatus::Running);
assert_eq!(load(id_c).await.status, SandboxStatus::Crashed);
assert_eq!(load(id_d).await.status, SandboxStatus::Stopped);
assert_eq!(load(id_e).await.status, SandboxStatus::Running);
unsafe { libc::kill(live_pid, libc::SIGKILL) };
waiter.join().unwrap();
}
}