Skip to main content

modo_db/
sync.rs

1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7/// Synchronize database schema from all registered entities, then run all pending migrations.
8///
9/// 1. Bootstrap `_modo_migrations` table (must exist before schema sync).
10/// 2. Collect all `EntityRegistration` entries from `inventory` (framework entities first,
11///    then user entities), and run `SchemaBuilder::sync()` (addition-only).
12/// 3. Execute extra SQL registered with each entity (composite indices, partial unique indices).
13/// 4. Run pending migrations in ascending `version` order, tracked in `_modo_migrations`.
14pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
15    do_sync(db, None).await
16}
17
18/// Synchronize database schema for entities and migrations belonging to the named group only.
19///
20/// This is useful when entities in a specific group live in a separate database (e.g.
21/// SQLite jobs database). The `_modo_migrations` table is always bootstrapped in the
22/// target database.
23pub async fn sync_and_migrate_group(db: &DbPool, group: &str) -> Result<(), modo::Error> {
24    do_sync(db, Some(group)).await
25}
26
27async fn do_sync(db: &DbPool, group_filter: Option<&str>) -> Result<(), modo::Error> {
28    let conn = db.connection();
29
30    // 1. Bootstrap _modo_migrations (BIGINT + CURRENT_TIMESTAMP work on both SQLite and Postgres)
31    conn.execute_unprepared(
32        "CREATE TABLE IF NOT EXISTS _modo_migrations (\
33            version BIGINT PRIMARY KEY, \
34            description TEXT NOT NULL, \
35            executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
36        )",
37    )
38    .await
39    .map_err(|e| modo::Error::internal(format!("failed to bootstrap migrations table: {e}")))?;
40
41    // 2. Collect entities, optionally filtered by group
42    let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
43        .into_iter()
44        .filter(|r| match group_filter {
45            Some(g) => r.group == g,
46            None => true,
47        })
48        .partition(|r| r.is_framework);
49
50    let backend = conn.get_database_backend();
51    let schema = Schema::new(backend);
52    let mut builder = schema.builder();
53
54    // Register framework entities first, then user entities
55    for reg in &framework {
56        builder = (reg.register_fn)(builder);
57    }
58    for reg in &user {
59        builder = (reg.register_fn)(builder);
60    }
61
62    // 3. Sync (addition-only — SeaORM handles topo sort)
63    builder
64        .sync(conn)
65        .await
66        .map_err(|e| modo::Error::internal(format!("schema sync failed: {e}")))?;
67    info!("Schema sync complete");
68
69    // 4. Run extra SQL (composite indices, partial unique indices, etc.)
70    for reg in framework.iter().chain(user.iter()) {
71        for sql in reg.extra_sql {
72            if let Err(e) = conn.execute_unprepared(sql).await {
73                tracing::error!(
74                    table = reg.table_name,
75                    sql = sql,
76                    error = %e,
77                    "Failed to execute extra SQL for entity"
78                );
79                return Err(modo::Error::internal(format!(
80                    "extra SQL for {} failed: {e}",
81                    reg.table_name
82                )));
83            }
84        }
85    }
86
87    // 5. Run pending migrations
88    run_pending_migrations(conn, group_filter).await?;
89
90    Ok(())
91}
92
93async fn run_pending_migrations(
94    db: &sea_orm::DatabaseConnection,
95    group_filter: Option<&str>,
96) -> Result<(), modo::Error> {
97    use crate::migration::migration_entity;
98    use sea_orm::EntityTrait;
99    use std::collections::HashSet;
100
101    let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
102        .into_iter()
103        .filter(|m| match group_filter {
104            Some(g) => m.group == g,
105            None => true,
106        })
107        .collect();
108
109    if migrations.is_empty() {
110        return Ok(());
111    }
112
113    // Check for duplicate versions
114    let mut seen = HashSet::new();
115    for m in &migrations {
116        if !seen.insert(m.version) {
117            return Err(modo::Error::internal(format!(
118                "duplicate migration version: {}",
119                m.version
120            )));
121        }
122    }
123
124    migrations.sort_by_key(|m| m.version);
125
126    // Query already-executed versions
127    let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
128        .all(db)
129        .await
130        .map_err(|e| modo::Error::internal(format!("failed to query migrations: {e}")))?;
131    let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
132
133    // Run pending
134    for migration in &migrations {
135        if executed_versions.contains(&migration.version) {
136            continue;
137        }
138        info!(
139            "Running migration v{}: {}",
140            migration.version, migration.description
141        );
142
143        (migration.handler)(db).await?;
144
145        // Record migration as executed
146        let version_i64 = i64::try_from(migration.version).map_err(|_| {
147            modo::Error::internal(format!(
148                "migration version {} exceeds maximum ({})",
149                migration.version,
150                i64::MAX
151            ))
152        })?;
153        let record = migration_entity::ActiveModel {
154            version: sea_orm::Set(version_i64),
155            description: sea_orm::Set(migration.description.to_string()),
156            executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
157        };
158        migration_entity::Entity::insert(record)
159            .exec(db)
160            .await
161            .map_err(|e| modo::Error::internal(format!("failed to record migration: {e}")))?;
162        info!("Migration v{} complete", migration.version);
163    }
164
165    Ok(())
166}