use crate::db::{pool::Pool, Dialect};
use crate::error::AppError;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
pub fn architect_schema() -> String {
std::env::var("ARCHITECT_SCHEMA").unwrap_or_else(|_| "architect".into())
}
pub fn qualified_sys_table(table: &str) -> String {
format!("{}.{}", architect_schema(), table)
}
const CONFIG_TABLES: &[&str] = &[
"_sys_schemas",
"_sys_enums",
"_sys_tables",
"_sys_columns",
"_sys_indexes",
"_sys_relationships",
"_sys_api_entities",
"_sys_kv_stores",
];
pub const DEFAULT_PACKAGE_ID: &str = "_default";
pub async fn ensure_sys_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
let schema = architect_schema();
if dialect.supports_schemas() {
sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema))
.execute(pool)
.await?;
}
for table in CONFIG_TABLES {
let q_table = qualified_sys_table(table);
let ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT NOT NULL, \
package_id TEXT NOT NULL, \
payload {} NOT NULL, \
updated_at {} NOT NULL DEFAULT {}, \
version BIGINT NOT NULL DEFAULT 1, \
PRIMARY KEY (id, package_id)\
)",
q_table,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&ddl).execute(pool).await?;
let alter_version = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS version BIGINT NOT NULL DEFAULT 1",
q_table
);
let _ = sqlx::query(&alter_version).execute(pool).await;
let alter_package = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
q_table, DEFAULT_PACKAGE_ID
);
let _ = sqlx::query(&alter_package).execute(pool).await;
let history_table = qualified_sys_table(&format!("{}_history", table));
let history_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT NOT NULL, \
package_id TEXT NOT NULL, \
payload {} NOT NULL, \
version BIGINT NOT NULL, \
created_at {} NOT NULL DEFAULT {}, \
PRIMARY KEY (id, package_id, version)\
)",
history_table,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&history_ddl).execute(pool).await?;
let alter_history_package = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS package_id TEXT NOT NULL DEFAULT '{}'",
history_table, DEFAULT_PACKAGE_ID
);
let _ = sqlx::query(&alter_history_package).execute(pool).await;
}
let q_packages = qualified_sys_table("_sys_packages");
let packages_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT PRIMARY KEY, \
payload {} NOT NULL, \
updated_at {} NOT NULL DEFAULT {}, \
version BIGINT NOT NULL DEFAULT 1, \
semantic_version TEXT\
)",
q_packages,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&packages_ddl).execute(pool).await?;
let alter_pkg_semver = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
q_packages
);
let _ = sqlx::query(&alter_pkg_semver).execute(pool).await;
let q_packages_history = qualified_sys_table("_sys_packages_history");
let packages_history_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT NOT NULL, \
payload {} NOT NULL, \
version BIGINT NOT NULL, \
created_at {} NOT NULL DEFAULT {}, \
semantic_version TEXT, \
PRIMARY KEY (id, version)\
)",
q_packages_history,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&packages_history_ddl).execute(pool).await?;
let alter_pkg_hist_semver = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS semantic_version TEXT",
q_packages_history
);
let _ = sqlx::query(&alter_pkg_hist_semver).execute(pool).await;
let add_history_id = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS history_id {}",
q_packages_history,
dialect.sys_bigserial_type()
);
let _ = sqlx::query(&add_history_id).execute(pool).await;
let drop_old_pk = format!(
"ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_packages_history_pkey",
q_packages_history
);
let _ = sqlx::query(&drop_old_pk).execute(pool).await;
if dialect.name() == "postgres" {
let add_new_pk_cond = format!(
"DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = '_sys_packages_history_history_id_pkey') THEN \
ALTER TABLE {} ADD CONSTRAINT _sys_packages_history_history_id_pkey PRIMARY KEY (history_id); END IF; END $$",
q_packages_history
);
let _ = sqlx::query(&add_new_pk_cond).execute(pool).await;
}
let q_tenants = qualified_sys_table("_sys_tenants");
let tenants_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT PRIMARY KEY, \
strategy TEXT NOT NULL, \
database_url TEXT, \
updated_at {} NOT NULL DEFAULT {}, \
comment TEXT\
)",
q_tenants,
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&tenants_ddl).execute(pool).await?;
let drop_schema_name = format!(
"ALTER TABLE {} DROP COLUMN IF EXISTS schema_name",
q_tenants
);
let _ = sqlx::query(&drop_schema_name).execute(pool).await;
let q_kv_data = qualified_sys_table("_sys_kv_data");
let kv_data_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} (\
tenant_id TEXT NOT NULL, \
package_id TEXT NOT NULL, \
namespace TEXT NOT NULL, \
key TEXT NOT NULL, \
value {} NOT NULL, \
updated_at {} NOT NULL DEFAULT {}, \
PRIMARY KEY (tenant_id, package_id, namespace, key)\
)",
q_kv_data,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
);
sqlx::query(&kv_data_ddl).execute(pool).await?;
let alter_kv_tenant = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT '_shared'",
q_kv_data
);
let _ = sqlx::query(&alter_kv_tenant).execute(pool).await;
let drop_pk = format!(
"ALTER TABLE {} DROP CONSTRAINT IF EXISTS _sys_kv_data_pkey",
q_kv_data
);
let _ = sqlx::query(&drop_pk).execute(pool).await;
let add_pk = format!(
"ALTER TABLE {} ADD PRIMARY KEY (tenant_id, package_id, namespace, key)",
q_kv_data
);
let _ = sqlx::query(&add_pk).execute(pool).await;
if dialect.name() == "postgres" {
let alter_value_json = format!(
"ALTER TABLE {} ALTER COLUMN value TYPE JSONB USING value::jsonb",
q_kv_data
);
let _ = sqlx::query(&alter_value_json).execute(pool).await;
}
ensure_migration_tables(pool, dialect).await?;
Ok(())
}
async fn ensure_migration_tables(pool: &Pool, dialect: &dyn Dialect) -> Result<(), AppError> {
let q_plans = qualified_sys_table("_sys_migration_plans");
let expires_at_col = match dialect.default_now_plus_hours(24) {
Some(expr) => format!(
"expires_at {} NOT NULL DEFAULT {}",
dialect.sys_timestamp_type(),
expr
),
None => format!("expires_at {}", dialect.sys_timestamp_type()),
};
sqlx::query(&format!(
"CREATE TABLE IF NOT EXISTS {} (\
id TEXT PRIMARY KEY, \
package_id TEXT NOT NULL, \
tenant_id TEXT NOT NULL, \
from_version TEXT, \
to_version TEXT NOT NULL, \
plan_json {} NOT NULL, \
zip_bytes BLOB NOT NULL, \
status TEXT NOT NULL DEFAULT 'pending', \
created_at {} NOT NULL DEFAULT {}, \
{}, \
applied_at {}\
)",
q_plans,
dialect.sys_json_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
expires_at_col,
dialect.sys_timestamp_type(),
))
.execute(pool)
.await?;
let q_audit = qualified_sys_table("_sys_migration_audit");
sqlx::query(&format!(
"CREATE TABLE IF NOT EXISTS {} (\
id {} PRIMARY KEY, \
migration_plan_id TEXT NOT NULL, \
package_id TEXT NOT NULL, \
tenant_id TEXT NOT NULL, \
from_version TEXT, \
to_version TEXT NOT NULL, \
step_number INT NOT NULL, \
operation TEXT NOT NULL, \
schema_name TEXT NOT NULL, \
table_name TEXT, \
object_name TEXT NOT NULL, \
object_type TEXT NOT NULL, \
description TEXT NOT NULL, \
ddl TEXT, \
safety TEXT NOT NULL, \
risk TEXT NOT NULL, \
status TEXT NOT NULL, \
error_message TEXT, \
executed_at {} NOT NULL DEFAULT {}\
)",
q_audit,
dialect.sys_bigserial_type(),
dialect.sys_timestamp_type(),
dialect.now_fn(),
))
.execute(pool)
.await?;
Ok(())
}
pub struct MigrationPlanRow {
pub id: String,
pub package_id: String,
pub tenant_id: String,
pub from_version: Option<String>,
pub to_version: String,
pub plan_json: serde_json::Value,
pub zip_bytes: Vec<u8>,
pub status: String,
pub created_at: DateTime<Utc>,
pub expires_at: DateTime<Utc>,
pub applied_at: Option<DateTime<Utc>>,
}
#[allow(clippy::too_many_arguments)]
pub async fn save_migration_plan(
pool: &Pool,
id: &str,
package_id: &str,
tenant_id: &str,
from_version: Option<&str>,
to_version: &str,
plan_json: &serde_json::Value,
zip_bytes: &[u8],
) -> Result<(), AppError> {
let q = qualified_sys_table("_sys_migration_plans");
sqlx::query(&format!(
"INSERT INTO {} (id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending', NOW(), NOW() + INTERVAL '24 hours')",
q
))
.bind(id)
.bind(package_id)
.bind(tenant_id)
.bind(from_version)
.bind(to_version)
.bind(plan_json)
.bind(zip_bytes)
.execute(pool)
.await?;
Ok(())
}
pub async fn get_migration_plan(
pool: &Pool,
id: &str,
) -> Result<Option<MigrationPlanRow>, AppError> {
let q = qualified_sys_table("_sys_migration_plans");
#[allow(clippy::type_complexity)]
let row: Option<(String, String, String, Option<String>, String, serde_json::Value, Vec<u8>, String, DateTime<Utc>, DateTime<Utc>, Option<DateTime<Utc>>)> =
sqlx::query_as(&format!(
"SELECT id, package_id, tenant_id, from_version, to_version, plan_json, zip_bytes, status, created_at, expires_at, applied_at FROM {} WHERE id = $1",
q
))
.bind(id)
.fetch_optional(pool)
.await
.map_err(AppError::Db)?;
Ok(row.map(
|(
id,
package_id,
tenant_id,
from_version,
to_version,
plan_json,
zip_bytes,
status,
created_at,
expires_at,
applied_at,
)| {
MigrationPlanRow {
id,
package_id,
tenant_id,
from_version,
to_version,
plan_json,
zip_bytes,
status,
created_at,
expires_at,
applied_at,
}
},
))
}
pub async fn mark_migration_plan_applied(pool: &Pool, id: &str) -> Result<bool, AppError> {
let q = qualified_sys_table("_sys_migration_plans");
let result = sqlx::query(&format!(
"UPDATE {} SET status = 'applied', applied_at = NOW() WHERE id = $1 AND status = 'pending'",
q
))
.bind(id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_migration_audit(
pool: &Pool,
migration_plan_id: &str,
package_id: &str,
tenant_id: &str,
from_version: Option<&str>,
to_version: &str,
step_number: i32,
operation: &str,
schema_name: &str,
table_name: Option<&str>,
object_name: &str,
object_type: &str,
description: &str,
ddl: Option<&str>,
safety: &str,
risk: &str,
status: &str,
error_message: Option<&str>,
) -> Result<(), AppError> {
let q = qualified_sys_table("_sys_migration_audit");
sqlx::query(&format!(
"INSERT INTO {} (migration_plan_id, package_id, tenant_id, from_version, to_version, step_number, operation, schema_name, table_name, object_name, object_type, description, ddl, safety, risk, status, error_message, executed_at) \
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,NOW())",
q
))
.bind(migration_plan_id)
.bind(package_id)
.bind(tenant_id)
.bind(from_version)
.bind(to_version)
.bind(step_number)
.bind(operation)
.bind(schema_name)
.bind(table_name)
.bind(object_name)
.bind(object_type)
.bind(description)
.bind(ddl)
.bind(safety)
.bind(risk)
.bind(status)
.bind(error_message)
.execute(pool)
.await?;
Ok(())
}
fn config_record_id(table: &str, rec: &serde_json::Value) -> Result<String, AppError> {
let id = rec.get("id").and_then(|v| v.as_str());
let entity_id = rec.get("entity_id").and_then(|v| v.as_str());
match (table, id, entity_id) {
("_sys_api_entities", None, Some(eid)) => Ok(eid.to_string()),
(_, Some(id), _) => Ok(id.to_string()),
_ => Err(AppError::BadRequest(
"each config record must have an 'id' field (or 'entity_id' for api_entities)".into(),
)),
}
}
fn config_payloads_unchanged(
table: &str,
current: &HashMap<String, serde_json::Value>,
records: &[serde_json::Value],
) -> Result<bool, AppError> {
if current.len() != records.len() {
return Ok(false);
}
for rec in records {
let id = config_record_id(table, rec)?;
match current.get(&id) {
None => return Ok(false),
Some(existing) if existing != rec => return Ok(false),
Some(_) => {}
}
}
Ok(true)
}
pub async fn replace_config_rows(
tx: &mut crate::db::pool::Connection,
table: &str,
package_id: &str,
records: &[serde_json::Value],
) -> Result<(u64, i64), AppError> {
let q_table = qualified_sys_table(table);
let current_version: (Option<i64>,) = sqlx::query_as(&format!(
"SELECT COALESCE(MAX(version), 0) FROM {} WHERE package_id = $1",
q_table
))
.bind(package_id)
.fetch_one(&mut *tx)
.await
.map_err(AppError::Db)?;
let current_version = current_version.0.unwrap_or(0);
let rows: Vec<(String, serde_json::Value)> = sqlx::query_as(&format!(
"SELECT id, payload FROM {} WHERE package_id = $1",
q_table
))
.bind(package_id)
.fetch_all(&mut *tx)
.await
.map_err(AppError::Db)?;
let current: HashMap<String, serde_json::Value> = rows.into_iter().collect();
if config_payloads_unchanged(table, ¤t, records)? {
return Ok((0, current_version));
}
let history_table = qualified_sys_table(&format!("{}_history", table));
let new_version = current_version + 1;
sqlx::query(&format!(
"INSERT INTO {} (id, package_id, payload, version, created_at) SELECT id, package_id, payload, version, updated_at FROM {} WHERE package_id = $1",
history_table, q_table
))
.bind(package_id)
.execute(&mut *tx)
.await?;
sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
.bind(package_id)
.execute(&mut *tx)
.await?;
let mut count = 0u64;
for rec in records {
let id = config_record_id(table, rec)?;
sqlx::query(&format!(
"INSERT INTO {} (id, package_id, payload, updated_at, version) VALUES ($1, $2, $3, NOW(), $4)",
q_table
))
.bind(&id)
.bind(package_id)
.bind(rec)
.bind(new_version)
.execute(&mut *tx)
.await?;
count += 1;
}
Ok((count, new_version))
}
const PACKAGES_TABLE: &str = "_sys_packages";
const PACKAGES_HISTORY_TABLE: &str = "_sys_packages_history";
pub struct PackageRow {
pub id: String,
pub payload: serde_json::Value,
pub version: i64,
pub updated_at: DateTime<Utc>,
pub semantic_version: Option<String>,
}
pub async fn list_packages(pool: &Pool) -> Result<Vec<PackageRow>, AppError> {
let q = qualified_sys_table(PACKAGES_TABLE);
#[allow(clippy::type_complexity)]
let rows: Vec<(
String,
serde_json::Value,
i64,
DateTime<Utc>,
Option<String>,
)> = sqlx::query_as(&format!(
"SELECT id, payload, version, updated_at, semantic_version FROM {} ORDER BY id",
q
))
.fetch_all(pool)
.await
.map_err(AppError::Db)?;
Ok(rows
.into_iter()
.map(
|(id, payload, version, updated_at, semantic_version)| PackageRow {
id,
payload,
version,
updated_at,
semantic_version,
},
)
.collect())
}
pub async fn get_package(pool: &Pool, id: &str) -> Result<Option<PackageRow>, AppError> {
let q = qualified_sys_table(PACKAGES_TABLE);
#[allow(clippy::type_complexity)]
let row: Option<(
String,
serde_json::Value,
i64,
DateTime<Utc>,
Option<String>,
)> = sqlx::query_as(&format!(
"SELECT id, payload, version, updated_at, semantic_version FROM {} WHERE id = $1",
q
))
.bind(id)
.fetch_optional(pool)
.await
.map_err(AppError::Db)?;
Ok(row.map(
|(id, payload, version, updated_at, semantic_version)| PackageRow {
id,
payload,
version,
updated_at,
semantic_version,
},
))
}
pub async fn count_package_kind(
pool: &Pool,
kind: &str,
package_id: &str,
) -> Result<i64, AppError> {
let table = sys_table_for_kind(kind)
.ok_or_else(|| AppError::BadRequest(format!("unknown config kind: {}", kind)))?;
let q = qualified_sys_table(table);
let (count,): (i64,) =
sqlx::query_as(&format!("SELECT COUNT(*) FROM {} WHERE package_id = $1", q))
.bind(package_id)
.fetch_one(pool)
.await
.map_err(AppError::Db)?;
Ok(count)
}
pub async fn list_package_ids(pool: &Pool) -> Result<Vec<String>, AppError> {
let q = qualified_sys_table(PACKAGES_TABLE);
let rows: Vec<(String,)> = sqlx::query_as(&format!("SELECT id FROM {} ORDER BY id", q))
.fetch_all(pool)
.await
.map_err(AppError::Db)?;
Ok(rows.into_iter().map(|(id,)| id).collect())
}
pub async fn upsert_package(
pool: &Pool,
id: &str,
payload: &serde_json::Value,
) -> Result<i64, AppError> {
let semantic_version = payload
.get("version")
.and_then(serde_json::Value::as_str)
.map(String::from)
.unwrap_or_default();
let q_packages = qualified_sys_table(PACKAGES_TABLE);
let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
let mut tx = pool.begin().await?;
let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
"SELECT payload, version, semantic_version FROM {} WHERE id = $1",
q_packages
))
.bind(id)
.fetch_optional(&mut *tx)
.await
.map_err(AppError::Db)?;
let new_version = match ¤t {
Some((_, v, Some(ref old_semver))) if *old_semver == semantic_version => *v,
Some((_, v, _)) => v + 1,
None => 1,
};
if let Some((old_payload, old_version, old_semver)) = current {
sqlx::query(&format!(
"INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
q_packages_history
))
.bind(id)
.bind(old_payload)
.bind(old_version)
.bind(old_semver)
.execute(&mut *tx)
.await?;
}
sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
.bind(id)
.execute(&mut *tx)
.await?;
let semver_param: Option<&str> = if semantic_version.is_empty() {
None
} else {
Some(semantic_version.as_str())
};
sqlx::query(&format!(
"INSERT INTO {} (id, payload, updated_at, version, semantic_version) VALUES ($1, $2, NOW(), $3, $4)",
q_packages
))
.bind(id)
.bind(payload)
.bind(new_version)
.bind(semver_param)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(new_version)
}
pub async fn delete_package_and_config(pool: &Pool, package_id: &str) -> Result<(), AppError> {
let q_packages = qualified_sys_table(PACKAGES_TABLE);
let q_packages_history = qualified_sys_table(PACKAGES_HISTORY_TABLE);
let q_kv_data = qualified_sys_table("_sys_kv_data");
let mut tx = pool.begin().await?;
let current: Option<(serde_json::Value, i64, Option<String>)> = sqlx::query_as(&format!(
"SELECT payload, version, semantic_version FROM {} WHERE id = $1",
q_packages
))
.bind(package_id)
.fetch_optional(&mut *tx)
.await
.map_err(AppError::Db)?;
if let Some((payload, version, semantic_version)) = current {
sqlx::query(&format!(
"INSERT INTO {} (id, payload, version, created_at, semantic_version) VALUES ($1, $2, $3, NOW(), $4)",
q_packages_history
))
.bind(package_id)
.bind(payload)
.bind(version)
.bind(semantic_version)
.execute(&mut *tx)
.await?;
}
for table in CONFIG_TABLES {
let q_table = qualified_sys_table(table);
sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_table))
.bind(package_id)
.execute(&mut *tx)
.await?;
let history_table = qualified_sys_table(&format!("{}_history", table));
sqlx::query(&format!(
"DELETE FROM {} WHERE package_id = $1",
history_table
))
.bind(package_id)
.execute(&mut *tx)
.await?;
}
sqlx::query(&format!("DELETE FROM {} WHERE package_id = $1", q_kv_data))
.bind(package_id)
.execute(&mut *tx)
.await?;
sqlx::query(&format!("DELETE FROM {} WHERE id = $1", q_packages))
.bind(package_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<Pool, AppError> {
#[cfg(feature = "postgres")]
return sqlx::postgres::PgPoolOptions::new()
.max_connections(max_connections)
.connect(database_url)
.await
.map_err(AppError::Db);
#[cfg(feature = "mysql")]
return sqlx::mysql::MySqlPoolOptions::new()
.max_connections(max_connections)
.connect(database_url)
.await
.map_err(AppError::Db);
#[cfg(feature = "sqlite")]
return sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(max_connections)
.connect(database_url)
.await
.map_err(AppError::Db);
#[cfg(not(any(feature = "postgres", feature = "mysql", feature = "sqlite")))]
Err(AppError::BadRequest(
"No database dialect feature enabled. Enable one of: postgres, mysql, sqlite.".into(),
))
}
pub async fn ensure_database_exists(database_url: &str) -> Result<(), AppError> {
#[cfg(feature = "postgres")]
{
use sqlx::ConnectOptions as _;
use std::str::FromStr;
let (admin_url, db_name) = parse_db_name_from_url(database_url)?;
if db_name.is_empty() || db_name == "postgres" {
return Ok(());
}
let opts = sqlx::postgres::PgConnectOptions::from_str(&admin_url)
.map_err(|e| AppError::BadRequest(format!("invalid DATABASE_URL: {}", e)))?;
let mut conn: sqlx::PgConnection = opts.connect().await.map_err(AppError::Db)?;
let exists: (bool,) =
sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)")
.bind(&db_name)
.fetch_one(&mut conn)
.await
.map_err(AppError::Db)?;
if !exists.0 {
let quoted = quote_ident(&db_name);
sqlx::query(&format!("CREATE DATABASE {}", quoted))
.execute(&mut conn)
.await
.map_err(AppError::Db)?;
}
}
#[cfg(not(feature = "postgres"))]
let _ = database_url;
Ok(())
}
#[cfg(feature = "postgres")]
fn parse_db_name_from_url(url: &str) -> Result<(String, String), AppError> {
let path_start = url
.rfind('/')
.ok_or_else(|| AppError::BadRequest("DATABASE_URL: no path".into()))?
+ 1;
let path_and_query = url.get(path_start..).unwrap_or("");
let db_name = path_and_query.split('?').next().unwrap_or("").trim();
let base = url.get(..path_start).unwrap_or(url);
let admin_url = format!("{}postgres", base);
Ok((admin_url, db_name.to_string()))
}
#[cfg(feature = "postgres")]
fn quote_ident(name: &str) -> String {
format!("\"{}\"", name.replace('\\', "\\\\").replace('"', "\\\""))
}
pub fn sys_table_for_kind(kind: &str) -> Option<&'static str> {
match kind {
"schemas" => Some("_sys_schemas"),
"enums" => Some("_sys_enums"),
"tables" => Some("_sys_tables"),
"columns" => Some("_sys_columns"),
"indexes" => Some("_sys_indexes"),
"relationships" => Some("_sys_relationships"),
"api_entities" => Some("_sys_api_entities"),
"kv_stores" => Some("_sys_kv_stores"),
_ => None,
}
}