use crate::core::Column as _;
use crate::migrate::{self, Migration, MigrationScope};
use crate::sql::sqlx::postgres::PgPoolOptions;
use crate::sql::sqlx::PgPool;
use crate::sql::Fetcher;
use std::path::Path;
use std::sync::Arc;
use tracing::{info, warn};
use super::error::TenancyError;
use super::org::{Org, StorageMode};
use super::pools::TenantPools;
/// Aggregated result of running tenant-scoped migrations over a set of tenants.
///
/// Holds one [`TenantMigrationOutcome`] per tenant that was attempted, in the
/// order the outcomes were pushed.
#[derive(Debug, Default)]
pub struct TenantMigrationReport {
    /// Per-tenant outcomes, successful and failed alike.
    pub tenants: Vec<TenantMigrationOutcome>,
}

impl TenantMigrationReport {
    /// Returns `true` when no recorded tenant carries an error.
    ///
    /// An empty report counts as "all ok".
    #[must_use]
    pub fn all_ok(&self) -> bool {
        !self.tenants.iter().any(|outcome| outcome.error.is_some())
    }

    /// Number of recorded tenants whose outcome carries an error.
    #[must_use]
    pub fn failure_count(&self) -> usize {
        self.tenants
            .iter()
            .map(|outcome| usize::from(outcome.error.is_some()))
            .sum()
    }
}
/// Result of attempting tenant-scoped migrations for a single tenant.
#[derive(Debug)]
pub struct TenantMigrationOutcome {
/// Slug of the tenant this outcome belongs to.
pub slug: String,
/// Migrations reported as applied for this tenant. `migrate_tenants` leaves
/// this empty when it records a failure.
pub applied: Vec<Migration>,
/// The failure, if any. `None` means the run for this tenant returned `Ok`.
pub error: Option<TenancyError>,
}
pub async fn migrate_registry(
pools: &TenantPools,
dir: &Path,
) -> Result<Vec<Migration>, TenancyError> {
info!(target: "crate::tenancy", "applying registry-scoped migrations");
let scoped_dir = scoped_subset(dir, MigrationScope::Registry).await?;
let applied = match scoped_dir {
ScopedDir::Owned(temp) => {
let result = migrate::migrate(pools.registry(), temp.path()).await?;
drop(temp);
result
}
ScopedDir::Original => migrate::migrate(pools.registry(), dir).await?,
};
if let Err(e) = rustango::sql::sqlx::query(
r#"ALTER TABLE "rustango_operators"
ADD COLUMN IF NOT EXISTS "password_changed_at" TIMESTAMPTZ NULL"#,
)
.execute(pools.registry())
.await
{
tracing::warn!(
target: "crate::tenancy",
error = %e,
"ALTER rustango_operators password_changed_at failed",
);
}
if let Err(e) = crate::audit::ensure_table(pools.registry()).await {
tracing::warn!(
target: "crate::tenancy",
error = %e,
"audit::ensure_table failed for registry pool",
);
}
if let Err(e) = crate::contenttypes::ensure_seeded(pools.registry()).await {
tracing::warn!(
target: "crate::tenancy",
error = %e,
"contenttypes::ensure_seeded failed for registry pool",
);
}
info!(
target: "crate::tenancy",
applied = applied.len(),
"registry migrations done"
);
Ok(applied)
}
/// Applies the tenant-scoped migrations found in `dir` to every active org.
///
/// Active orgs are loaded from the registry pool and migrated one after the
/// other. A failure for one tenant is logged and recorded in the report, and
/// the remaining tenants are still processed.
///
/// # Errors
/// Returns an error only when the scoped subset of `dir` cannot be prepared or
/// when the list of active orgs cannot be fetched. Per-tenant failures are
/// reported through [`TenantMigrationReport`] instead.
pub async fn migrate_tenants(
    pools: &TenantPools,
    dir: &Path,
    registry_url: &str,
) -> Result<TenantMigrationReport, TenancyError> {
    // `scoped` must stay alive until the end of the function: when it owns a
    // temporary directory, dropping it deletes the files being migrated from.
    let scoped = scoped_subset(dir, MigrationScope::Tenant).await?;
    let scoped_path: &Path = match &scoped {
        ScopedDir::Owned(temp) => temp.path(),
        ScopedDir::Original => dir,
    };

    // Only used for logging; a listing failure is reported as zero migrations.
    let migrations_in_scope = match rustango::migrate::file::list_dir(scoped_path) {
        Ok(found) => found.len(),
        Err(_) => 0,
    };

    let orgs: Vec<Org> = Org::objects()
        .where_(Org::active.eq(true))
        .fetch(pools.registry())
        .await?;

    info!(
        target: "crate::tenancy",
        tenants = orgs.len(),
        migrations = migrations_in_scope,
        dir = %dir.display(),
        "applying tenant-scoped migrations"
    );
    if !orgs.is_empty() && migrations_in_scope == 0 {
        warn!(
            target: "crate::tenancy",
            dir = %dir.display(),
            "no tenant-scoped migrations found in dir; tenants will record applied=0 — \
             pass the flat migrations directory or a project root containing one"
        );
    }

    let mut report = TenantMigrationReport::default();
    for org in &orgs {
        let attempt = run_for_one_tenant(pools, org, scoped_path, registry_url).await;
        let (applied, error) = match attempt {
            Ok(applied) => {
                info!(
                    target: "crate::tenancy",
                    slug = %org.slug,
                    applied = applied.len(),
                    "tenant migrations done"
                );
                (applied, None)
            }
            Err(err) => {
                warn!(
                    target: "crate::tenancy",
                    slug = %org.slug,
                    error = %err,
                    "tenant migration failed; continuing with remaining tenants"
                );
                (Vec::new(), Some(err))
            }
        };
        report.tenants.push(TenantMigrationOutcome {
            slug: org.slug.clone(),
            applied,
            error,
        });
    }
    Ok(report)
}
async fn run_for_one_tenant(
pools: &TenantPools,
org: &Org,
dir: &Path,
registry_url: &str,
) -> Result<Vec<Migration>, TenancyError> {
let mode = StorageMode::parse(&org.storage_mode).map_err(|got| {
TenancyError::Validation(format!(
"org `{}` has unknown storage_mode `{got}`",
org.slug
))
})?;
match mode {
StorageMode::Schema => {
let schema = org.schema_name.clone().unwrap_or_else(|| org.slug.clone());
let pool = build_schema_scoped_pool(registry_url, &schema).await?;
let applied = migrate::migrate(&pool, dir).await?;
if let Err(e) = crate::audit::ensure_table(&pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "audit::ensure_table failed for schema-mode tenant");
}
if let Err(e) = super::permissions::ensure_tables(&pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "permissions::ensure_tables failed for schema-mode tenant");
}
if let Err(e) = super::permissions::auto_create_permissions(&pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "auto_create_permissions failed for schema-mode tenant");
}
if let Err(e) = crate::contenttypes::ensure_seeded(&pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "contenttypes::ensure_seeded failed for schema-mode tenant");
}
if let Err(e) = super::auth_backends::ensure_api_keys_table(&pool).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "ensure_api_keys_table failed for schema-mode tenant");
}
pool.close().await;
Ok(applied)
}
StorageMode::Database => {
let tenant_pool = pools.pool_for_org(org).await?;
let applied = migrate::migrate(tenant_pool.pool(), dir).await?;
if let Err(e) = crate::audit::ensure_table(tenant_pool.pool()).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "audit::ensure_table failed for database-mode tenant");
}
if let Err(e) = super::permissions::ensure_tables(tenant_pool.pool()).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "permissions::ensure_tables failed for database-mode tenant");
}
if let Err(e) = super::permissions::auto_create_permissions(tenant_pool.pool()).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "auto_create_permissions failed for database-mode tenant");
}
if let Err(e) = crate::contenttypes::ensure_seeded(tenant_pool.pool()).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "contenttypes::ensure_seeded failed for database-mode tenant");
}
if let Err(e) = super::auth_backends::ensure_api_keys_table(tenant_pool.pool()).await {
tracing::warn!(target: "crate::tenancy", slug = %org.slug, error = %e, "ensure_api_keys_table failed for database-mode tenant");
}
Ok(applied)
}
}
}
/// Ensures `schema` exists and returns a small pool whose connections have
/// `search_path` set to that schema followed by `public`.
///
/// A throwaway bootstrap pool runs `CREATE SCHEMA IF NOT EXISTS` and is closed
/// again whether or not the statement succeeds. The returned pool is capped at
/// two connections and sets `search_path` in its `after_connect` hook, so every
/// new connection is scoped before it is handed out.
///
/// # Errors
/// Propagates connection failures and a failed `CREATE SCHEMA` statement.
async fn build_schema_scoped_pool(
    registry_url: &str,
    schema: &str,
) -> Result<PgPool, TenancyError> {
    let bootstrap = PgPool::connect(registry_url).await?;
    let create_sql = format!(
        "CREATE SCHEMA IF NOT EXISTS {}",
        quote_ident_for_schema(schema)
    );
    // Capture the result first so the bootstrap pool is closed gracefully even
    // when the statement fails; `?` directly on the query would skip `close()`.
    let created = rustango::sql::sqlx::query(&create_sql)
        .execute(&bootstrap)
        .await;
    bootstrap.close().await;
    created?;

    // The hook may run many times, so share the schema name cheaply.
    let schema_owned: Arc<str> = Arc::from(schema);
    let pool = PgPoolOptions::new()
        .max_connections(2)
        .after_connect(move |conn, _meta| {
            let schema = Arc::clone(&schema_owned);
            Box::pin(async move {
                let stmt = format!(
                    "SET search_path TO {}, public",
                    quote_ident_for_schema(&schema)
                );
                rustango::sql::sqlx::query(&stmt).execute(conn).await?;
                Ok(())
            })
        })
        .connect(registry_url)
        .await?;
    Ok(pool)
}
/// Produces a directory that contains only the migrations of `scope`.
///
/// When every migration listed in `dir` already has the requested scope (which
/// includes the case of an empty listing), `dir` itself can be used and
/// [`ScopedDir::Original`] is returned. Otherwise the in-scope migrations are
/// written as `<name>.json` files into a fresh temporary directory, returned
/// as [`ScopedDir::Owned`].
///
/// # Errors
/// Propagates failures from listing `dir`, creating the temporary directory,
/// or writing a migration file.
async fn scoped_subset(dir: &Path, scope: MigrationScope) -> Result<ScopedDir, TenancyError> {
    let migrations = rustango::migrate::file::list_dir(dir)?;

    let has_foreign_scope = migrations.iter().any(|m| m.scope != scope);
    if !has_foreign_scope {
        return Ok(ScopedDir::Original);
    }

    let temp = tempdir_under_target()?;
    for mig in migrations.iter().filter(|m| m.scope == scope) {
        let target = temp.path().join(format!("{}.json", mig.name));
        rustango::migrate::file::write(&target, mig)?;
    }
    Ok(ScopedDir::Owned(temp))
}
/// Where the migrations for one scope should be read from.
enum ScopedDir {
/// Every migration in the caller's directory already has the requested
/// scope, so that directory is used as-is.
Original,
/// A temporary directory holding only the in-scope migrations; it is
/// removed when the contained `TempDir` is dropped.
Owned(TempDir),
}
/// Owning handle for a scratch directory that is deleted when the handle is
/// dropped.
struct TempDir(std::path::PathBuf);

