use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, RelationTrait, Set, TransactionTrait, sea_query::OnConflict,
};
use crate::{
MicrosandboxError, MicrosandboxResult,
db::entity::{
config as config_entity, image as image_entity, layer as layer_entity,
manifest as manifest_entity, manifest_layer as manifest_layer_entity,
sandbox_image as sandbox_image_entity,
},
};
/// Unit type used purely as a namespace: its associated functions (`persist`,
/// `get`, `list`, `inspect`, `remove`) operate on the image records stored in
/// the database and take no `self`.
pub struct Image;
/// Summary view of one stored image.
///
/// Instances are assembled by `build_handle_from_parts` from an image row plus
/// whatever manifest/config information was found for it; fields are private
/// and exposed through the read-only accessors on this type.
#[derive(Debug)]
pub struct ImageHandle {
// Filled in from the image row's primary key when the handle is built, but no
// accessor on this type reads it, hence the lint allowance.
#[allow(dead_code)]
db_id: i32,
/// Image reference string, copied from the image row.
reference: String,
/// Size recorded on the image row, if any.
size_bytes: Option<i64>,
/// Digest of the manifest row linked to the image; `None` when no manifest row was found.
manifest_digest: Option<String>,
/// Architecture taken from the config row, if one was found.
architecture: Option<String>,
/// Operating system taken from the config row, if one was found.
os: Option<String>,
/// Number of layers associated with the image's manifest (0 without a manifest).
layer_count: usize,
/// `last_used_at` of the image row, tagged as UTC.
last_used_at: Option<chrono::DateTime<chrono::Utc>>,
/// `created_at` of the image row, tagged as UTC.
created_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Full description of a stored image as returned by `Image::inspect`.
#[derive(Debug)]
pub struct ImageDetail {
/// Summary information for the image.
pub handle: ImageHandle,
/// Parsed config row, or `None` when the image has no manifest or no config row.
pub config: Option<ImageConfigDetail>,
/// Layers of the image's manifest, ordered by ascending position.
pub layers: Vec<ImageLayerDetail>,
}
/// Image configuration as read back from the config table by `Image::inspect`.
///
/// The list-valued fields are stored as JSON text in the database. When
/// decoding a stored value fails, `inspect` logs a warning and falls back to
/// the "absent" value for that field instead of returning an error.
#[derive(Debug)]
pub struct ImageConfigDetail {
/// Digest stored on the config row.
pub digest: String,
/// Architecture stored on the config row.
pub architecture: Option<String>,
/// Operating system stored on the config row.
pub os: Option<String>,
/// Environment entries; empty when the column is NULL or not a valid JSON string array.
pub env: Vec<String>,
/// Command; `None` when the column is NULL or not a valid JSON string array.
pub cmd: Option<Vec<String>>,
/// Entrypoint; `None` when the column is NULL or not a valid JSON string array.
pub entrypoint: Option<Vec<String>>,
/// Working directory stored on the config row.
pub working_dir: Option<String>,
/// User stored on the config row.
pub user: Option<String>,
/// Exposed ports; empty when the column is NULL or not a valid JSON string array.
pub exposed_ports: Vec<String>,
/// Volumes; empty when the column is NULL or not a valid JSON string array.
pub volumes: Vec<String>,
}
/// One layer of an image as reported by `Image::inspect`.
///
/// All fields except `position` come from the layer row; `position` comes from
/// the manifest-to-layer link row.
#[derive(Debug)]
pub struct ImageLayerDetail {
/// Digest stored on the layer row.
pub digest: String,
/// Diff ID stored on the layer row.
pub diff_id: String,
/// Media type stored on the layer row, if any.
pub media_type: Option<String>,
/// Size stored on the layer row, if any.
pub size_bytes: Option<i64>,
/// Position of the layer within its manifest (`persist` writes these starting at 0).
pub position: i32,
}
/// Read-only accessors for the fields of an [`ImageHandle`].
impl ImageHandle {
/// Returns the image reference string.
pub fn reference(&self) -> &str {
&self.reference
}
/// Returns the size recorded for the image, if any.
pub fn size_bytes(&self) -> Option<i64> {
self.size_bytes
}
/// Returns the manifest digest, or `None` when the image has no manifest row.
pub fn manifest_digest(&self) -> Option<&str> {
self.manifest_digest.as_deref()
}
/// Returns the architecture recorded in the image's config row, if any.
pub fn architecture(&self) -> Option<&str> {
self.architecture.as_deref()
}
/// Returns the operating system recorded in the image's config row, if any.
pub fn os(&self) -> Option<&str> {
self.os.as_deref()
}
/// Returns the number of layers associated with the image's manifest.
pub fn layer_count(&self) -> usize {
self.layer_count
}
/// Returns the image row's last-used timestamp as UTC, if set.
pub fn last_used_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
self.last_used_at
}
/// Returns the image row's creation timestamp as UTC, if set.
pub fn created_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
self.created_at
}
}
impl Image {
pub async fn persist(
reference: &str,
metadata: microsandbox_image::CachedImageMetadata,
) -> MicrosandboxResult<i32> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
let reference = reference.to_string();
db.transaction::<_, i32, MicrosandboxError>(|txn| {
Box::pin(async move {
let total_size: i64 = metadata
.layers
.iter()
.filter_map(|l| l.size_bytes)
.map(|s| i64::try_from(s).unwrap_or(i64::MAX))
.fold(0i64, |acc, s| acc.saturating_add(s));
let image_id = upsert_image_record(txn, &reference, Some(total_size)).await?;
let manifest_id =
upsert_manifest_record(txn, image_id, &metadata.manifest_digest).await?;
let platform = microsandbox_image::Platform::host_linux();
upsert_config_record(
txn,
manifest_id,
&metadata.config_digest,
&metadata.config,
&platform,
)
.await?;
manifest_layer_entity::Entity::delete_many()
.filter(manifest_layer_entity::Column::ManifestId.eq(manifest_id))
.exec(txn)
.await?;
for (position, layer_meta) in metadata.layers.iter().enumerate() {
let layer_id = upsert_layer_record(txn, layer_meta).await?;
manifest_layer_entity::Entity::insert(manifest_layer_entity::ActiveModel {
manifest_id: Set(manifest_id),
layer_id: Set(layer_id),
position: Set(position as i32),
..Default::default()
})
.exec(txn)
.await?;
}
Ok(image_id)
})
})
.await
.map_err(|err| match err {
sea_orm::TransactionError::Connection(db_err) => db_err.into(),
sea_orm::TransactionError::Transaction(err) => err,
})
}
/// Looks up a stored image by its reference string and returns its summary handle.
///
/// # Errors
/// Returns `MicrosandboxError::ImageNotFound` when no image row has this
/// reference, or a database error if initialisation or a query fails.
pub async fn get(reference: &str) -> MicrosandboxResult<ImageHandle> {
    let db =
        crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;

    let lookup = image_entity::Entity::find()
        .filter(image_entity::Column::Reference.eq(reference))
        .one(db)
        .await?;

    match lookup {
        Some(row) => build_handle(db, row).await,
        None => Err(MicrosandboxError::ImageNotFound(reference.into())),
    }
}
/// Returns a summary handle for every stored image, newest `created_at` first.
///
/// Handles are built one image at a time, so each image costs its own
/// follow-up queries for manifest, config and layer count.
///
/// # Errors
/// Returns a database error if initialisation or any query fails.
pub async fn list() -> MicrosandboxResult<Vec<ImageHandle>> {
    let db =
        crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;

    let rows = image_entity::Entity::find()
        .order_by_desc(image_entity::Column::CreatedAt)
        .all(db)
        .await?;

    let mut summaries = Vec::with_capacity(rows.len());
    for row in rows {
        let summary = build_handle(db, row).await?;
        summaries.push(summary);
    }
    Ok(summaries)
}
pub async fn inspect(reference: &str) -> MicrosandboxResult<ImageDetail> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
let image_model = image_entity::Entity::find()
.filter(image_entity::Column::Reference.eq(reference))
.one(db)
.await?
.ok_or_else(|| MicrosandboxError::ImageNotFound(reference.into()))?;
let manifest = manifest_entity::Entity::find()
.filter(manifest_entity::Column::ImageId.eq(image_model.id))
.one(db)
.await?;
let (config_detail, layers) = if let Some(ref manifest) = manifest {
let config = config_entity::Entity::find()
.filter(config_entity::Column::ManifestId.eq(manifest.id))
.one(db)
.await?;
let config_detail = config.map(|c| {
let parse_vec = |field: &str, raw: Option<String>| -> Vec<String> {
raw.and_then(|s| {
serde_json::from_str::<Vec<String>>(&s)
.map_err(|e| {
tracing::warn!("failed to parse config {field}: {e}");
e
})
.ok()
})
.unwrap_or_default()
};
let parse_opt_vec = |field: &str, raw: Option<String>| -> Option<Vec<String>> {
raw.and_then(|s| {
serde_json::from_str::<Vec<String>>(&s)
.map_err(|e| {
tracing::warn!("failed to parse config {field}: {e}");
e
})
.ok()
})
};
ImageConfigDetail {
digest: c.digest,
architecture: c.architecture,
os: c.os,
env: parse_vec("env", c.env),
cmd: parse_opt_vec("cmd", c.cmd),
entrypoint: parse_opt_vec("entrypoint", c.entrypoint),
working_dir: c.working_dir,
user: c.user,
exposed_ports: parse_vec("exposed_ports", c.exposed_ports),
volumes: parse_vec("volumes", c.volumes),
}
});
let ml_rows = manifest_layer_entity::Entity::find()
.filter(manifest_layer_entity::Column::ManifestId.eq(manifest.id))
.order_by_asc(manifest_layer_entity::Column::Position)
.all(db)
.await?;
let mut layers = Vec::with_capacity(ml_rows.len());
for ml in ml_rows {
if let Some(layer) = layer_entity::Entity::find_by_id(ml.layer_id)
.one(db)
.await?
{
layers.push(ImageLayerDetail {
digest: layer.digest,
diff_id: layer.diff_id,
media_type: layer.media_type,
size_bytes: layer.size_bytes,
position: ml.position,
});
}
}
(config_detail, layers)
} else {
(None, Vec::new())
};
let handle = build_handle_from_parts(
&image_model,
manifest.as_ref().map(|m| m.digest.as_str()),
config_detail
.as_ref()
.and_then(|c| c.architecture.as_deref()),
config_detail.as_ref().and_then(|c| c.os.as_deref()),
layers.len(),
);
Ok(ImageDetail {
handle,
config: config_detail,
layers,
})
}
pub async fn remove(reference: &str, force: bool) -> MicrosandboxResult<()> {
let db =
crate::db::init_global(Some(crate::config::config().database.max_connections)).await?;
let image_model = image_entity::Entity::find()
.filter(image_entity::Column::Reference.eq(reference))
.one(db)
.await?
.ok_or_else(|| MicrosandboxError::ImageNotFound(reference.into()))?;
let image_id = image_model.id;
let layer_digests = db
.transaction::<_, Vec<String>, MicrosandboxError>(|txn| {
Box::pin(async move {
if !force {
let refs = sandbox_image_entity::Entity::find()
.filter(sandbox_image_entity::Column::ImageId.eq(image_id))
.all(txn)
.await?;
if !refs.is_empty() {
let sandbox_ids: Vec<String> =
refs.iter().map(|r| r.sandbox_id.to_string()).collect();
return Err(MicrosandboxError::ImageInUse(sandbox_ids.join(", ")));
}
}
let layer_digests: Vec<String> = layer_entity::Entity::find()
.join(
JoinType::InnerJoin,
layer_entity::Relation::ManifestLayer.def(),
)
.join(
JoinType::InnerJoin,
manifest_layer_entity::Relation::Manifest.def(),
)
.filter(manifest_entity::Column::ImageId.eq(image_id))
.all(txn)
.await?
.into_iter()
.map(|l| l.digest)
.collect();
if force {
sandbox_image_entity::Entity::delete_many()
.filter(sandbox_image_entity::Column::ImageId.eq(image_id))
.exec(txn)
.await?;
}
image_entity::Entity::delete_by_id(image_id)
.exec(txn)
.await?;
let mut orphaned_digests = Vec::new();
for digest_str in &layer_digests {
let refs = manifest_layer_entity::Entity::find()
.join(
JoinType::InnerJoin,
manifest_layer_entity::Relation::Layer.def(),
)
.filter(layer_entity::Column::Digest.eq(digest_str.as_str()))
.count(txn)
.await?;
if refs == 0 {
layer_entity::Entity::delete_many()
.filter(layer_entity::Column::Digest.eq(digest_str.as_str()))
.exec(txn)
.await?;
orphaned_digests.push(digest_str.clone());
}
}
Ok(orphaned_digests)
})
})
.await
.map_err(|err| match err {
sea_orm::TransactionError::Connection(db_err) => db_err.into(),
sea_orm::TransactionError::Transaction(err) => err,
})?;
let cache_dir = crate::config::config().cache_dir();
if let Ok(cache) = microsandbox_image::GlobalCache::new(&cache_dir) {
for digest_str in &layer_digests {
if let Ok(digest) = digest_str.parse::<microsandbox_image::Digest>() {
let _ = tokio::fs::remove_dir_all(cache.extracted_dir(&digest)).await;
let _ = tokio::fs::remove_file(cache.tar_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.index_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.lock_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.download_lock_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.part_path(&digest)).await;
let _ = tokio::fs::remove_file(cache.implicit_dirs_path(&digest)).await;
let _ = tokio::fs::remove_dir_all(cache.extracting_dir(&digest)).await;
}
}
if let Ok(image_ref) = reference.parse::<microsandbox_image::Reference>() {
let _ = cache.delete_image_metadata(&image_ref);
}
}
Ok(())
}
}
/// Builds the summary handle for an image row by looking up its manifest and,
/// when one exists, the manifest's config row and number of layer links.
///
/// An image without a manifest row yields a handle with no digest, no
/// architecture/OS and a layer count of 0.
///
/// # Errors
/// Returns a database error if any of the lookups fails.
async fn build_handle<C: ConnectionTrait>(
    db: &C,
    model: image_entity::Model,
) -> MicrosandboxResult<ImageHandle> {
    let manifest_row = manifest_entity::Entity::find()
        .filter(manifest_entity::Column::ImageId.eq(model.id))
        .one(db)
        .await?;

    let Some(manifest_row) = manifest_row else {
        return Ok(build_handle_from_parts(&model, None, None, None, 0));
    };

    let config_row = config_entity::Entity::find()
        .filter(config_entity::Column::ManifestId.eq(manifest_row.id))
        .one(db)
        .await?;

    let link_count = manifest_layer_entity::Entity::find()
        .filter(manifest_layer_entity::Column::ManifestId.eq(manifest_row.id))
        .count(db)
        .await? as usize;

    let architecture = config_row
        .as_ref()
        .and_then(|cfg| cfg.architecture.as_deref());
    let os = config_row.as_ref().and_then(|cfg| cfg.os.as_deref());

    Ok(build_handle_from_parts(
        &model,
        Some(manifest_row.digest.as_str()),
        architecture,
        os,
        link_count,
    ))
}
/// Assembles an [`ImageHandle`] from an image row and already-resolved
/// manifest/config details.
///
/// String arguments are copied into owned values; the row's naive timestamps
/// are tagged as UTC without any conversion.
fn build_handle_from_parts(
    model: &image_entity::Model,
    manifest_digest: Option<&str>,
    architecture: Option<&str>,
    os: Option<&str>,
    layer_count: usize,
) -> ImageHandle {
    let manifest_digest = manifest_digest.map(str::to_owned);
    let architecture = architecture.map(str::to_owned);
    let os = os.map(str::to_owned);
    let last_used_at = model.last_used_at.map(|naive| naive.and_utc());
    let created_at = model.created_at.map(|naive| naive.and_utc());

    ImageHandle {
        db_id: model.id,
        reference: model.reference.clone(),
        size_bytes: model.size_bytes,
        manifest_digest,
        architecture,
        os,
        layer_count,
        last_used_at,
        created_at,
    }
}
/// Inserts an image row for `reference`, or refreshes the existing one, and
/// returns the row's id.
///
/// On a reference conflict `last_used_at` is always updated; `size_bytes` is
/// only overwritten when a size is supplied, and `created_at` is left alone.
/// The id is obtained by re-reading the row after the upsert.
///
/// # Errors
/// Returns a database error if the upsert or the follow-up lookup fails, or a
/// `Custom` error if the row cannot be found afterwards.
pub(crate) async fn upsert_image_record<C: ConnectionTrait>(
    db: &C,
    reference: &str,
    size_bytes: Option<i64>,
) -> MicrosandboxResult<i32> {
    let timestamp = chrono::Utc::now().naive_utc();

    // Only touch the stored size when the caller actually knows one.
    let refreshed_columns = match size_bytes {
        Some(_) => vec![
            image_entity::Column::LastUsedAt,
            image_entity::Column::SizeBytes,
        ],
        None => vec![image_entity::Column::LastUsedAt],
    };

    let new_row = image_entity::ActiveModel {
        reference: Set(reference.to_string()),
        size_bytes: Set(size_bytes),
        last_used_at: Set(Some(timestamp)),
        created_at: Set(Some(timestamp)),
        ..Default::default()
    };
    let on_reference_conflict = OnConflict::column(image_entity::Column::Reference)
        .update_columns(refreshed_columns)
        .to_owned();

    image_entity::Entity::insert(new_row)
        .on_conflict(on_reference_conflict)
        .exec(db)
        .await?;

    let stored = image_entity::Entity::find()
        .filter(image_entity::Column::Reference.eq(reference))
        .one(db)
        .await?;

    match stored {
        Some(row) => Ok(row.id),
        None => Err(crate::MicrosandboxError::Custom(format!(
            "image '{}' missing after upsert",
            reference
        ))),
    }
}
async fn upsert_manifest_record<C: ConnectionTrait>(
db: &C,
image_id: i32,
digest: &str,
) -> MicrosandboxResult<i32> {
let now = chrono::Utc::now().naive_utc();
manifest_entity::Entity::insert(manifest_entity::ActiveModel {
image_id: Set(image_id),
digest: Set(digest.to_string()),
created_at: Set(Some(now)),
..Default::default()
})
.on_conflict(
OnConflict::column(manifest_entity::Column::Digest)
.do_nothing()
.to_owned(),
)
.exec(db)
.await
.ok();
manifest_entity::Entity::find()
.filter(manifest_entity::Column::Digest.eq(digest))
.one(db)
.await?
.map(|model| model.id)
.ok_or_else(|| {
crate::MicrosandboxError::Custom(format!("manifest '{}' missing after upsert", digest))
})
}
async fn upsert_config_record<C: ConnectionTrait>(
db: &C,
manifest_id: i32,
digest: &str,
config: µsandbox_image::ImageConfig,
platform: µsandbox_image::Platform,
) -> MicrosandboxResult<()> {
let env_json = if config.env.is_empty() {
None
} else {
Some(serde_json::to_string(&config.env)?)
};
let cmd_json = config.cmd.as_ref().map(serde_json::to_string).transpose()?;
let entrypoint_json = config
.entrypoint
.as_ref()
.map(serde_json::to_string)
.transpose()?;
let volumes_json = if config.volumes.is_empty() {
None
} else {
Some(serde_json::to_string(&config.volumes)?)
};
let exposed_ports_json = if config.exposed_ports.is_empty() {
None
} else {
Some(serde_json::to_string(&config.exposed_ports)?)
};
let now = chrono::Utc::now().naive_utc();
config_entity::Entity::delete_many()
.filter(config_entity::Column::ManifestId.eq(manifest_id))
.exec(db)
.await?;
config_entity::Entity::insert(config_entity::ActiveModel {
manifest_id: Set(manifest_id),
digest: Set(digest.to_string()),
architecture: Set(Some(platform.arch.to_string())),
os: Set(Some(platform.os.to_string())),
os_variant: Set(None),
env: Set(env_json),
cmd: Set(cmd_json),
entrypoint: Set(entrypoint_json),
working_dir: Set(config.working_dir.clone()),
volumes: Set(volumes_json),
exposed_ports: Set(exposed_ports_json),
user: Set(config.user.clone()),
rootfs_type: Set(Some("layers".to_string())),
rootfs_diff_ids: Set(None),
history: Set(None),
created_at: Set(Some(now)),
..Default::default()
})
.exec(db)
.await?;
Ok(())
}
async fn upsert_layer_record<C: ConnectionTrait>(
db: &C,
layer_meta: µsandbox_image::CachedLayerMetadata,
) -> MicrosandboxResult<i32> {
let now = chrono::Utc::now().naive_utc();
layer_entity::Entity::insert(layer_entity::ActiveModel {
digest: Set(layer_meta.digest.clone()),
diff_id: Set(layer_meta.diff_id.clone()),
media_type: Set(layer_meta.media_type.clone()),
size_bytes: Set(layer_meta
.size_bytes
.map(|s| i64::try_from(s).unwrap_or(i64::MAX))),
created_at: Set(Some(now)),
..Default::default()
})
.on_conflict(
OnConflict::column(layer_entity::Column::Digest)
.do_nothing()
.to_owned(),
)
.exec(db)
.await
.ok();
layer_entity::Entity::find()
.filter(layer_entity::Column::Digest.eq(&layer_meta.digest))
.one(db)
.await?
.map(|model| model.id)
.ok_or_else(|| {
crate::MicrosandboxError::Custom(format!(
"layer '{}' missing after upsert",
layer_meta.digest
))
})
}