docbox_core/tenant/
create_tenant.rs1use crate::utils::rollback::RollbackGuard;
2use docbox_database::migrations::apply_tenant_migrations;
3use docbox_database::models::tenant::TenantId;
4use docbox_database::{DbConnectErr, DbPool};
5use docbox_database::{DbErr, models::tenant::Tenant};
6use docbox_search::{SearchIndexFactory, TenantSearchIndex};
7use docbox_storage::StorageLayerFactory;
8use std::ops::DerefMut;
9use thiserror::Error;
10
11pub struct CreateTenant {
13 pub env: String,
15
16 pub name: String,
18
19 pub id: TenantId,
21
22 pub db_name: String,
24
25 pub db_secret_name: String,
27
28 pub s3_name: String,
30
31 pub os_index_name: String,
33
34 pub event_queue_url: Option<String>,
36
37 pub origins: Vec<String>,
39
40 pub s3_queue_arn: Option<String>,
43}
44
45#[derive(Debug, Error)]
46pub enum InitTenantError {
47 #[error("failed to connect")]
49 ConnectDb(#[from] DbConnectErr),
50
51 #[error(transparent)]
53 Database(#[from] DbErr),
54
55 #[error("tenant already exists")]
57 TenantAlreadyExist,
58
59 #[error("failed to create tenant s3 bucket: {0}")]
61 CreateS3Bucket(anyhow::Error),
62
63 #[error("failed to setup s3 notification rules: {0}")]
65 SetupS3Notifications(anyhow::Error),
66
67 #[error("failed to setup s3 CORS rules: {0}")]
69 SetupS3Cors(anyhow::Error),
70
71 #[error("failed to create tenant search index: {0}")]
73 CreateSearchIndex(anyhow::Error),
74
75 #[error("failed to migrate tenant search index: {0}")]
77 MigrateSearchIndex(anyhow::Error),
78}
79
80pub async fn create_tenant(
82 root_db: &DbPool,
83 tenant_db: &DbPool,
84
85 search: &SearchIndexFactory,
86 storage: &StorageLayerFactory,
87 create: CreateTenant,
88) -> Result<Tenant, InitTenantError> {
89 let mut root_transaction = root_db
91 .begin()
92 .await
93 .inspect_err(|error| tracing::error!(?error, "failed to begin root transaction"))?;
94
95 let tenant: Tenant = Tenant::create(
97 root_transaction.deref_mut(),
98 docbox_database::models::tenant::CreateTenant {
99 id: create.id,
100 name: create.name,
101 db_name: create.db_name,
102 db_secret_name: create.db_secret_name,
103 s3_name: create.s3_name,
104 os_index_name: create.os_index_name,
105 event_queue_url: create.event_queue_url,
106 env: create.env,
107 },
108 )
109 .await
110 .map_err(|err| {
111 if let Some(db_err) = err.as_database_error() {
112 if db_err.is_unique_violation() {
114 return InitTenantError::TenantAlreadyExist;
115 }
116 }
117
118 InitTenantError::Database(err)
119 })
120 .inspect_err(|error| tracing::error!(?error, "failed to create tenant"))?;
121
122 let mut tenant_transaction = tenant_db
124 .begin()
125 .await
126 .inspect_err(|error| tracing::error!(?error, "failed to begin tenant transaction"))?;
127
128 apply_tenant_migrations(
130 &mut root_transaction,
131 &mut tenant_transaction,
132 &tenant,
133 None,
134 )
135 .await
136 .inspect_err(|error| tracing::error!(?error, "failed to create tenant tables"))?;
137
138 tracing::debug!("creating tenant storage");
140 let storage =
141 create_tenant_storage(&tenant, storage, create.s3_queue_arn, create.origins).await?;
142
143 tracing::debug!("creating tenant search index");
145 let (search, search_rollback) = create_tenant_search(&tenant, search).await?;
146
147 search
149 .apply_migrations(
150 &tenant,
151 &mut root_transaction,
152 &mut tenant_transaction,
153 None,
154 )
155 .await
156 .map_err(InitTenantError::MigrateSearchIndex)?;
157
158 tenant_transaction
160 .commit()
161 .await
162 .inspect_err(|error| tracing::error!(?error, "failed to commit tenant transaction"))?;
163 root_transaction
164 .commit()
165 .await
166 .inspect_err(|error| tracing::error!(?error, "failed to commit root transaction"))?;
167
168 storage.commit();
170 search_rollback.commit();
171
172 Ok(tenant)
173}
174
175async fn create_tenant_storage(
177 tenant: &Tenant,
178 storage: &StorageLayerFactory,
179 s3_queue_arn: Option<String>,
180 origins: Vec<String>,
181) -> Result<RollbackGuard<impl FnOnce()>, InitTenantError> {
182 let storage = storage.create_storage_layer(tenant);
183 storage
184 .create_bucket()
185 .await
186 .inspect_err(|error| tracing::error!(?error, "failed to create tenant bucket"))
187 .map_err(InitTenantError::CreateS3Bucket)?;
188
189 let rollback = RollbackGuard::new({
190 let storage = storage.clone();
191 move || {
192 tokio::spawn(async move {
193 if let Err(error) = storage.delete_bucket().await {
194 tracing::error!(?error, "failed to rollback created tenant storage bucket");
195 }
196 });
197 }
198 });
199
200 if let Some(s3_queue_arn) = s3_queue_arn {
202 storage
203 .add_bucket_notifications(&s3_queue_arn)
204 .await
205 .inspect_err(|error| {
206 tracing::error!(?error, "failed to add bucket notification configuration")
207 })
208 .map_err(InitTenantError::SetupS3Notifications)?;
209 }
210
211 if !origins.is_empty() {
213 storage
214 .add_bucket_cors(origins)
215 .await
216 .inspect_err(|error| tracing::error!(?error, "failed to add bucket cors rules"))
217 .map_err(InitTenantError::SetupS3Cors)?;
218 }
219
220 Ok(rollback)
221}
222
223async fn create_tenant_search(
225 tenant: &Tenant,
226 search: &SearchIndexFactory,
227) -> Result<(TenantSearchIndex, RollbackGuard<impl FnOnce()>), InitTenantError> {
228 let search = search.create_search_index(tenant);
230 search
231 .create_index()
232 .await
233 .map_err(InitTenantError::CreateSearchIndex)
234 .inspect_err(|error| tracing::error!(?error, "failed to create search index"))?;
235
236 let rollback = RollbackGuard::new({
237 let search = search.clone();
238 move || {
239 tokio::spawn(async move {
241 if let Err(error) = search.delete_index().await {
242 tracing::error!(?error, "failed to rollback created tenant search index");
243 }
244 });
245 }
246 });
247
248 Ok((search, rollback))
249}