impl TempDir {
    /// Borrows the path of the directory.
    fn path(&self) -> &Path {
        self.0.as_path()
    }
}

impl Drop for TempDir {
    /// Recursively removes the directory. A removal error is deliberately
    /// ignored, since there is nothing useful to do with it during drop.
    fn drop(&mut self) {
        std::fs::remove_dir_all(self.path()).ok();
    }
}
/// Creates a fresh, empty, uniquely named scratch directory and wraps it in a
/// [`TempDir`] guard.
///
/// Despite the name, the directory is created under [`std::env::temp_dir`].
/// The name combines the process id with a process-wide counter. Because
/// process ids are reused (and are often identical across container
/// restarts), a directory with the same name may have been left behind by an
/// earlier process that exited without running destructors. Such a directory
/// is never reused: it could still contain stale migration files. Creation
/// uses `create_dir`, which fails when the path already exists, and moves on
/// to the next counter value on a collision.
///
/// # Errors
/// Returns [`TenancyError::Validation`] when the base temporary directory or
/// the scratch directory cannot be created, or when too many candidate names
/// are already taken.
fn tempdir_under_target() -> Result<TempDir, TenancyError> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // Upper bound on name collisions tolerated before giving up.
    const MAX_ATTEMPTS: u32 = 1024;

    let pid = std::process::id();
    let base = std::env::temp_dir();

    // Keep the previous behaviour of creating a missing base directory.
    std::fs::create_dir_all(&base).map_err(|e| {
        TenancyError::Validation(format!(
            "could not create scoped-migration tempdir at {}: {e}",
            base.display()
        ))
    })?;

    for _ in 0..MAX_ATTEMPTS {
        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
        let p = base.join(format!("rustango_tenancy_scoped_{pid}_{n}"));
        match std::fs::create_dir(&p) {
            Ok(()) => return Ok(TempDir(p)),
            // Leftover from another process (or a planted path): try the next name.
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
            Err(e) => {
                return Err(TenancyError::Validation(format!(
                    "could not create scoped-migration tempdir at {}: {e}",
                    p.display()
                )));
            }
        }
    }

    Err(TenancyError::Validation(format!(
        "could not create scoped-migration tempdir under {}: {} candidate names already exist",
        base.display(),
        MAX_ATTEMPTS
    )))
}
/// Wraps `name` in double quotes for use as a SQL identifier, doubling every
/// double quote that appears inside it.
fn quote_ident_for_schema(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is written as two quotes.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}