use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, RelationTrait, Set, TransactionTrait, sea_query::OnConflict,
};
use microsandbox_image::{
CachedImageMetadata, CachedLayerMetadata, Digest, GlobalCache, ImageConfig, Platform, Reference,
};
use crate::{
MicrosandboxError, MicrosandboxResult,
db::entity::{
config as config_entity, image_ref as image_ref_entity, layer as layer_entity,
manifest as manifest_entity, manifest_layer as manifest_layer_entity,
sandbox_rootfs as sandbox_rootfs_entity,
},
};
/// Unit type that namespaces the image persistence, lookup, removal and
/// garbage-collection operations defined in its `impl` block.
pub struct Image;
/// Summary of one stored image reference combined with data from its manifest row.
///
/// Values are filled in by `build_handle_from_parts`; every manifest-derived
/// field is `None` when no manifest row accompanied the reference.
#[derive(Debug)]
pub struct ImageHandle {
/// Primary key of the image reference row. No accessor in this file reads it,
/// hence the `dead_code` allowance.
#[allow(dead_code)]
db_id: i32,
/// Reference string copied from the image reference row.
reference: String,
/// Digest of the associated manifest row, if one was found.
manifest_digest: Option<String>,
/// Architecture recorded on the manifest row, if any.
architecture: Option<String>,
/// Operating system recorded on the manifest row, if any.
os: Option<String>,
/// Number of layers: either a count supplied by the caller or the count
/// stored on the manifest row, falling back to zero.
layer_count: usize,
/// Total size in bytes as stored on the manifest row, if any.
total_size_bytes: Option<i64>,
/// Creation timestamp of the image reference row, interpreted as UTC.
created_at: Option<chrono::DateTime<chrono::Utc>>,
/// Last-update timestamp of the image reference row, interpreted as UTC.
/// Exposed through `last_used_at()`.
updated_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Full description of a stored image as returned by `Image::inspect`.
#[derive(Debug)]
pub struct ImageDetail {
/// Summary handle for the image reference.
pub handle: ImageHandle,
/// Decoded config row of the manifest; `None` when no manifest or config row exists.
pub config: Option<ImageConfigDetail>,
/// Layers linked to the manifest, ordered by ascending position.
pub layers: Vec<ImageLayerDetail>,
}
/// Decoded view of a stored image config row.
///
/// `env`, `cmd`, `entrypoint` and `labels` are stored as JSON text and are
/// parsed when the detail is built by `Image::inspect`.
#[derive(Debug)]
pub struct ImageConfigDetail {
/// Digest stored on the config row.
pub digest: String,
/// Environment entries; empty when the column is absent or cannot be parsed.
pub env: Vec<String>,
/// Command list; `None` when the column is absent or cannot be parsed.
pub cmd: Option<Vec<String>>,
/// Entrypoint list; `None` when the column is absent or cannot be parsed.
pub entrypoint: Option<Vec<String>>,
/// Working directory as stored, if any.
pub working_dir: Option<String>,
/// User as stored, if any.
pub user: Option<String>,
/// Labels as an arbitrary JSON value; `None` when absent or unparsable.
pub labels: Option<serde_json::Value>,
/// Stop signal as stored, if any.
pub stop_signal: Option<String>,
}
/// One layer of an inspected image: the layer row's fields plus the position
/// taken from the manifest-to-layer link row.
#[derive(Debug)]
pub struct ImageLayerDetail {
/// Diff id stored on the layer row.
pub diff_id: String,
/// Blob digest stored on the layer row.
pub blob_digest: String,
/// Media type stored on the layer row, if any.
pub media_type: Option<String>,
/// Compressed size in bytes stored on the layer row, if known.
pub compressed_size_bytes: Option<i64>,
/// EROFS size in bytes stored on the layer row, if known.
pub erofs_size_bytes: Option<i64>,
/// Position of the layer within the manifest, from the link row.
pub position: i32,
}
impl ImageHandle {
    /// Reference string stored for this image.
    pub fn reference(&self) -> &str {
        self.reference.as_str()
    }

    /// Digest of the associated manifest, when a manifest row was found.
    pub fn manifest_digest(&self) -> Option<&str> {
        self.manifest_digest.as_deref()
    }

