1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
16 do_sync(db, None).await
17}
18
19pub async fn sync_and_migrate_group(db: &DbPool, group: &str) -> Result<(), modo::Error> {
25 do_sync(db, Some(group)).await
26}
27
28async fn do_sync(db: &DbPool, group_filter: Option<&str>) -> Result<(), modo::Error> {
29 let conn = db.connection();
30
31 conn.execute_unprepared(
33 "CREATE TABLE IF NOT EXISTS _modo_migrations (\
34 version BIGINT PRIMARY KEY, \
35 description TEXT NOT NULL, \
36 executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
37 )",
38 )
39 .await
40 .map_err(|e| modo::Error::internal(format!("Failed to bootstrap migrations table: {e}")))?;
41
42 let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
44 .into_iter()
45 .filter(|r| match group_filter {
46 Some(g) => r.group == g,
47 None => true,
48 })
49 .partition(|r| r.is_framework);
50
51 let backend = conn.get_database_backend();
52 let schema = Schema::new(backend);
53 let mut builder = schema.builder();
54
55 for reg in &framework {
57 builder = (reg.register_fn)(builder);
58 }
59 for reg in &user {
60 builder = (reg.register_fn)(builder);
61 }
62
63 builder
65 .sync(conn)
66 .await
67 .map_err(|e| modo::Error::internal(format!("Schema sync failed: {e}")))?;
68 info!("Schema sync complete");
69
70 for reg in framework.iter().chain(user.iter()) {
72 for sql in reg.extra_sql {
73 if let Err(e) = conn.execute_unprepared(sql).await {
74 tracing::error!(
75 table = reg.table_name,
76 sql = sql,
77 error = %e,
78 "Failed to execute extra SQL for entity"
79 );
80 return Err(modo::Error::internal(format!(
81 "Extra SQL for {} failed: {e}",
82 reg.table_name
83 )));
84 }
85 }
86 }
87
88 run_pending_migrations(conn, group_filter).await?;
90
91 Ok(())
92}
93
94async fn run_pending_migrations(
95 db: &sea_orm::DatabaseConnection,
96 group_filter: Option<&str>,
97) -> Result<(), modo::Error> {
98 use crate::migration::migration_entity;
99 use sea_orm::EntityTrait;
100 use std::collections::HashSet;
101
102 let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
103 .into_iter()
104 .filter(|m| match group_filter {
105 Some(g) => m.group == g,
106 None => true,
107 })
108 .collect();
109
110 if migrations.is_empty() {
111 return Ok(());
112 }
113
114 let mut seen = HashSet::new();
116 for m in &migrations {
117 if !seen.insert(m.version) {
118 return Err(modo::Error::internal(format!(
119 "Duplicate migration version: {}",
120 m.version
121 )));
122 }
123 }
124
125 migrations.sort_by_key(|m| m.version);
126
127 let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
129 .all(db)
130 .await
131 .map_err(|e| modo::Error::internal(format!("Failed to query migrations: {e}")))?;
132 let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
133
134 for migration in &migrations {
136 if executed_versions.contains(&migration.version) {
137 continue;
138 }
139 info!(
140 "Running migration v{}: {}",
141 migration.version, migration.description
142 );
143
144 (migration.handler)(db).await?;
145
146 let version_i64 = i64::try_from(migration.version).map_err(|_| {
148 modo::Error::internal(format!(
149 "Migration version {} exceeds maximum ({})",
150 migration.version,
151 i64::MAX
152 ))
153 })?;
154 let record = migration_entity::ActiveModel {
155 version: sea_orm::Set(version_i64),
156 description: sea_orm::Set(migration.description.to_string()),
157 executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
158 };
159 migration_entity::Entity::insert(record)
160 .exec(db)
161 .await
162 .map_err(|e| modo::Error::internal(format!("Failed to record migration: {e}")))?;
163 info!("Migration v{} complete", migration.version);
164 }
165
166 Ok(())
167}