docbox_database/migrations/
mod.rs1use crate::{
2 DbExecutor, DbResult, DbTransaction,
3 models::{
4 root_migration::{CreateRootMigration, RootMigration},
5 tenant::Tenant,
6 tenant_migration::{CreateTenantMigration, TenantMigration},
7 },
8};
9use chrono::Utc;
10use std::ops::DerefMut;
11
12pub const ROOT_MIGRATIONS: &[(&str, &str)] = &[
13 (
14 "m1_create_tenants_table",
15 include_str!("./root/m1_create_tenants_table.sql"),
16 ),
17 (
18 "m2_create_tenant_migrations_table",
19 include_str!("./root/m2_create_tenant_migrations_table.sql"),
20 ),
21 (
22 "m3_create_storage_bucket_index",
23 include_str!("./root/m3_create_storage_bucket_index.sql"),
24 ),
25];
26
27pub const TENANT_MIGRATIONS: &[(&str, &str)] = &[
28 (
29 "m1_create_users_table",
30 include_str!("./tenant/m1_create_users_table.sql"),
31 ),
32 (
33 "m2_create_document_box_table",
34 include_str!("./tenant/m2_create_document_box_table.sql"),
35 ),
36 (
37 "m3_create_folders_table",
38 include_str!("./tenant/m3_create_folders_table.sql"),
39 ),
40 (
41 "m4_create_files_table",
42 include_str!("./tenant/m4_create_files_table.sql"),
43 ),
44 (
45 "m5_create_generated_files_table",
46 include_str!("./tenant/m5_create_generated_files_table.sql"),
47 ),
48 (
49 "m6_create_links_table",
50 include_str!("./tenant/m6_create_links_table.sql"),
51 ),
52 (
53 "m7_create_edit_history_table",
54 include_str!("./tenant/m7_create_edit_history_table.sql"),
55 ),
56 (
57 "m8_create_tasks_table",
58 include_str!("./tenant/m8_create_tasks_table.sql"),
59 ),
60 (
61 "m9_create_presigned_upload_tasks_table",
62 include_str!("./tenant/m9_create_presigned_upload_tasks_table.sql"),
63 ),
64 (
65 "m10_add_pinned_column",
66 include_str!("./tenant/m10_add_pinned_column.sql"),
67 ),
68];
69
70pub async fn initialize_root_migrations(db: impl DbExecutor<'_>) -> DbResult<()> {
74 sqlx::raw_sql(include_str!("./root/m0_create_root_migrations_table.sql"))
75 .execute(db)
76 .await?;
77
78 Ok(())
79}
80
81pub async fn get_pending_tenant_migrations(
83 db: impl DbExecutor<'_>,
84 tenant: &Tenant,
85) -> DbResult<Vec<String>> {
86 let migrations = TenantMigration::find_by_tenant(db, tenant.id, &tenant.env).await?;
87
88 let pending = TENANT_MIGRATIONS
89 .iter()
90 .filter(|(migration_name, _migration)| {
91 !migrations
93 .iter()
94 .any(|migration| migration.name.eq(migration_name))
95 })
96 .map(|(migration_name, _migration)| migration_name.to_string())
97 .collect();
98
99 Ok(pending)
100}
101
102pub async fn get_pending_root_migrations(db: impl DbExecutor<'_>) -> DbResult<Vec<String>> {
104 let migrations = RootMigration::all(db).await?;
105
106 let pending = ROOT_MIGRATIONS
107 .iter()
108 .filter(|(migration_name, _migration)| {
109 !migrations
111 .iter()
112 .any(|migration| migration.name.eq(migration_name))
113 })
114 .map(|(migration_name, _migration)| migration_name.to_string())
115 .collect();
116
117 Ok(pending)
118}
119
120pub async fn apply_tenant_migrations(
125 root_t: &mut DbTransaction<'_>,
126 t: &mut DbTransaction<'_>,
127 tenant: &Tenant,
128 target_migration_name: Option<&str>,
129) -> DbResult<()> {
130 let migrations =
131 TenantMigration::find_by_tenant(root_t.deref_mut(), tenant.id, &tenant.env).await?;
132
133 for (migration_name, migration) in TENANT_MIGRATIONS {
134 if target_migration_name
136 .is_some_and(|target_migration_name| target_migration_name.ne(*migration_name))
137 {
138 continue;
139 }
140
141 if migrations
143 .iter()
144 .any(|migration| migration.name.eq(migration_name))
145 {
146 continue;
147 }
148
149 apply_migration(t, migration_name, migration).await?;
151
152 TenantMigration::create(
154 root_t.deref_mut(),
155 CreateTenantMigration {
156 tenant_id: tenant.id,
157 env: tenant.env.clone(),
158 name: migration_name.to_string(),
159 applied_at: Utc::now(),
160 },
161 )
162 .await?;
163 }
164
165 Ok(())
166}
167
168pub async fn apply_root_migrations(
173 root_t: &mut DbTransaction<'_>,
174 target_migration_name: Option<&str>,
175) -> DbResult<()> {
176 let migrations = RootMigration::all(root_t.deref_mut()).await?;
177
178 for (migration_name, migration) in ROOT_MIGRATIONS {
179 if target_migration_name
181 .is_some_and(|target_migration_name| target_migration_name.ne(*migration_name))
182 {
183 continue;
184 }
185
186 if migrations
188 .iter()
189 .any(|migration| migration.name.eq(migration_name))
190 {
191 continue;
192 }
193
194 apply_migration(root_t, migration_name, migration).await?;
196
197 RootMigration::create(
199 root_t.deref_mut(),
200 CreateRootMigration {
201 name: migration_name.to_string(),
202 applied_at: Utc::now(),
203 },
204 )
205 .await?;
206 }
207
208 Ok(())
209}
210
211pub async fn force_apply_tenant_migrations(
215 t: &mut DbTransaction<'_>,
216 target_migration_name: Option<&str>,
217) -> DbResult<()> {
218 for (migration_name, migration) in TENANT_MIGRATIONS {
219 if target_migration_name
221 .is_some_and(|target_migration_name| target_migration_name.ne(*migration_name))
222 {
223 continue;
224 }
225
226 apply_migration(t, migration_name, migration).await?;
227 }
228
229 Ok(())
230}
231
232pub async fn apply_migration(
234 db: &mut DbTransaction<'_>,
235 migration_name: &str,
236 migration: &str,
237) -> DbResult<()> {
238 let queries = migration
240 .split(';')
241 .map(|query| query.trim())
242 .filter(|query| !query.is_empty());
243
244 for query in queries {
245 let result = sqlx::query(query)
246 .execute(db.deref_mut())
247 .await
248 .inspect_err(|error| {
249 tracing::error!(?error, ?migration_name, "failed to perform migration")
250 })?;
251 let rows_affected = result.rows_affected();
252
253 tracing::debug!(?migration_name, ?rows_affected, "applied migration query");
254 }
255
256 Ok(())
257}