use crate::migrate::{Migration, MigrationScope};
use std::path::Path;
use std::sync::Arc;
use tracing::{info, warn};
use crate::core::Column as _;
use crate::migrate;
#[cfg(feature = "postgres")]
use crate::sql::sqlx::postgres::PgPoolOptions;
#[cfg(feature = "postgres")]
use crate::sql::sqlx::PgPool;
#[cfg(feature = "postgres")]
use crate::sql::Fetcher;
use sqlx::Database;
use super::error::TenancyError;
use super::org::{Org, StorageMode};
use super::pools::TenantPools;
/// Aggregated result of running tenant-scoped migrations across many tenants.
///
/// The migration loops in this file push one [`TenantMigrationOutcome`] per
/// tenant they attempt, whether or not that tenant succeeded.
#[derive(Debug, Default)]
pub struct TenantMigrationReport {
/// Per-tenant outcomes, in the order the tenants were processed.
pub tenants: Vec<TenantMigrationOutcome>,
}
impl TenantMigrationReport {
    /// Returns `true` when no recorded tenant outcome carries an error.
    ///
    /// An empty report (no tenants at all) counts as "all ok".
    #[must_use]
    pub fn all_ok(&self) -> bool {
        !self.tenants.iter().any(|outcome| outcome.error.is_some())
    }

