use crate::{
database::{DatabaseProvider, close_pool_on_drop},
password::random_password,
tenant::migrate_tenant_storage::{MigrateTenantStorageError, migrate_tenant_storage_inner},
};
use docbox_core::{
database::{
DbErr, DbPool, DbResult, ROOT_DATABASE_NAME,
create::{
check_database_exists, check_database_role_exists, create_database,
create_restricted_role, create_restricted_role_aws_iam, delete_database, delete_role,
},
migrations::apply_tenant_migrations,
models::tenant::{Tenant, TenantId},
utils::DatabaseErrorExt,
},
search::{SearchError, SearchIndexFactory, TenantSearchIndex},
secrets::{SecretManager, SecretManagerError},
storage::{CreateBucketOutcome, StorageLayer, StorageLayerError, StorageLayerFactory},
tenant::tenant_options_ext::TenantOptionsExt,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::ops::DerefMut;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum CreateTenantError {
#[error("error connecting to 'postgres' database: {0}")]
ConnectPostgres(DbErr),
#[error("error creating tenant database: {0}")]
CreateTenantDatabase(DbErr),
#[error("error connecting to tenant database: {0}")]
ConnectTenantDatabase(DbErr),
#[error("error connecting to root database: {0}")]
ConnectRootDatabase(DbErr),
#[error("error creating tenant database role: {0}")]
CreateTenantRole(DbErr),
#[error(transparent)]
Database(#[from] DbErr),
#[error("error serializing tenant secret: {0}")]
SerializeSecret(serde_json::Error),
#[error("failed to create tenant secret: secret name already exists")]
SecretAlreadyExists,
#[error("failed to create tenant secret: {0}")]
CreateTenantSecret(SecretManagerError),
#[error("tenant already exists")]
TenantAlreadyExist,
#[error("failed to create tenant storage bucket: {0}")]
CreateStorageBucket(StorageLayerError),
#[error("failed to setup s3 notification rules: {0}")]
SetupS3Notifications(StorageLayerError),
#[error("failed to setup storage origin rules rules: {0}")]
SetupStorageOrigins(StorageLayerError),
#[error("failed to create tenant search index: {0}")]
CreateSearchIndex(SearchError),
#[error("failed to migrate tenant search index: {0}")]
MigrateSearchIndex(SearchError),
#[error("failed to migrate tenant storage: {0}")]
MigrateStorage(MigrateTenantStorageError),
#[error("when not using db_iam_user the db_secret_name must be specified")]
MissingDatabaseSecretName,
}
/// Configuration describing the tenant to create and the resources
/// that should be provisioned for it.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct CreateTenantConfig {
    /// ID stored on the created tenant record
    pub id: TenantId,
    /// Name stored on the created tenant record
    pub name: String,
    /// Environment stored on the created tenant record
    pub env: String,
    /// Name of the database to create for the tenant
    pub db_name: String,
    /// Name of the database role to create for the tenant. When
    /// `db_iam_user` is set this is also stored on the tenant record as
    /// the IAM user name
    pub db_role_name: String,
    /// Name of the secret that will hold the generated role credentials.
    /// Must be provided when `db_iam_user` is `false`
    pub db_secret_name: Option<String>,
    /// Whether to create the role through the AWS IAM variant instead of a
    /// password based role. Falls back to the `bool` default (`false`)
    /// when absent from the deserialized input
    #[serde(default)]
    pub db_iam_user: bool,
    /// Stored on the tenant record as its storage (s3) name
    pub storage_bucket_name: String,
    /// Origins applied to the bucket CORS rules, skipped when empty
    pub storage_cors_origins: Vec<String>,
    /// When present, bucket notifications are added using this ARN
    pub storage_s3_queue_arn: Option<String>,
    /// Stored on the tenant record as its search index name
    pub search_index_name: String,
    /// Optional event queue URL stored on the tenant record
    pub event_queue_url: Option<String>,
}
/// Tracks the resources created so far during tenant creation so they can
/// be removed again by [`CreateTenantRollbackData::rollback`] on failure.
///
/// A field is only populated once the matching resource has been created.
#[derive(Default)]
struct CreateTenantRollbackData {
    /// Search index to delete, set by `create_tenant_search`
    search_index: Option<TenantSearchIndex>,
    /// Storage layer whose bucket should be deleted, set by
    /// `create_tenant_storage` only when the bucket was newly created
    storage: Option<StorageLayer>,
    /// Secret manager and name of the secret to delete, set by
    /// `initialize_tenant_db_secret`
    secret: Option<(SecretManager, String)>,
    /// Name of the database to delete, set by `initialize_tenant_database`
    /// only when the database did not already exist
    database: Option<String>,
    /// Name of the database role to delete, set by the role
    /// initialization functions
    db_role: Option<String>,
}
impl CreateTenantRollbackData {
async fn rollback(&mut self, db_provider: &impl DatabaseProvider) {
if let Some(search_index) = self.search_index.take()
&& let Err(error) = search_index.delete_index().await
{
tracing::error!(?error, "failed to rollback created tenant search index");
}
if let Some(storage) = self.storage.take()
&& let Err(error) = storage.delete_bucket().await
{
tracing::error!(?error, "failed to rollback created tenant storage bucket");
}
if let Some((secrets, secret_name)) = self.secret.take()
&& let Err(error) = secrets.delete_secret(&secret_name, true).await
{
tracing::error!(?error, "failed to rollback tenant secret");
}
let db_name = self.database.take();
let db_role_name = self.db_role.take();
if db_name.is_some() || db_role_name.is_some() {
match db_provider.connect("postgres").await {
Ok(db_postgres) => {
if let Some(db_name) = db_name
&& let Err(error) = delete_database(&db_postgres, &db_name).await
{
tracing::error!(?error, "failed to rollback tenant database");
}
if let Some(db_role_name) = db_role_name
&& let Err(error) = delete_role(&db_postgres, &db_role_name).await
{
tracing::error!(?error, "failed to rollback tenant db role name");
}
db_postgres.close().await;
}
Err(error) => {
tracing::error!(
?error,
"failed to rollback tenant database, unable to acquire postgres database"
);
}
}
}
}
}
/// Creates a new tenant from the provided `config`.
///
/// Delegates the work to `create_tenant_inner` and, when that fails,
/// rolls back the resources it recorded before returning the error.
///
/// # Errors
///
/// Returns the [`CreateTenantError`] produced by the failed creation step.
#[tracing::instrument(skip_all, fields(?config))]
pub async fn create_tenant(
    db_provider: &impl DatabaseProvider,
    search_factory: &SearchIndexFactory,
    storage_factory: &StorageLayerFactory,
    secrets: &SecretManager,
    config: CreateTenantConfig,
) -> Result<Tenant, CreateTenantError> {
    let mut rollback = CreateTenantRollbackData::default();

    let outcome = create_tenant_inner(
        db_provider,
        search_factory,
        storage_factory,
        secrets,
        config,
        &mut rollback,
    )
    .await;

    // Only a failed creation needs its partial resources cleaned up
    if outcome.is_err() {
        rollback.rollback(db_provider).await;
    }

    outcome
}
#[tracing::instrument(skip_all, fields(?config))]
async fn create_tenant_inner(
db_provider: &impl DatabaseProvider,
search_factory: &SearchIndexFactory,
storage_factory: &StorageLayerFactory,
secrets: &SecretManager,
config: CreateTenantConfig,
rollback: &mut CreateTenantRollbackData,
) -> Result<Tenant, CreateTenantError> {
let (tenant_db, _tenant_db_guard) = {
let db_postgres = db_provider
.connect("postgres")
.await
.map_err(CreateTenantError::ConnectPostgres)?;
let _postgres_guard = close_pool_on_drop(&db_postgres);
initialize_tenant_database(&db_postgres, &config.db_name, rollback).await?;
tracing::info!("created tenant database");
let tenant_db = db_provider
.connect(&config.db_name)
.await
.map_err(CreateTenantError::ConnectTenantDatabase)?;
let tenant_db_guard = close_pool_on_drop(&tenant_db);
(tenant_db, tenant_db_guard)
};
if config.db_iam_user {
initialize_tenant_db_role_aws_iam(
&tenant_db,
&config.db_name,
&config.db_role_name,
rollback,
)
.await?;
tracing::info!("created tenant user (iam)");
} else {
let db_secret_name = config
.db_secret_name
.as_ref()
.ok_or(CreateTenantError::MissingDatabaseSecretName)?;
let db_role_password = random_password(30);
initialize_tenant_db_role(
&tenant_db,
&config.db_name,
&config.db_role_name,
&db_role_password,
rollback,
)
.await?;
tracing::info!("created tenant user");
initialize_tenant_db_secret(
secrets,
db_secret_name,
&config.db_role_name,
&db_role_password,
rollback,
)
.await?;
tracing::info!("created tenant database secret");
}
let root_db = db_provider
.connect(ROOT_DATABASE_NAME)
.await
.map_err(CreateTenantError::ConnectRootDatabase)?;
let _guard = close_pool_on_drop(&root_db);
let mut root_transaction = root_db
.begin()
.await
.inspect_err(|error| tracing::error!(?error, "failed to begin root transaction"))?;
let tenant: Tenant = Tenant::create(
root_transaction.deref_mut(),
docbox_core::database::models::tenant::CreateTenant {
id: config.id,
name: config.name,
db_name: config.db_name,
db_iam_user_name: if config.db_iam_user {
Some(config.db_role_name)
} else {
None
},
db_secret_name: config.db_secret_name,
s3_name: config.storage_bucket_name,
os_index_name: config.search_index_name,
event_queue_url: config.event_queue_url,
env: config.env,
},
)
.await
.map_err(|err| {
if err.is_duplicate_record() {
CreateTenantError::TenantAlreadyExist
} else {
CreateTenantError::Database(err)
}
})
.inspect_err(|error| tracing::error!(?error, "failed to create tenant"))?;
let mut tenant_transaction = tenant_db
.begin()
.await
.inspect_err(|error| tracing::error!(?error, "failed to begin tenant transaction"))?;
apply_tenant_migrations(
&mut root_transaction,
&mut tenant_transaction,
&tenant,
None,
)
.await
.inspect_err(|error| tracing::error!(?error, "failed to create tenant tables"))?;
tracing::debug!("creating tenant storage");
let storage = create_tenant_storage(
&tenant,
storage_factory,
config.storage_s3_queue_arn,
config.storage_cors_origins,
rollback,
)
.await?;
migrate_tenant_storage_inner(&storage, &mut root_transaction, &tenant, None)
.await
.map_err(CreateTenantError::MigrateStorage)?;
tracing::debug!("creating tenant search index");
let search = create_tenant_search(&tenant, search_factory, rollback).await?;
search
.apply_migrations(
&tenant,
&mut root_transaction,
&mut tenant_transaction,
None,
)
.await
.map_err(CreateTenantError::MigrateSearchIndex)?;
tenant_transaction
.commit()
.await
.inspect_err(|error| tracing::error!(?error, "failed to commit tenant transaction"))?;
root_transaction
.commit()
.await
.inspect_err(|error| tracing::error!(?error, "failed to commit root transaction"))?;
Ok(tenant)
}
/// Checks whether a database named `db_name` exists, using a connection
/// to the `postgres` database obtained from `db_provider`.
///
/// # Errors
///
/// Returns the database error if connecting or performing the check fails.
#[tracing::instrument(skip(db_provider))]
pub async fn is_tenant_database_existing(
    db_provider: &impl DatabaseProvider,
    db_name: &str,
) -> DbResult<bool> {
    let postgres_pool = db_provider.connect("postgres").await?;
    let _close_guard = close_pool_on_drop(&postgres_pool);

    let exists = check_database_exists(&postgres_pool, db_name).await?;
    Ok(exists)
}
#[tracing::instrument(skip(db_postgres, rollback))]
async fn initialize_tenant_database(
db_postgres: &DbPool,
db_name: &str,
rollback: &mut CreateTenantRollbackData,
) -> Result<(), CreateTenantError> {
let already_exists = match create_database(db_postgres, db_name).await {
Ok(_) => false,
Err(error) if error.is_database_exists() => true,
Err(error) => return Err(CreateTenantError::CreateTenantDatabase(error)),
};
if !already_exists {
rollback.database = Some(db_name.to_string());
}
Ok(())
}
/// Checks whether a database role named `role_name` exists, using a
/// connection to the `postgres` database obtained from `db_provider`.
///
/// # Errors
///
/// Returns the database error if connecting or performing the check fails.
#[tracing::instrument(skip(db_provider))]
pub async fn is_tenant_database_role_existing(
    db_provider: &impl DatabaseProvider,
    role_name: &str,
) -> DbResult<bool> {
    let postgres_pool = db_provider.connect("postgres").await?;
    let _close_guard = close_pool_on_drop(&postgres_pool);

    let exists = check_database_role_exists(&postgres_pool, role_name).await?;
    Ok(exists)
}
#[tracing::instrument(skip(db, role_password, rollback))]
async fn initialize_tenant_db_role(
db: &DbPool,
db_name: &str,
role_name: &str,
role_password: &str,
rollback: &mut CreateTenantRollbackData,
) -> Result<(), CreateTenantError> {
create_restricted_role(db, db_name, role_name, role_password)
.await
.map_err(CreateTenantError::CreateTenantRole)?;
rollback.db_role = Some(role_name.to_string());
Ok(())
}
#[tracing::instrument(skip(db, rollback))]
async fn initialize_tenant_db_role_aws_iam(
db: &DbPool,
db_name: &str,
role_name: &str,
rollback: &mut CreateTenantRollbackData,
) -> Result<(), CreateTenantError> {
create_restricted_role_aws_iam(db, db_name, role_name)
.await
.map_err(CreateTenantError::CreateTenantRole)?;
rollback.db_role = Some(role_name.to_string());
Ok(())
}
/// Checks whether a secret named `secret_name` is present in the secret
/// manager by attempting to load it.
///
/// # Errors
///
/// Returns the [`SecretManagerError`] if the secret could not be requested.
#[tracing::instrument(skip(secrets))]
pub async fn is_tenant_database_role_secret_existing(
    secrets: &SecretManager,
    secret_name: &str,
) -> Result<bool, SecretManagerError> {
    let secret = secrets.get_secret(secret_name).await?;
    Ok(secret.is_some())
}
/// Stores the tenant database credentials as a JSON secret with the
/// `username` and `password` fields, recording the secret in `rollback`
/// once it has been stored.
///
/// # Errors
///
/// - [`CreateTenantError::SecretAlreadyExists`] if a secret named
///   `secret_name` is already present
/// - [`CreateTenantError::CreateTenantSecret`] if checking for or storing
///   the secret fails
/// - [`CreateTenantError::SerializeSecret`] if the credentials could not
///   be serialized
#[tracing::instrument(skip(secrets, role_password, rollback))]
async fn initialize_tenant_db_secret(
    secrets: &SecretManager,
    secret_name: &str,
    role_name: &str,
    role_password: &str,
    rollback: &mut CreateTenantRollbackData,
) -> Result<(), CreateTenantError> {
    // Refuse to overwrite a secret that is already present
    let already_present = secrets
        .has_secret(secret_name)
        .await
        .map_err(CreateTenantError::CreateTenantSecret)?;
    if already_present {
        return Err(CreateTenantError::SecretAlreadyExists);
    }

    let credentials = json!({
        "username": role_name,
        "password": role_password
    });
    let serialized =
        serde_json::to_string(&credentials).map_err(CreateTenantError::SerializeSecret)?;

    secrets
        .set_secret(secret_name, &serialized)
        .await
        .map_err(CreateTenantError::CreateTenantSecret)?;

    rollback.secret = Some((secrets.clone(), secret_name.to_owned()));
    Ok(())
}
#[tracing::instrument(skip(storage, rollback))]
async fn create_tenant_storage(
tenant: &Tenant,
storage: &StorageLayerFactory,
s3_queue_arn: Option<String>,
origins: Vec<String>,
rollback: &mut CreateTenantRollbackData,
) -> Result<StorageLayer, CreateTenantError> {
let storage = storage.create_layer(tenant.storage_layer_options());
let outcome = storage
.create_bucket()
.await
.inspect_err(|error| tracing::error!(?error, "failed to create tenant bucket"))
.map_err(CreateTenantError::CreateStorageBucket)?;
if matches!(outcome, CreateBucketOutcome::New) {
rollback.storage = Some(storage.clone());
}
if let Some(s3_queue_arn) = s3_queue_arn {
storage
.add_bucket_notifications(&s3_queue_arn)
.await
.inspect_err(|error| {
tracing::error!(?error, "failed to add bucket notification configuration")
})
.map_err(CreateTenantError::SetupS3Notifications)?;
}
if !origins.is_empty() {
storage
.set_bucket_cors_origins(origins)
.await
.inspect_err(|error| tracing::error!(?error, "failed to add bucket cors rules"))
.map_err(CreateTenantError::SetupStorageOrigins)?;
}
Ok(storage)
}
#[tracing::instrument(skip(search, rollback))]
async fn create_tenant_search(
tenant: &Tenant,
search: &SearchIndexFactory,
rollback: &mut CreateTenantRollbackData,
) -> Result<TenantSearchIndex, CreateTenantError> {
let search = search.create_search_index(tenant);
search
.create_index()
.await
.map_err(CreateTenantError::CreateSearchIndex)
.inspect_err(|error| tracing::error!(?error, "failed to create search index"))?;
rollback.search_index = Some(search.clone());
Ok(search)
}