1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
15 do_sync(db, None).await
16}
17
18pub async fn sync_and_migrate_group(db: &DbPool, group: &str) -> Result<(), modo::Error> {
24 do_sync(db, Some(group)).await
25}
26
27async fn do_sync(db: &DbPool, group_filter: Option<&str>) -> Result<(), modo::Error> {
28 let conn = db.connection();
29
30 conn.execute_unprepared(
32 "CREATE TABLE IF NOT EXISTS _modo_migrations (\
33 version BIGINT PRIMARY KEY, \
34 description TEXT NOT NULL, \
35 executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
36 )",
37 )
38 .await
39 .map_err(|e| modo::Error::internal(format!("failed to bootstrap migrations table: {e}")))?;
40
41 let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
43 .into_iter()
44 .filter(|r| match group_filter {
45 Some(g) => r.group == g,
46 None => true,
47 })
48 .partition(|r| r.is_framework);
49
50 let backend = conn.get_database_backend();
51 let schema = Schema::new(backend);
52 let mut builder = schema.builder();
53
54 for reg in &framework {
56 builder = (reg.register_fn)(builder);
57 }
58 for reg in &user {
59 builder = (reg.register_fn)(builder);
60 }
61
62 builder
64 .sync(conn)
65 .await
66 .map_err(|e| modo::Error::internal(format!("schema sync failed: {e}")))?;
67 info!("Schema sync complete");
68
69 for reg in framework.iter().chain(user.iter()) {
71 for sql in reg.extra_sql {
72 if let Err(e) = conn.execute_unprepared(sql).await {
73 tracing::error!(
74 table = reg.table_name,
75 sql = sql,
76 error = %e,
77 "Failed to execute extra SQL for entity"
78 );
79 return Err(modo::Error::internal(format!(
80 "extra SQL for {} failed: {e}",
81 reg.table_name
82 )));
83 }
84 }
85 }
86
87 run_pending_migrations(conn, group_filter).await?;
89
90 Ok(())
91}
92
93async fn run_pending_migrations(
94 db: &sea_orm::DatabaseConnection,
95 group_filter: Option<&str>,
96) -> Result<(), modo::Error> {
97 use crate::migration::migration_entity;
98 use sea_orm::EntityTrait;
99 use std::collections::HashSet;
100
101 let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
102 .into_iter()
103 .filter(|m| match group_filter {
104 Some(g) => m.group == g,
105 None => true,
106 })
107 .collect();
108
109 if migrations.is_empty() {
110 return Ok(());
111 }
112
113 let mut seen = HashSet::new();
115 for m in &migrations {
116 if !seen.insert(m.version) {
117 return Err(modo::Error::internal(format!(
118 "duplicate migration version: {}",
119 m.version
120 )));
121 }
122 }
123
124 migrations.sort_by_key(|m| m.version);
125
126 let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
128 .all(db)
129 .await
130 .map_err(|e| modo::Error::internal(format!("failed to query migrations: {e}")))?;
131 let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
132
133 for migration in &migrations {
135 if executed_versions.contains(&migration.version) {
136 continue;
137 }
138 info!(
139 "Running migration v{}: {}",
140 migration.version, migration.description
141 );
142
143 (migration.handler)(db).await?;
144
145 let version_i64 = i64::try_from(migration.version).map_err(|_| {
147 modo::Error::internal(format!(
148 "migration version {} exceeds maximum ({})",
149 migration.version,
150 i64::MAX
151 ))
152 })?;
153 let record = migration_entity::ActiveModel {
154 version: sea_orm::Set(version_i64),
155 description: sea_orm::Set(migration.description.to_string()),
156 executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
157 };
158 migration_entity::Entity::insert(record)
159 .exec(db)
160 .await
161 .map_err(|e| modo::Error::internal(format!("failed to record migration: {e}")))?;
162 info!("Migration v{} complete", migration.version);
163 }
164
165 Ok(())
166}