    /// Architecture recorded on the manifest, if any.
    pub fn architecture(&self) -> Option<&str> {
        self.architecture.as_deref()
    }

    /// Operating system recorded on the manifest, if any.
    pub fn os(&self) -> Option<&str> {
        self.os.as_deref()
    }

    /// Number of layers associated with the image.
    pub fn layer_count(&self) -> usize {
        self.layer_count
    }

    /// Total size in bytes recorded on the manifest, if any.
    pub fn size_bytes(&self) -> Option<i64> {
        self.total_size_bytes
    }

    /// Creation timestamp of the image reference row.
    pub fn created_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        self.created_at
    }

    /// Last-update timestamp of the image reference row, reported as the
    /// time the image was last used.
    pub fn last_used_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        self.updated_at
    }
}
impl Image {
/// Stores the metadata of an image and returns the id of its reference row.
///
/// Everything runs in a single transaction: the manifest row is upserted,
/// its config row and layer links are replaced, each layer row is upserted,
/// and finally `reference` is pointed at the manifest. The manifest is
/// recorded with the platform returned by `Platform::host_linux()`.
///
/// # Errors
///
/// Returns the database or serialization error raised by any step; a
/// connection-level transaction failure is converted into `MicrosandboxError`.
pub async fn persist(
    reference: &str,
    metadata: CachedImageMetadata,
) -> MicrosandboxResult<i32> {
    let db =
        crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
    let owned_reference = reference.to_string();

    let outcome = db
        .transaction::<_, i32, MicrosandboxError>(|txn| {
            Box::pin(async move {
                // Add up the known layer sizes, clamping rather than overflowing.
                let mut total_size = 0i64;
                for layer in &metadata.layers {
                    if let Some(bytes) = layer.size_bytes {
                        let bytes = i64::try_from(bytes).unwrap_or(i64::MAX);
                        total_size = total_size.saturating_add(bytes);
                    }
                }

                let host_platform = Platform::host_linux();
                let layer_total = metadata.layers.len();
                let manifest_id = upsert_manifest_record(
                    txn,
                    &metadata.manifest_digest,
                    &metadata.config_digest,
                    &host_platform,
                    layer_total as i32,
                    total_size,
                )
                .await?;

                upsert_config_record(
                    txn,
                    manifest_id,
                    &metadata.config_digest,
                    &metadata.config,
                )
                .await?;

                // Existing links are dropped first so the manifest ends up with
                // exactly the layers listed in `metadata`, in order.
                manifest_layer_entity::Entity::delete_many()
                    .filter(manifest_layer_entity::Column::ManifestId.eq(manifest_id))
                    .exec(txn)
                    .await?;

                let mut links = Vec::with_capacity(layer_total);
                for (index, layer) in metadata.layers.iter().enumerate() {
                    let layer_id = upsert_layer_record(txn, layer).await?;
                    links.push(manifest_layer_entity::ActiveModel {
                        manifest_id: Set(manifest_id),
                        layer_id: Set(layer_id),
                        position: Set(index as i32),
                        ..Default::default()
                    });
                }
                // A bulk insert is only issued when there is something to insert.
                if !links.is_empty() {
                    manifest_layer_entity::Entity::insert_many(links)
                        .exec(txn)
                        .await?;
                }

                upsert_image_ref_record(txn, &owned_reference, manifest_id).await
            })
        })
        .await;

    match outcome {
        Ok(image_ref_id) => Ok(image_ref_id),
        Err(sea_orm::TransactionError::Connection(db_err)) => Err(db_err.into()),
        Err(sea_orm::TransactionError::Transaction(err)) => Err(err),
    }
}
/// Looks up a stored image by its reference string.
///
/// # Errors
///
/// Returns `MicrosandboxError::ImageNotFound` when no reference row matches,
/// or the underlying database error.
pub async fn get(reference: &str) -> MicrosandboxResult<ImageHandle> {
    let db =
        crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;

    let row = image_ref_entity::Entity::find()
        .filter(image_ref_entity::Column::Reference.eq(reference))
        .find_also_related(manifest_entity::Entity)
        .one(db)
        .await?;

    let Some((image_ref_model, manifest)) = row else {
        return Err(MicrosandboxError::ImageNotFound(reference.into()));
    };

    // No explicit layer count: the handle uses the count stored on the manifest.
    Ok(build_handle_from_parts(
        &image_ref_model,
        manifest.as_ref(),
        None,
    ))
}
/// Lists every stored image reference, newest `created_at` first.
///
/// # Errors
///
/// Returns the underlying database error if the query fails.
pub async fn list() -> MicrosandboxResult<Vec<ImageHandle>> {
    let db =
        crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;

    let rows = image_ref_entity::Entity::find()
        .order_by_desc(image_ref_entity::Column::CreatedAt)
        .find_also_related(manifest_entity::Entity)
        .all(db)
        .await?;

    let handles: Vec<ImageHandle> = rows
        .iter()
        .map(|(image_ref, manifest)| build_handle_from_parts(image_ref, manifest.as_ref(), None))
        .collect();
    Ok(handles)
}
pub async fn inspect(reference: &str) -> MicrosandboxResult<ImageDetail> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
let image_ref_model = image_ref_entity::Entity::find()
.filter(image_ref_entity::Column::Reference.eq(reference))
.one(db)
.await?
.ok_or_else(|| MicrosandboxError::ImageNotFound(reference.into()))?;
let manifest = manifest_entity::Entity::find_by_id(image_ref_model.manifest_id)
.one(db)
.await?;
let (config_detail, layers) = if let Some(ref manifest) = manifest {
let config = config_entity::Entity::find()
.filter(config_entity::Column::ManifestId.eq(manifest.id))
.one(db)
.await?;
let config_detail = config.map(|c| {
let parse_vec = |field: &str, raw: Option<String>| -> Vec<String> {
raw.and_then(|s| {
serde_json::from_str::<Vec<String>>(&s)
.map_err(|e| {
tracing::warn!("failed to parse config {field}: {e}");
e
})
.ok()
})
.unwrap_or_default()
};
let parse_opt_vec = |field: &str, raw: Option<String>| -> Option<Vec<String>> {
raw.and_then(|s| {
serde_json::from_str::<Vec<String>>(&s)
.map_err(|e| {
tracing::warn!("failed to parse config {field}: {e}");
e
})
.ok()
})
};
ImageConfigDetail {
digest: c.digest,
env: parse_vec("env", c.env),
cmd: parse_opt_vec("cmd", c.cmd),
entrypoint: parse_opt_vec("entrypoint", c.entrypoint),
working_dir: c.working_dir,
user: c.user,
labels: c.labels.and_then(|s| serde_json::from_str(&s).ok()),
stop_signal: c.stop_signal,
}
});
let ml_rows = manifest_layer_entity::Entity::find()
.filter(manifest_layer_entity::Column::ManifestId.eq(manifest.id))
.order_by_asc(manifest_layer_entity::Column::Position)
.find_also_related(layer_entity::Entity)
.all(db)
.await?;
let mut layers = Vec::with_capacity(ml_rows.len());
for (ml, layer) in ml_rows {
if let Some(layer) = layer {
layers.push(ImageLayerDetail {
diff_id: layer.diff_id,
blob_digest: layer.blob_digest,
media_type: layer.media_type,
compressed_size_bytes: layer.compressed_size_bytes,
erofs_size_bytes: layer.erofs_size_bytes,
position: ml.position,
});
}
}
(config_detail, layers)
} else {
(None, Vec::new())
};
let handle =
build_handle_from_parts(&image_ref_model, manifest.as_ref(), Some(layers.len()));
Ok(ImageDetail {
handle,
config: config_detail,
layers,
})
}
/// Removes a stored image reference and, when it was the last reference to
/// its manifest, the manifest and any layers no longer linked to a manifest.
///
/// When `force` is `false` the call fails with `MicrosandboxError::ImageInUse`
/// (listing the sandbox ids) if any sandbox rootfs row refers to the manifest.
/// Database changes run in one transaction. Afterwards, cached files for the
/// removed layers and manifest and the cached image metadata are deleted on a
/// best-effort basis: every failure in that phase is ignored.
///
/// # Errors
///
/// Returns `MicrosandboxError::ImageNotFound` when no reference row matches,
/// `MicrosandboxError::ImageInUse` as described above, or the underlying
/// database error.
pub async fn remove(reference: &str, force: bool) -> MicrosandboxResult<()> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
// The reference row is resolved before the transaction starts.
let image_ref_model = image_ref_entity::Entity::find()
.filter(image_ref_entity::Column::Reference.eq(reference))
.one(db)
.await?
.ok_or_else(|| MicrosandboxError::ImageNotFound(reference.into()))?;
let manifest_id = image_ref_model.manifest_id;
let image_ref_id = image_ref_model.id;
// The transaction yields the diff ids of layers whose rows were deleted and,
// if the manifest row was deleted, its digest; both drive the cache cleanup.
let (layer_diff_ids, flat_manifest_digest) = db
.transaction::<_, (Vec<String>, Option<String>), MicrosandboxError>(|txn| {
Box::pin(async move {
// Without `force`, refuse to remove an image whose manifest is still
// referenced by a sandbox rootfs row.
if !force {
let refs = sandbox_rootfs_entity::Entity::find()
.filter(sandbox_rootfs_entity::Column::ManifestId.eq(manifest_id))
.all(txn)
.await?;
if !refs.is_empty() {
let sandbox_ids: Vec<String> =
refs.iter().map(|r| r.sandbox_id.to_string()).collect();
return Err(MicrosandboxError::ImageInUse(sandbox_ids.join(", ")));
}
}
// Capture the digest and the linked layers' diff ids before any row is deleted.
let manifest_digest = manifest_entity::Entity::find_by_id(manifest_id)
.one(txn)
.await?
.map(|manifest| manifest.digest);
let layer_diff_ids: Vec<String> = layer_entity::Entity::find()
.join(
JoinType::InnerJoin,
layer_entity::Relation::ManifestLayer.def(),
)
.filter(manifest_layer_entity::Column::ManifestId.eq(manifest_id))
.all(txn)
.await?
.into_iter()
.map(|l| l.diff_id)
.collect();
image_ref_entity::Entity::delete_by_id(image_ref_id)
.exec(txn)
.await?;
// Counted after the delete above, so zero means this was the last reference.
let remaining_refs = image_ref_entity::Entity::find()
.filter(image_ref_entity::Column::ManifestId.eq(manifest_id))
.count(txn)
.await?;
if remaining_refs == 0 {
manifest_entity::Entity::delete_by_id(manifest_id)
.exec(txn)
.await?;
// NOTE(review): the orphan check below presumably relies on the manifest
// delete also removing its manifest_layer link rows (e.g. via cascade) —
// confirm against the schema.
let mut orphaned = Vec::new();
for diff_id in &layer_diff_ids {
// A layer is deleted only when no manifest link row refers to it any more.
let refs = manifest_layer_entity::Entity::find()
.join(
JoinType::InnerJoin,
manifest_layer_entity::Relation::Layer.def(),
)
.filter(layer_entity::Column::DiffId.eq(diff_id.as_str()))
.count(txn)
.await?;
if refs == 0 {
layer_entity::Entity::delete_many()
.filter(layer_entity::Column::DiffId.eq(diff_id.as_str()))
.exec(txn)
.await?;
orphaned.push(diff_id.clone());
}
}
return Ok((orphaned, manifest_digest));
}
// Other references still use the manifest: nothing to clean from the cache
// besides this reference's metadata.
Ok((Vec::new(), None))
})
})
.await
.map_err(|err| match err {
sea_orm::TransactionError::Connection(db_err) => db_err.into(),
sea_orm::TransactionError::Transaction(err) => err,
})?;
// Best-effort cache cleanup: a cache that cannot be opened, digests that do
// not parse, and file removal errors are all ignored.
let cache_dir = crate::config::config().cache_dir();
if let Ok(cache) = GlobalCache::new(&cache_dir) {
for diff_id_str in &layer_diff_ids {
if let Ok(diff_id) = diff_id_str.parse::<Digest>() {
let _ = tokio::fs::remove_file(cache.layer_erofs_path(&diff_id)).await;
let _ = tokio::fs::remove_file(cache.layer_erofs_lock_path(&diff_id)).await;
}
}
if let Some(manifest_digest) = flat_manifest_digest
&& let Ok(digest) = manifest_digest.parse::<Digest>()
{
let _ = tokio::fs::remove_file(cache.fsmeta_erofs_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.fsmeta_erofs_lock_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.vmdk_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.vmdk_lock_path(&digest)).await;
}
// The cached metadata for this reference is dropped even when the manifest survives.
if let Ok(image_ref) = reference.parse::<Reference>() {
let _ = cache.delete_image_metadata(&image_ref);
}
}
Ok(())
}
pub async fn gc_layers() -> MicrosandboxResult<u32> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
let orphans: Vec<layer_entity::Model> = layer_entity::Entity::find()
.left_join(manifest_layer_entity::Entity)
.filter(manifest_layer_entity::Column::Id.is_null())
.all(db)
.await?;
let cache_dir = crate::config::config().cache_dir();
let cache = GlobalCache::new(&cache_dir).ok();
let mut removed = 0u32;
for orphan in &orphans {
layer_entity::Entity::delete_by_id(orphan.id)
.exec(db)
.await?;
if let Some(ref cache) = cache
&& let Ok(diff_id) = orphan.diff_id.parse::<Digest>()
{
let _ = tokio::fs::remove_file(cache.layer_erofs_path(&diff_id)).await;
let _ = tokio::fs::remove_file(cache.layer_erofs_lock_path(&diff_id)).await;
}
removed += 1;
}
Ok(removed)
}
/// Runs image garbage collection and returns the number of removed items.
///
/// This simply delegates to `Self::gc_layers`, so only unlinked layers are collected.
pub async fn gc() -> MicrosandboxResult<u32> {
Self::gc_layers().await
}
}
/// Assembles an [`ImageHandle`] from an image reference row and its optional manifest row.
///
/// `layer_count`, when given, takes precedence; otherwise the count stored on
/// the manifest is used, and a missing or negative stored count yields zero.
fn build_handle_from_parts(
    model: &image_ref_entity::Model,
    manifest: Option<&manifest_entity::Model>,
    layer_count: Option<usize>,
) -> ImageHandle {
    let resolved_layer_count = match layer_count {
        Some(count) => count,
        None => manifest
            .and_then(|m| usize::try_from(m.layer_count.unwrap_or_default()).ok())
            .unwrap_or_default(),
    };

    let manifest_digest = manifest.map(|m| m.digest.clone());
    let architecture = manifest.and_then(|m| m.architecture.clone());
    let os = manifest.and_then(|m| m.os.clone());
    let total_size_bytes = manifest.and_then(|m| m.total_size_bytes);

    // Row timestamps are naive; they are interpreted as UTC.
    let created_at = model.created_at.map(|naive| naive.and_utc());
    let updated_at = model.updated_at.map(|naive| naive.and_utc());

    ImageHandle {
        db_id: model.id,
        reference: model.reference.clone(),
        manifest_digest,
        architecture,
        os,
        layer_count: resolved_layer_count,
        total_size_bytes,
        created_at,
        updated_at,
    }
}
/// Inserts the image reference row, or re-points an existing one, and returns its id.
///
/// On a conflict on the reference column only `manifest_id` and `updated_at`
/// are overwritten. The row is then read back to obtain its id.
///
/// # Errors
///
/// Returns the underlying database error, or `MicrosandboxError::Custom`
/// when the row cannot be found after the upsert.
pub(crate) async fn upsert_image_ref_record<C: ConnectionTrait>(
    db: &C,
    reference: &str,
    manifest_id: i32,
) -> MicrosandboxResult<i32> {
    let timestamp = chrono::Utc::now().naive_utc();

    let new_row = image_ref_entity::ActiveModel {
        reference: Set(reference.to_string()),
        manifest_id: Set(manifest_id),
        created_at: Set(Some(timestamp)),
        updated_at: Set(Some(timestamp)),
        ..Default::default()
    };
    let repoint_existing = OnConflict::column(image_ref_entity::Column::Reference)
        .update_columns([
            image_ref_entity::Column::ManifestId,
            image_ref_entity::Column::UpdatedAt,
        ])
        .to_owned();

    image_ref_entity::Entity::insert(new_row)
        .on_conflict(repoint_existing)
        .exec(db)
        .await?;

    let stored = image_ref_entity::Entity::find()
        .filter(image_ref_entity::Column::Reference.eq(reference))
        .one(db)
        .await?;

    match stored {
        Some(model) => Ok(model.id),
        None => Err(crate::MicrosandboxError::Custom(format!(
            "image_ref '{}' missing after upsert",
            reference
        ))),
    }
}
async fn upsert_manifest_record<C: ConnectionTrait>(
db: &C,
digest: &str,
config_digest: &str,
platform: &Platform,
layer_count: i32,
total_size_bytes: i64,
) -> MicrosandboxResult<i32> {
let now = chrono::Utc::now().naive_utc();
manifest_entity::Entity::insert(manifest_entity::ActiveModel {
digest: Set(digest.to_string()),
config_digest: Set(Some(config_digest.to_string())),
architecture: Set(Some(platform.arch.to_string())),
os: Set(Some(platform.os.to_string())),
variant: Set(None),
layer_count: Set(Some(layer_count)),
total_size_bytes: Set(Some(total_size_bytes)),
created_at: Set(Some(now)),
..Default::default()
})
.on_conflict(
OnConflict::column(manifest_entity::Column::Digest)
.do_nothing()
.to_owned(),
)
.exec(db)
.await
.ok();
manifest_entity::Entity::find()
.filter(manifest_entity::Column::Digest.eq(digest))
.one(db)
.await?
.map(|model| model.id)
.ok_or_else(|| {
crate::MicrosandboxError::Custom(format!("manifest '{}' missing after upsert", digest))
})
}
/// Replaces the config row of a manifest with one built from `config`.
///
/// `env`, `cmd` and `entrypoint` are stored as JSON text; an empty `env` is
/// stored as NULL. `labels` and `stop_signal` are always written as NULL.
/// Any existing config row for the manifest is deleted before the insert.
///
/// # Errors
///
/// Returns a serialization error from `serde_json` or the underlying database error.
async fn upsert_config_record<C: ConnectionTrait>(
    db: &C,
    manifest_id: i32,
    digest: &str,
    config: &ImageConfig,
) -> MicrosandboxResult<()> {
    // Serialize every JSON column up front so nothing is deleted if encoding fails.
    let env_json = (!config.env.is_empty())
        .then(|| serde_json::to_string(&config.env))
        .transpose()?;
    let cmd_json = match &config.cmd {
        Some(cmd) => Some(serde_json::to_string(cmd)?),
        None => None,
    };
    let entrypoint_json = match &config.entrypoint {
        Some(entrypoint) => Some(serde_json::to_string(entrypoint)?),
        None => None,
    };
    let timestamp = chrono::Utc::now().naive_utc();

    config_entity::Entity::delete_many()
        .filter(config_entity::Column::ManifestId.eq(manifest_id))
        .exec(db)
        .await?;

    let new_row = config_entity::ActiveModel {
        manifest_id: Set(manifest_id),
        digest: Set(digest.to_string()),
        env: Set(env_json),
        cmd: Set(cmd_json),
        entrypoint: Set(entrypoint_json),
        working_dir: Set(config.working_dir.clone()),
        user: Set(config.user.clone()),
        labels: Set(None),
        stop_signal: Set(None),
        created_at: Set(Some(timestamp)),
        ..Default::default()
    };
    config_entity::Entity::insert(new_row).exec(db).await?;

    Ok(())
}
async fn upsert_layer_record<C: ConnectionTrait>(
db: &C,
layer_meta: &CachedLayerMetadata,
) -> MicrosandboxResult<i32> {
let now = chrono::Utc::now().naive_utc();
layer_entity::Entity::insert(layer_entity::ActiveModel {
diff_id: Set(layer_meta.diff_id.clone()),
blob_digest: Set(layer_meta.digest.clone()),
media_type: Set(layer_meta.media_type.clone()),
compressed_size_bytes: Set(layer_meta
.size_bytes
.map(|s| i64::try_from(s).unwrap_or(i64::MAX))),
erofs_size_bytes: Set(None),
created_at: Set(Some(now)),
last_used_at: Set(Some(now)),
..Default::default()
})
.on_conflict(
OnConflict::column(layer_entity::Column::DiffId)
.update_column(layer_entity::Column::LastUsedAt)
.to_owned(),
)
.exec(db)
.await
.ok();
layer_entity::Entity::find()
.filter(layer_entity::Column::DiffId.eq(&layer_meta.diff_id))
.one(db)
.await?
.map(|model| model.id)
.ok_or_else(|| {
crate::MicrosandboxError::Custom(format!(
"layer '{}' missing after upsert",
layer_meta.diff_id
))
})
}