1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
16 let conn = db.connection();
17
18 conn.execute_unprepared(
20 "CREATE TABLE IF NOT EXISTS _modo_migrations (\
21 version BIGINT PRIMARY KEY, \
22 description TEXT NOT NULL, \
23 executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
24 )",
25 )
26 .await
27 .map_err(|e| modo::Error::internal(format!("Failed to bootstrap migrations table: {e}")))?;
28
29 let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
31 .into_iter()
32 .partition(|r| r.is_framework);
33
34 let backend = conn.get_database_backend();
35 let schema = Schema::new(backend);
36 let mut builder = schema.builder();
37
38 for reg in &framework {
40 builder = (reg.register_fn)(builder);
41 }
42 for reg in &user {
43 builder = (reg.register_fn)(builder);
44 }
45
46 builder
48 .sync(conn)
49 .await
50 .map_err(|e| modo::Error::internal(format!("Schema sync failed: {e}")))?;
51 info!("Schema sync complete");
52
53 for reg in framework.iter().chain(user.iter()) {
55 for sql in reg.extra_sql {
56 if let Err(e) = conn.execute_unprepared(sql).await {
57 tracing::error!(
58 table = reg.table_name,
59 sql = sql,
60 error = %e,
61 "Failed to execute extra SQL for entity"
62 );
63 return Err(modo::Error::internal(format!(
64 "Extra SQL for {} failed: {e}",
65 reg.table_name
66 )));
67 }
68 }
69 }
70
71 run_pending_migrations(conn).await?;
73
74 Ok(())
75}
76
77async fn run_pending_migrations(db: &sea_orm::DatabaseConnection) -> Result<(), modo::Error> {
78 use crate::migration::migration_entity;
79 use sea_orm::EntityTrait;
80 use std::collections::HashSet;
81
82 let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
83 .into_iter()
84 .collect();
85
86 if migrations.is_empty() {
87 return Ok(());
88 }
89
90 let mut seen = HashSet::new();
92 for m in &migrations {
93 if !seen.insert(m.version) {
94 return Err(modo::Error::internal(format!(
95 "Duplicate migration version: {}",
96 m.version
97 )));
98 }
99 }
100
101 migrations.sort_by_key(|m| m.version);
102
103 let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
105 .all(db)
106 .await
107 .map_err(|e| modo::Error::internal(format!("Failed to query migrations: {e}")))?;
108 let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
109
110 for migration in &migrations {
112 if executed_versions.contains(&migration.version) {
113 continue;
114 }
115 info!(
116 "Running migration v{}: {}",
117 migration.version, migration.description
118 );
119
120 (migration.handler)(db).await?;
121
122 let version_i64 = i64::try_from(migration.version).map_err(|_| {
124 modo::Error::internal(format!(
125 "Migration version {} exceeds maximum ({})",
126 migration.version,
127 i64::MAX
128 ))
129 })?;
130 let record = migration_entity::ActiveModel {
131 version: sea_orm::Set(version_i64),
132 description: sea_orm::Set(migration.description.to_string()),
133 executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
134 };
135 migration_entity::Entity::insert(record)
136 .exec(db)
137 .await
138 .map_err(|e| modo::Error::internal(format!("Failed to record migration: {e}")))?;
139 info!("Migration v{} complete", migration.version);
140 }
141
142 Ok(())
143}