    /// Counts the tenant outcomes that carry an error.
    #[must_use]
    pub fn failure_count(&self) -> usize {
        self.tenants
            .iter()
            .map(|outcome| usize::from(outcome.error.is_some()))
            .sum()
    }
}
/// Result of attempting tenant-scoped migrations for a single tenant.
#[derive(Debug)]
pub struct TenantMigrationOutcome {
/// Slug of the org this outcome belongs to.
pub slug: String,
/// Migrations reported as applied for this tenant; the loops in this file
/// store an empty vector here when the tenant failed.
pub applied: Vec<Migration>,
/// `Some(error)` when the tenant's migration run failed, `None` otherwise.
pub error: Option<TenancyError>,
}
/// Applies registry-scoped migrations using the registry pool owned by `pools`.
///
/// Generic convenience wrapper: obtains the registry pool from `pools` and
/// forwards to [`migrate_registry_pool`].
///
/// # Errors
///
/// Returns whatever [`migrate_registry_pool`] returns.
pub async fn migrate_registry<DB: Database>(
    pools: &TenantPools<DB>,
    dir: &Path,
) -> Result<Vec<Migration>, TenancyError>
where
    crate::sql::Pool: From<sqlx::Pool<DB>>,
{
    let registry = pools.registry_pool();
    migrate_registry_pool(&registry, dir).await
}
pub async fn migrate_registry_pool(
registry: &crate::sql::Pool,
dir: &Path,
) -> Result<Vec<Migration>, TenancyError> {
info!(target: "crate::tenancy", "applying registry-scoped migrations");
let scoped_dir = scoped_subset(dir, MigrationScope::Registry).await?;
let applied = match scoped_dir {
ScopedDir::Owned(temp) => {
let result = crate::migrate::migrate_pool(registry, temp.path()).await?;
drop(temp);
result
}
ScopedDir::Original => crate::migrate::migrate_pool(registry, dir).await?,
};
#[cfg(feature = "postgres")]
if let Some(pg) = registry.as_postgres() {
let fixups: &[(&str, &str, &str)] = &[
(
"rustango_operators",
"password_changed_at",
"TIMESTAMPTZ NULL",
),
(
"rustango_orgs",
"backend_kind",
"VARCHAR(16) NOT NULL DEFAULT 'postgres'",
),
("rustango_orgs", "brand_name", "VARCHAR(80)"),
("rustango_orgs", "brand_tagline", "VARCHAR(200)"),
("rustango_orgs", "logo_path", "VARCHAR(120)"),
("rustango_orgs", "favicon_path", "VARCHAR(120)"),
("rustango_orgs", "primary_color", "VARCHAR(7)"),
("rustango_orgs", "theme_mode", "VARCHAR(8)"),
];
for (table, column, col_type) in fixups {
let sql =
format!(r#"ALTER TABLE "{table}" ADD COLUMN IF NOT EXISTS "{column}" {col_type}"#);
if let Err(e) = rustango::sql::sqlx::query(&sql).execute(pg).await {
tracing::warn!(
target: "crate::tenancy",
table = %table,
column = %column,
error = %e,
"registry column fixup failed (non-fatal — re-run after the bootstrap migrate)",
);
}
}
}
if let Err(e) = crate::audit::ensure_table_pool(registry).await {
tracing::warn!(
target: "crate::tenancy",
error = %e,
"audit::ensure_table_pool failed for registry pool",
);
}
if let Err(e) = crate::contenttypes::ensure_seeded_pool(registry).await {
tracing::warn!(
target: "crate::tenancy",
error = %e,
"contenttypes::ensure_seeded_pool failed for registry pool",
);
}
info!(
target: "crate::tenancy",
applied = applied.len(),
"registry migrations done"
);
Ok(applied)
}
/// Applies tenant-scoped migrations from `dir` to every active org
/// (Postgres build).
///
/// Active orgs are read from the registry, then each one is migrated in turn
/// via `run_for_one_tenant`. A failing tenant does not abort the run: its
/// error is logged and recorded in the report, and the loop continues.
///
/// A warning is logged when there are active orgs but no tenant-scoped
/// migrations could be listed in the (possibly narrowed) directory.
///
/// # Errors
///
/// Fails only if the scoped subset cannot be prepared or the org list cannot
/// be fetched; per-tenant failures are reported through the returned
/// [`TenantMigrationReport`].
#[cfg(feature = "postgres")]
pub async fn migrate_tenants(
    pools: &TenantPools,
    dir: &Path,
    registry_url: &str,
) -> Result<TenantMigrationReport, TenancyError> {
    // `scoped` must outlive the loop: dropping it removes the temp directory.
    let scoped = scoped_subset(dir, MigrationScope::Tenant).await?;
    let scoped_path = match &scoped {
        ScopedDir::Owned(temp) => temp.path(),
        ScopedDir::Original => dir,
    }
    .to_path_buf();

    // A listing failure is treated the same as "nothing in scope".
    let migrations_in_scope =
        rustango::migrate::file::list_dir(&scoped_path).map_or(0, |found| found.len());

    let orgs: Vec<Org> = Org::objects()
        .where_(Org::active.eq(true))
        .fetch(pools.registry())
        .await?;

    info!(
        target: "crate::tenancy",
        tenants = orgs.len(),
        migrations = migrations_in_scope,
        dir = %dir.display(),
        "applying tenant-scoped migrations"
    );
    if !orgs.is_empty() && migrations_in_scope == 0 {
        warn!(
            target: "crate::tenancy",
            dir = %dir.display(),
            "no tenant-scoped migrations found in dir; tenants will record applied=0 — \
             pass the flat migrations directory or a project root containing one"
        );
    }

    let mut report = TenantMigrationReport::default();
    for org in &orgs {
        let entry = match run_for_one_tenant(pools, org, &scoped_path, registry_url).await {
            Ok(applied) => {
                info!(
                    target: "crate::tenancy",
                    slug = %org.slug,
                    applied = applied.len(),
                    "tenant migrations done"
                );
                TenantMigrationOutcome {
                    slug: org.slug.clone(),
                    applied,
                    error: None,
                }
            }
            Err(error) => {
                warn!(
                    target: "crate::tenancy",
                    slug = %org.slug,
                    error = %error,
                    "tenant migration failed; continuing with remaining tenants"
                );
                TenantMigrationOutcome {
                    slug: org.slug.clone(),
                    applied: Vec::new(),
                    error: Some(error),
                }
            }
        };
        report.tenants.push(entry);
    }
    Ok(report)
}
/// Applies tenant-scoped migrations from `dir` to every active org, handling
/// database-mode tenants only (backend-generic variant).
///
/// Active orgs are fetched through the registry pool and migrated one by one
/// via `run_for_one_tenant_db`. A failing tenant (including any tenant that
/// is not database-mode) is logged and recorded in the report; the loop then
/// continues with the remaining tenants.
///
/// `_registry_url` is not used by this variant.
///
/// # Errors
///
/// Fails only if the scoped subset cannot be prepared or the org list cannot
/// be fetched; per-tenant failures are reported through the returned
/// [`TenantMigrationReport`].
pub async fn migrate_tenants_db<DB: Database>(
    pools: &TenantPools<DB>,
    dir: &Path,
    _registry_url: &str,
) -> Result<TenantMigrationReport, TenancyError>
where
    crate::sql::Pool: From<sqlx::Pool<DB>>,
{
    use crate::sql::FetcherPool as _;

    // `scoped` must outlive the loop: dropping it removes the temp directory.
    let scoped = scoped_subset(dir, MigrationScope::Tenant).await?;
    let scoped_path = match &scoped {
        ScopedDir::Owned(temp) => temp.path().to_path_buf(),
        ScopedDir::Original => dir.to_path_buf(),
    };

    let registry_pool = pools.registry_pool();
    let orgs: Vec<Org> = Org::objects()
        .where_(Org::active.eq(true))
        .fetch_pool(&registry_pool)
        .await?;

    info!(
        target: "crate::tenancy",
        tenants = orgs.len(),
        dir = %dir.display(),
        "applying tenant-scoped migrations (db-mode only)"
    );

    let mut report = TenantMigrationReport::default();
    for org in &orgs {
        let outcome = run_for_one_tenant_db(pools, org, &scoped_path).await;
        match &outcome {
            Ok(applied) => info!(
                target: "crate::tenancy",
                slug = %org.slug,
                applied = applied.len(),
                "tenant migrations done"
            ),
            Err(e) => warn!(
                target: "crate::tenancy",
                slug = %org.slug,
                error = %e,
                "tenant migration failed; continuing with remaining tenants"
            ),
        }
        report.tenants.push(match outcome {
            Ok(applied) => TenantMigrationOutcome {
                slug: org.slug.clone(),
                applied,
                error: None,
            },
            Err(error) => TenantMigrationOutcome {
                slug: org.slug.clone(),
                applied: Vec::new(),
                error: Some(error),
            },
        });
    }
    Ok(report)
}
async fn run_for_one_tenant_db<DB: Database>(
pools: &TenantPools<DB>,
org: &Org,
dir: &Path,
) -> Result<Vec<Migration>, TenancyError>
where
crate::sql::Pool: From<sqlx::Pool<DB>>,
{
let mode = StorageMode::parse(&org.storage_mode).map_err(|got| {
TenancyError::Validation(format!(
"org `{}` has unknown storage_mode `{got}`",
org.slug
))
})?;
if !matches!(mode, StorageMode::Database) {
return Err(TenancyError::Validation(format!(
"org `{}` is schema-mode but migrate_tenants_db only handles \
database-mode tenants (schema-mode is PG-only by language)",
org.slug,
)));
}
let tenant_pool = pools.database_pool_for_org(org).await?;
let inner_pool = match &tenant_pool {
super::pools::TenantPool::Database { pool } => crate::sql::Pool::from((**pool).clone()),
#[cfg(feature = "postgres")]
super::pools::TenantPool::Schema { .. } => {
unreachable!("database_pool_for_org rejects schema-mode")
}
};
let applied = migrate::migrate_pool(&inner_pool, dir).await?;
if let Err(e) = crate::audit::ensure_table_pool(&inner_pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "audit::ensure_table_pool failed for database-mode tenant");
}
if let Err(e) = super::permissions::ensure_tables_pool(&inner_pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "permissions::ensure_tables_pool failed for database-mode tenant");
}
if let Err(e) = super::permissions::auto_create_permissions_pool(&inner_pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "auto_create_permissions_pool failed for database-mode tenant");
}
if let Err(e) = crate::contenttypes::ensure_seeded_pool(&inner_pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "contenttypes::ensure_seeded_pool failed for database-mode tenant");
}
if let Err(e) = super::auth_backends::ensure_api_keys_table_pool(&inner_pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "ensure_api_keys_table_pool failed for database-mode tenant");
}
Ok(applied)
}
/// Dispatches to the most capable tenant-migration routine for `pools`.
///
/// With the `postgres` feature enabled, `pools` is checked at runtime for
/// being a `TenantPools<sqlx::Postgres>`; if so the Postgres-specific
/// [`migrate_tenants`] is used. Otherwise (or without the feature) the call
/// falls through to [`migrate_tenants_db`].
///
/// # Errors
///
/// Returns whatever the selected routine returns.
pub async fn migrate_tenants_dyn<DB: Database>(
    pools: &TenantPools<DB>,
    dir: &Path,
    registry_url: &str,
) -> Result<TenantMigrationReport, TenancyError>
where
    crate::sql::Pool: From<sqlx::Pool<DB>>,
{
    // The type-erased reference is only ever a call argument, never a local
    // that would live across the `.await` below.
    #[cfg(feature = "postgres")]
    if let Some(pg_pools) =
        <dyn std::any::Any>::downcast_ref::<TenantPools<sqlx::Postgres>>(pools)
    {
        return migrate_tenants(pg_pools, dir, registry_url).await;
    }
    migrate_tenants_db(pools, dir, registry_url).await
}
/// Migrates one tenant on the Postgres path, for either storage mode.
///
/// * Schema mode: builds a dedicated pool whose connections have their
///   `search_path` set to the tenant schema (`org.schema_name`, falling back
///   to the slug), runs the migrations through it, and closes that pool
///   before returning, on both the success and the failure path.
/// * Database mode: runs the migrations through the tenant pool obtained
///   from `pools`.
///
/// After a successful migration run, the audit table, permission tables,
/// auto-created permissions, content-type seed data and API-key table are
/// ensured in that order. Those follow-ups are best-effort: failures are
/// logged with the tenant slug and do not fail the tenant.
///
/// # Errors
///
/// Returns `TenancyError::Validation` when `storage_mode` cannot be parsed,
/// and propagates failures from pool construction/lookup or from the
/// migration run itself.
#[cfg(feature = "postgres")]
async fn run_for_one_tenant(
    pools: &TenantPools,
    org: &Org,
    dir: &Path,
    registry_url: &str,
) -> Result<Vec<Migration>, TenancyError> {
    let mode = StorageMode::parse(&org.storage_mode).map_err(|got| {
        TenancyError::Validation(format!(
            "org `{}` has unknown storage_mode `{got}`",
            org.slug
        ))
    })?;
    match mode {
        StorageMode::Schema => {
            let schema = org.schema_name.clone().unwrap_or_else(|| org.slug.clone());
            let pool = build_schema_scoped_pool(registry_url, &schema).await?;

            // Keep the result instead of using `?` here so the per-tenant
            // pool is closed even when the migration run fails.
            let migrated = migrate::migrate(&pool, dir)
                .await
                .map_err(TenancyError::from);

            if migrated.is_ok() {
                if let Err(e) = crate::audit::ensure_table(&pool).await {
                    tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "audit::ensure_table failed for schema-mode tenant");
                }
                if let Err(e) = super::permissions::ensure_tables(&pool).await {
                    tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "permissions::ensure_tables failed for schema-mode tenant");
                }
                if let Err(e) = super::permissions::auto_create_permissions(&pool).await {
                    tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "auto_create_permissions failed for schema-mode tenant");
                }
                if let Err(e) = crate::contenttypes::ensure_seeded(&pool).await {
                    tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "contenttypes::ensure_seeded failed for schema-mode tenant");
                }
                if let Err(e) = super::auth_backends::ensure_api_keys_table(&pool).await {
                    tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "ensure_api_keys_table failed for schema-mode tenant");
                }
            }

            pool.close().await;
            migrated
        }
        StorageMode::Database => {
            let tenant_pool = pools.pool_for_org(org).await?;
            let applied = migrate::migrate(tenant_pool.pool(), dir).await?;
            if let Err(e) = crate::audit::ensure_table(tenant_pool.pool()).await {
                tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "audit::ensure_table failed for database-mode tenant");
            }
            if let Err(e) = super::permissions::ensure_tables(tenant_pool.pool()).await {
                tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "permissions::ensure_tables failed for database-mode tenant");
            }
            if let Err(e) = super::permissions::auto_create_permissions(tenant_pool.pool()).await {
                tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "auto_create_permissions failed for database-mode tenant");
            }
            if let Err(e) = crate::contenttypes::ensure_seeded(tenant_pool.pool()).await {
                tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "contenttypes::ensure_seeded failed for database-mode tenant");
            }
            if let Err(e) = super::auth_backends::ensure_api_keys_table(tenant_pool.pool()).await {
                tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "ensure_api_keys_table failed for database-mode tenant");
            }
            Ok(applied)
        }
    }
}
/// Builds a small Postgres pool whose connections are scoped to `schema`.
///
/// First connects a short-lived bootstrap pool to `registry_url` and issues
/// `CREATE SCHEMA IF NOT EXISTS` for the (quoted) schema name. The bootstrap
/// pool is closed afterwards regardless of whether that statement succeeded.
/// Then a pool with at most two connections is opened against the same URL;
/// every new connection runs `SET search_path TO "<schema>", public`.
///
/// # Errors
///
/// Propagates connection failures and a failing `CREATE SCHEMA` statement.
#[cfg(feature = "postgres")]
async fn build_schema_scoped_pool(
    registry_url: &str,
    schema: &str,
) -> Result<PgPool, TenancyError> {
    let bootstrap = PgPool::connect(registry_url).await?;
    let create_sql = format!(
        "CREATE SCHEMA IF NOT EXISTS {}",
        quote_ident_for_schema(schema)
    );
    // Hold on to the result so the bootstrap pool is closed before any error
    // is propagated.
    let created = rustango::sql::sqlx::query(&create_sql)
        .execute(&bootstrap)
        .await;
    bootstrap.close().await;
    created?;

    // `Arc<str>` lets the `after_connect` hook hand a cheap clone of the
    // schema name to each connection's setup future.
    let schema_owned: Arc<str> = Arc::from(schema);
    let pool = PgPoolOptions::new()
        .max_connections(2)
        .after_connect(move |conn, _meta| {
            let schema = Arc::clone(&schema_owned);
            Box::pin(async move {
                let stmt = format!(
                    "SET search_path TO {}, public",
                    quote_ident_for_schema(&schema)
                );
                rustango::sql::sqlx::query(&stmt).execute(conn).await?;
                Ok(())
            })
        })
        .connect(registry_url)
        .await?;
    Ok(pool)
}
/// Narrows `dir` to the migrations that belong to `scope`.
///
/// When every migration listed in `dir` already has the requested scope
/// (including the case where `dir` lists none), the original directory can be
/// used as-is and `ScopedDir::Original` is returned. Otherwise the matching
/// migrations are written as `<name>.json` into a fresh temporary directory,
/// returned as `ScopedDir::Owned`.
///
/// # Errors
///
/// Propagates failures from listing `dir`, creating the temporary directory,
/// or writing a migration file.
async fn scoped_subset(dir: &Path, scope: MigrationScope) -> Result<ScopedDir, TenancyError> {
    let migrations = rustango::migrate::file::list_dir(dir)?;

    let has_foreign_scope = migrations.iter().any(|m| m.scope != scope);
    if !has_foreign_scope {
        return Ok(ScopedDir::Original);
    }

    let temp = tempdir_under_target()?;
    for mig in migrations.iter().filter(|m| m.scope == scope) {
        let target = temp.path().join(format!("{}.json", mig.name));
        rustango::migrate::file::write(&target, mig)?;
    }
    Ok(ScopedDir::Owned(temp))
}
/// Where the scope-filtered migrations produced by `scoped_subset` live.
enum ScopedDir {
/// The caller's directory already contains only in-scope migrations; use it directly.
Original,
/// In-scope migrations were copied into this temporary directory, which is
/// removed when the value is dropped.
Owned(TempDir),
}
/// Owning handle for a directory that is deleted when the handle is dropped.
struct TempDir(std::path::PathBuf);

impl TempDir {
    /// Borrowed path of the managed directory.
    fn path(&self) -> &Path {
        self.0.as_path()
    }
}

impl Drop for TempDir {
    /// Recursively removes the directory; removal errors are deliberately
    /// ignored because there is nothing useful to do with them in `drop`.
    fn drop(&mut self) {
        std::fs::remove_dir_all(self.path()).ok();
    }
}
fn tempdir_under_target() -> Result<TempDir, TenancyError> {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::SeqCst);
let pid = std::process::id();
let mut p = std::env::temp_dir();
p.push(format!("rustango_tenancy_scoped_{pid}_{n}"));
std::fs::create_dir_all(&p).map_err(|e| {
TenancyError::Validation(format!(
"could not create scoped-migration tempdir at {}: {e}",
p.display()
))
})?;
Ok(TempDir(p))
}
/// Wraps `name` in double quotes for use as a SQL identifier, doubling any
/// embedded double-quote characters.
fn quote_ident_for_schema(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}