Skip to main content

modo_db/
sync.rs

1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7/// Synchronize database schema from registered entities, then run pending migrations.
8///
9/// 1. Bootstrap `_modo_migrations` table (must exist before schema sync)
10/// 2. Collect all `EntityRegistration` entries from `inventory`
11/// 3. Register framework entities first, then user entities
12/// 4. Run `SchemaBuilder::sync()` (addition-only, topo-sorted by SeaORM)
13/// 5. Execute extra SQL (composite indices, partial unique indices)
14/// 6. Run pending migrations (version-ordered, tracked in `_modo_migrations`)
15pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
16    let conn = db.connection();
17
18    // 1. Bootstrap _modo_migrations (BIGINT + CURRENT_TIMESTAMP work on both SQLite and Postgres)
19    conn.execute_unprepared(
20        "CREATE TABLE IF NOT EXISTS _modo_migrations (\
21            version BIGINT PRIMARY KEY, \
22            description TEXT NOT NULL, \
23            executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
24        )",
25    )
26    .await
27    .map_err(|e| modo::Error::internal(format!("Failed to bootstrap migrations table: {e}")))?;
28
29    // 2. Collect all entities in a single pass, partitioned by framework vs user
30    let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
31        .into_iter()
32        .partition(|r| r.is_framework);
33
34    let backend = conn.get_database_backend();
35    let schema = Schema::new(backend);
36    let mut builder = schema.builder();
37
38    // Register framework entities first, then user entities
39    for reg in &framework {
40        builder = (reg.register_fn)(builder);
41    }
42    for reg in &user {
43        builder = (reg.register_fn)(builder);
44    }
45
46    // 3. Sync (addition-only — SeaORM handles topo sort)
47    builder
48        .sync(conn)
49        .await
50        .map_err(|e| modo::Error::internal(format!("Schema sync failed: {e}")))?;
51    info!("Schema sync complete");
52
53    // 4. Run extra SQL (composite indices, partial unique indices, etc.)
54    for reg in framework.iter().chain(user.iter()) {
55        for sql in reg.extra_sql {
56            if let Err(e) = conn.execute_unprepared(sql).await {
57                tracing::error!(
58                    table = reg.table_name,
59                    sql = sql,
60                    error = %e,
61                    "Failed to execute extra SQL for entity"
62                );
63                return Err(modo::Error::internal(format!(
64                    "Extra SQL for {} failed: {e}",
65                    reg.table_name
66                )));
67            }
68        }
69    }
70
71    // 5. Run pending migrations
72    run_pending_migrations(conn).await?;
73
74    Ok(())
75}
76
77async fn run_pending_migrations(db: &sea_orm::DatabaseConnection) -> Result<(), modo::Error> {
78    use crate::migration::migration_entity;
79    use sea_orm::EntityTrait;
80    use std::collections::HashSet;
81
82    let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
83        .into_iter()
84        .collect();
85
86    if migrations.is_empty() {
87        return Ok(());
88    }
89
90    // Check for duplicate versions
91    let mut seen = HashSet::new();
92    for m in &migrations {
93        if !seen.insert(m.version) {
94            return Err(modo::Error::internal(format!(
95                "Duplicate migration version: {}",
96                m.version
97            )));
98        }
99    }
100
101    migrations.sort_by_key(|m| m.version);
102
103    // Query already-executed versions
104    let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
105        .all(db)
106        .await
107        .map_err(|e| modo::Error::internal(format!("Failed to query migrations: {e}")))?;
108    let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
109
110    // Run pending
111    for migration in &migrations {
112        if executed_versions.contains(&migration.version) {
113            continue;
114        }
115        info!(
116            "Running migration v{}: {}",
117            migration.version, migration.description
118        );
119
120        (migration.handler)(db).await?;
121
122        // Record migration as executed
123        let version_i64 = i64::try_from(migration.version).map_err(|_| {
124            modo::Error::internal(format!(
125                "Migration version {} exceeds maximum ({})",
126                migration.version,
127                i64::MAX
128            ))
129        })?;
130        let record = migration_entity::ActiveModel {
131            version: sea_orm::Set(version_i64),
132            description: sea_orm::Set(migration.description.to_string()),
133            executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
134        };
135        migration_entity::Entity::insert(record)
136            .exec(db)
137            .await
138            .map_err(|e| modo::Error::internal(format!("Failed to record migration: {e}")))?;
139        info!("Migration v{} complete", migration.version);
140    }
141
142    Ok(())
143}