use kellnr_migration::{Migrator, MigratorTrait};
use sea_orm::{
ConnectOptions, ConnectionTrait, Database, DatabaseBackend, DatabaseConnection, DbErr,
Statement,
};
use tracing::{info, trace};
const OLD_MIGRATIONS: &[&str] = &[
"m20220101_000001_create_table",
"m20220101_000002_create_table",
"m20220101_000003_create_table",
"m20220101_000004_create_table",
"m20220101_000005_create_table",
"m20220101_000006_create_table",
"m20220101_000007_create_table",
"m20220101_000008_create_table",
"m20220101_000009_create_table",
"m20220101_0000010_create_table",
"m20220101_0000011_create_table",
"m20250227_005754_add_readonly_user",
"m20250319_191043_add_groups",
"m20250412_0000012_hash_tokens",
"m20250414_102510_add_unique_indices",
"m20250911_000001_cratesio_indices",
"m20250923_095440_webhooks",
"m20260122_000001_add_pubtime",
];
pub async fn init_database(
connection_string: impl ToString,
max_con: u32,
) -> Result<DatabaseConnection, DbErr> {
let mut opt = ConnectOptions::new(connection_string.to_string());
if max_con > 0 {
opt.max_connections(max_con);
}
let db = Database::connect(opt).await?;
validate_pre_upgrade(&db).await?;
cleanup_old_migrations(&db).await?;
Migrator::up(&db, None).await?;
Ok(db)
}
async fn validate_pre_upgrade(db: &DatabaseConnection) -> Result<(), DbErr> {
if !has_old_migration_entries(db).await? {
return Ok(());
}
trace!("Detected v5 migration entries, validating schema before upgrade...");
let column_checks = [
("crate_index", "pubtime"),
("cratesio_index", "pubtime"),
("user", "is_read_only"),
("krate", "restricted_download"),
("krate", "e_tag"),
("krate", "original_name"),
("cratesio_crate", "max_version"),
("cratesio_meta", "documentation"),
];
for (table, column) in column_checks {
if !has_column(db, table, column).await? {
return Err(DbErr::Custom(format!(
"Missing column '{table}.{column}'. Please ensure you have first upgraded \
to Kellnr v5.14.0 before upgrading to v6.0.0."
)));
}
}
Ok(())
}
async fn has_old_migration_entries(db: &DatabaseConnection) -> Result<bool, DbErr> {
let backend = db.get_database_backend();
let stmt = Statement::from_string(
backend,
"SELECT version FROM seaql_migrations WHERE version = 'm20220101_000001_create_table' LIMIT 1",
);
match db.query_one_raw(stmt).await {
Ok(Some(_)) => Ok(true),
Ok(None) | Err(_) => Ok(false),
}
}
async fn has_column(db: &DatabaseConnection, table: &str, column: &str) -> Result<bool, DbErr> {
let backend = db.get_database_backend();
let sql = match backend {
DatabaseBackend::Postgres => format!(
"SELECT 1 FROM information_schema.columns \
WHERE table_name = '{table}' AND column_name = '{column}'"
),
DatabaseBackend::Sqlite => {
format!("SELECT 1 FROM pragma_table_info('{table}') WHERE name = '{column}'")
}
_ => return Err(DbErr::Custom("Unsupported database backend".to_string())),
};
let stmt = Statement::from_string(backend, sql);
match db.query_one_raw(stmt).await {
Ok(Some(_)) => Ok(true),
Ok(None) | Err(_) => Ok(false),
}
}
async fn cleanup_old_migrations(db: &DatabaseConnection) -> Result<(), DbErr> {
let mut cleaned = 0;
for migration in OLD_MIGRATIONS {
let sql = format!("DELETE FROM seaql_migrations WHERE version = '{migration}'");
if let Ok(result) = db.execute_unprepared(&sql).await
&& result.rows_affected() > 0
{
cleaned += 1;
}
}
if cleaned > 0 {
info!(
"Cleaned up {} old migration entries for v6.0.0 upgrade",
cleaned
);
}
Ok(())
}