1use docbox_database::{
2 DbErr, DbPool, DbResult, ROOT_DATABASE_NAME,
3 create::{
4 check_database_exists, check_database_role_exists, create_database, create_restricted_role,
5 delete_database, delete_role,
6 },
7 migrations::apply_tenant_migrations,
8 models::tenant::{Tenant, TenantId},
9 utils::DatabaseErrorExt,
10};
11use docbox_search::{SearchError, SearchIndexFactory, TenantSearchIndex};
12use docbox_secrets::{SecretManager, SecretManagerError};
13use docbox_storage::{
14 CreateBucketOutcome, StorageLayerError, StorageLayerFactory, TenantStorageLayer,
15};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::ops::DerefMut;
19use thiserror::Error;
20
21use crate::{
22 database::{DatabaseProvider, close_pool_on_drop},
23 password::random_password,
24};
25
26#[derive(Debug, Error)]
28pub enum CreateTenantError {
29 #[error("error connecting to 'postgres' database: {0}")]
31 ConnectPostgres(DbErr),
32
33 #[error("error creating tenant database: {0}")]
35 CreateTenantDatabase(DbErr),
36
37 #[error("error connecting to tenant database: {0}")]
39 ConnectTenantDatabase(DbErr),
40
41 #[error("error connecting to root database: {0}")]
43 ConnectRootDatabase(DbErr),
44
45 #[error("error creating tenant database role: {0}")]
47 CreateTenantRole(DbErr),
48
49 #[error(transparent)]
51 Database(#[from] DbErr),
52
53 #[error("error serializing tenant secret: {0}")]
55 SerializeSecret(serde_json::Error),
56
57 #[error("failed to create tenant secret: secret name already exists")]
59 SecretAlreadyExists,
60
61 #[error("failed to create tenant secret: {0}")]
63 CreateTenantSecret(SecretManagerError),
64
65 #[error("tenant already exists")]
67 TenantAlreadyExist,
68
69 #[error("failed to create tenant storage bucket: {0}")]
71 CreateStorageBucket(StorageLayerError),
72
73 #[error("failed to setup s3 notification rules: {0}")]
75 SetupS3Notifications(StorageLayerError),
76
77 #[error("failed to setup storage origin rules rules: {0}")]
79 SetupStorageOrigins(StorageLayerError),
80
81 #[error("failed to create tenant search index: {0}")]
83 CreateSearchIndex(SearchError),
84
85 #[error("failed to migrate tenant search index: {0}")]
87 MigrateSearchIndex(SearchError),
88}
89
90#[derive(Debug, Deserialize, Serialize, Clone)]
92pub struct CreateTenantConfig {
93 pub id: TenantId,
95 pub name: String,
97 pub env: String,
99
100 pub db_name: String,
102 pub db_secret_name: String,
105 pub db_role_name: String,
107
108 pub storage_bucket_name: String,
110 pub storage_cors_origins: Vec<String>,
112 pub storage_s3_queue_arn: Option<String>,
115
116 pub search_index_name: String,
118
119 pub event_queue_url: Option<String>,
121}
122
123#[derive(Default)]
125struct CreateTenantRollbackData {
126 search_index: Option<TenantSearchIndex>,
127 storage: Option<TenantStorageLayer>,
128 secret: Option<(SecretManager, String)>,
129 database: Option<String>,
130 db_role: Option<String>,
131}
132
133impl CreateTenantRollbackData {
134 async fn rollback(&mut self, db_provider: &impl DatabaseProvider) {
135 if let Some(search_index) = self.search_index.take()
137 && let Err(error) = search_index.delete_index().await
138 {
139 tracing::error!(?error, "failed to rollback created tenant search index");
140 }
141
142 if let Some(storage) = self.storage.take()
144 && let Err(error) = storage.delete_bucket().await
145 {
146 tracing::error!(?error, "failed to rollback created tenant storage bucket");
147 }
148
149 if let Some((secrets, secret_name)) = self.secret.take()
151 && let Err(error) = secrets.delete_secret(&secret_name, true).await
152 {
153 tracing::error!(?error, "failed to rollback tenant secret");
154 }
155
156 let db_name = self.database.take();
158 let db_role_name = self.db_role.take();
159 if db_name.is_some() || db_role_name.is_some() {
160 match db_provider.connect("postgres").await {
161 Ok(db_postgres) => {
162 if let Some(db_name) = db_name
164 && let Err(error) = delete_database(&db_postgres, &db_name).await
165 {
166 tracing::error!(?error, "failed to rollback tenant database");
167 }
168
169 if let Some(db_role_name) = db_role_name
171 && let Err(error) = delete_role(&db_postgres, &db_role_name).await
172 {
173 tracing::error!(?error, "failed to rollback tenant db role name");
174 }
175
176 db_postgres.close().await;
177 }
178 Err(error) => {
179 tracing::error!(
180 ?error,
181 "failed to rollback tenant database, unable to acquire postgres database"
182 );
183 }
184 }
185 }
186 }
187}
188
189#[tracing::instrument(skip_all, fields(?config))]
203pub async fn create_tenant(
204 db_provider: &impl DatabaseProvider,
205 search_factory: &SearchIndexFactory,
206 storage_factory: &StorageLayerFactory,
207 secrets: &SecretManager,
208 config: CreateTenantConfig,
209) -> Result<Tenant, CreateTenantError> {
210 let mut rollback = CreateTenantRollbackData::default();
211
212 match create_tenant_inner(
213 db_provider,
214 search_factory,
215 storage_factory,
216 secrets,
217 config,
218 &mut rollback,
219 )
220 .await
221 {
222 Ok(value) => Ok(value),
223 Err(error) => {
224 rollback.rollback(db_provider).await;
226 Err(error)
227 }
228 }
229}
230
231#[tracing::instrument(skip_all, fields(?config))]
232async fn create_tenant_inner(
233 db_provider: &impl DatabaseProvider,
234 search_factory: &SearchIndexFactory,
235 storage_factory: &StorageLayerFactory,
236 secrets: &SecretManager,
237 config: CreateTenantConfig,
238 rollback: &mut CreateTenantRollbackData,
239) -> Result<Tenant, CreateTenantError> {
240 let (tenant_db, _tenant_db_guard) = {
241 let db_postgres = db_provider
243 .connect("postgres")
244 .await
245 .map_err(CreateTenantError::ConnectPostgres)?;
246 let _postgres_guard = close_pool_on_drop(&db_postgres);
247
248 initialize_tenant_database(&db_postgres, &config.db_name, rollback).await?;
250 tracing::info!("created tenant database");
251
252 let tenant_db = db_provider
254 .connect(&config.db_name)
255 .await
256 .map_err(CreateTenantError::ConnectTenantDatabase)?;
257
258 let tenant_db_guard = close_pool_on_drop(&tenant_db);
259 (tenant_db, tenant_db_guard)
260 };
261
262 let db_role_password = random_password(30);
264
265 initialize_tenant_db_role(
266 &tenant_db,
267 &config.db_name,
268 &config.db_role_name,
269 &db_role_password,
270 rollback,
271 )
272 .await?;
273 tracing::info!("created tenant user");
274
275 initialize_tenant_db_secret(
276 secrets,
277 &config.db_secret_name,
278 &config.db_role_name,
279 &db_role_password,
280 rollback,
281 )
282 .await?;
283 tracing::info!("created tenant database secret");
284
285 let root_db = db_provider
287 .connect(ROOT_DATABASE_NAME)
288 .await
289 .map_err(CreateTenantError::ConnectRootDatabase)?;
290
291 let _guard = close_pool_on_drop(&root_db);
292
293 let mut root_transaction = root_db
295 .begin()
296 .await
297 .inspect_err(|error| tracing::error!(?error, "failed to begin root transaction"))?;
298
299 let tenant: Tenant = Tenant::create(
301 root_transaction.deref_mut(),
302 docbox_database::models::tenant::CreateTenant {
303 id: config.id,
304 name: config.name,
305 db_name: config.db_name,
306 db_secret_name: config.db_secret_name,
307 s3_name: config.storage_bucket_name,
308 os_index_name: config.search_index_name,
309 event_queue_url: config.event_queue_url,
310 env: config.env,
311 },
312 )
313 .await
314 .map_err(|err| {
315 if err.is_duplicate_record() {
317 CreateTenantError::TenantAlreadyExist
318 } else {
319 CreateTenantError::Database(err)
320 }
321 })
322 .inspect_err(|error| tracing::error!(?error, "failed to create tenant"))?;
323
324 let mut tenant_transaction = tenant_db
326 .begin()
327 .await
328 .inspect_err(|error| tracing::error!(?error, "failed to begin tenant transaction"))?;
329
330 apply_tenant_migrations(
332 &mut root_transaction,
333 &mut tenant_transaction,
334 &tenant,
335 None,
336 )
337 .await
338 .inspect_err(|error| tracing::error!(?error, "failed to create tenant tables"))?;
339
340 tracing::debug!("creating tenant storage");
342 create_tenant_storage(
343 &tenant,
344 storage_factory,
345 config.storage_s3_queue_arn,
346 config.storage_cors_origins,
347 rollback,
348 )
349 .await?;
350
351 tracing::debug!("creating tenant search index");
353 let search = create_tenant_search(&tenant, search_factory, rollback).await?;
354
355 search
357 .apply_migrations(
358 &tenant,
359 &mut root_transaction,
360 &mut tenant_transaction,
361 None,
362 )
363 .await
364 .map_err(CreateTenantError::MigrateSearchIndex)?;
365
366 tenant_transaction
368 .commit()
369 .await
370 .inspect_err(|error| tracing::error!(?error, "failed to commit tenant transaction"))?;
371 root_transaction
372 .commit()
373 .await
374 .inspect_err(|error| tracing::error!(?error, "failed to commit root transaction"))?;
375
376 Ok(tenant)
377}
378
379#[tracing::instrument(skip(db_provider))]
382pub async fn is_tenant_database_existing(
383 db_provider: &impl DatabaseProvider,
384 db_name: &str,
385) -> DbResult<bool> {
386 let db_postgres = db_provider.connect("postgres").await?;
388 let _guard = close_pool_on_drop(&db_postgres);
389
390 check_database_exists(&db_postgres, db_name).await
391}
392
393#[tracing::instrument(skip(db_postgres, rollback))]
397async fn initialize_tenant_database(
398 db_postgres: &DbPool,
399 db_name: &str,
400 rollback: &mut CreateTenantRollbackData,
401) -> Result<(), CreateTenantError> {
402 let already_exists = match create_database(db_postgres, db_name).await {
403 Ok(_) => false,
405 Err(error) if error.is_database_exists() => true,
407 Err(error) => return Err(CreateTenantError::CreateTenantDatabase(error)),
409 };
410
411 if !already_exists {
412 rollback.database = Some(db_name.to_string());
413 }
414
415 Ok(())
416}
417
418#[tracing::instrument(skip(db_provider))]
421pub async fn is_tenant_database_role_existing(
422 db_provider: &impl DatabaseProvider,
423 role_name: &str,
424) -> DbResult<bool> {
425 let db_postgres = db_provider.connect("postgres").await?;
427
428 let _guard = close_pool_on_drop(&db_postgres);
429
430 check_database_role_exists(&db_postgres, role_name).await
431}
432
433#[tracing::instrument(skip(db, role_password, rollback))]
436async fn initialize_tenant_db_role(
437 db: &DbPool,
438 db_name: &str,
439 role_name: &str,
440 role_password: &str,
441 rollback: &mut CreateTenantRollbackData,
442) -> Result<(), CreateTenantError> {
443 create_restricted_role(db, db_name, role_name, role_password)
445 .await
446 .map_err(CreateTenantError::CreateTenantRole)?;
447
448 rollback.db_role = Some(role_name.to_string());
449
450 Ok(())
451}
452
453#[tracing::instrument(skip(secrets))]
456pub async fn is_tenant_database_role_secret_existing(
457 secrets: &SecretManager,
458 secret_name: &str,
459) -> Result<bool, SecretManagerError> {
460 secrets
461 .get_secret(secret_name)
462 .await
463 .map(|value| value.is_some())
464}
465
466#[tracing::instrument(skip(secrets, role_password, rollback))]
468async fn initialize_tenant_db_secret(
469 secrets: &SecretManager,
470 secret_name: &str,
471 role_name: &str,
472 role_password: &str,
473 rollback: &mut CreateTenantRollbackData,
474) -> Result<(), CreateTenantError> {
475 if secrets
477 .has_secret(secret_name)
478 .await
479 .map_err(CreateTenantError::CreateTenantSecret)?
480 {
481 return Err(CreateTenantError::SecretAlreadyExists);
482 }
483
484 let secret_value = serde_json::to_string(&json!({
485 "username": role_name,
486 "password": role_password
487 }))
488 .map_err(CreateTenantError::SerializeSecret)?;
489
490 secrets
491 .set_secret(secret_name, &secret_value)
492 .await
493 .map_err(CreateTenantError::CreateTenantSecret)?;
494
495 rollback.secret = Some((secrets.clone(), secret_name.to_string()));
496
497 Ok(())
498}
499
500#[tracing::instrument(skip(storage, rollback))]
502async fn create_tenant_storage(
503 tenant: &Tenant,
504 storage: &StorageLayerFactory,
505 s3_queue_arn: Option<String>,
506 origins: Vec<String>,
507 rollback: &mut CreateTenantRollbackData,
508) -> Result<(), CreateTenantError> {
509 let storage = storage.create_storage_layer(tenant);
510 let outcome = storage
511 .create_bucket()
512 .await
513 .inspect_err(|error| tracing::error!(?error, "failed to create tenant bucket"))
514 .map_err(CreateTenantError::CreateStorageBucket)?;
515
516 if matches!(outcome, CreateBucketOutcome::New) {
518 rollback.storage = Some(storage.clone());
519 }
520
521 if let Some(s3_queue_arn) = s3_queue_arn {
523 storage
524 .add_bucket_notifications(&s3_queue_arn)
525 .await
526 .inspect_err(|error| {
527 tracing::error!(?error, "failed to add bucket notification configuration")
528 })
529 .map_err(CreateTenantError::SetupS3Notifications)?;
530 }
531
532 if !origins.is_empty() {
534 storage
535 .set_bucket_cors_origins(origins)
536 .await
537 .inspect_err(|error| tracing::error!(?error, "failed to add bucket cors rules"))
538 .map_err(CreateTenantError::SetupStorageOrigins)?;
539 }
540
541 Ok(())
542}
543
544#[tracing::instrument(skip(search, rollback))]
546async fn create_tenant_search(
547 tenant: &Tenant,
548 search: &SearchIndexFactory,
549 rollback: &mut CreateTenantRollbackData,
550) -> Result<TenantSearchIndex, CreateTenantError> {
551 let search = search.create_search_index(tenant);
553 search
554 .create_index()
555 .await
556 .map_err(CreateTenantError::CreateSearchIndex)
557 .inspect_err(|error| tracing::error!(?error, "failed to create search index"))?;
558
559 rollback.search_index = Some(search.clone());
561
562 Ok(search)
563}