Skip to main content

modo_db/
sync.rs

1use crate::entity::EntityRegistration;
2use crate::migration::MigrationRegistration;
3use crate::pool::DbPool;
4use sea_orm::{ConnectionTrait, Schema};
5use tracing::info;
6
7/// Synchronize database schema from all registered entities, then run all pending migrations.
8///
9/// 1. Bootstrap `_modo_migrations` table (must exist before schema sync)
10/// 2. Collect all `EntityRegistration` entries from `inventory`
11/// 3. Register framework entities first, then user entities
12/// 4. Run `SchemaBuilder::sync()` (addition-only, topo-sorted by SeaORM)
13/// 5. Execute extra SQL (composite indices, partial unique indices)
14/// 6. Run pending migrations (version-ordered, tracked in `_modo_migrations`)
15pub async fn sync_and_migrate(db: &DbPool) -> Result<(), modo::Error> {
16    do_sync(db, None).await
17}
18
19/// Synchronize database schema for entities and migrations belonging to the named group only.
20///
21/// This is useful when entities in a specific group live in a separate database (e.g.
22/// SQLite jobs database). The `_modo_migrations` table is always bootstrapped in the
23/// target database.
24pub async fn sync_and_migrate_group(db: &DbPool, group: &str) -> Result<(), modo::Error> {
25    do_sync(db, Some(group)).await
26}
27
28async fn do_sync(db: &DbPool, group_filter: Option<&str>) -> Result<(), modo::Error> {
29    let conn = db.connection();
30
31    // 1. Bootstrap _modo_migrations (BIGINT + CURRENT_TIMESTAMP work on both SQLite and Postgres)
32    conn.execute_unprepared(
33        "CREATE TABLE IF NOT EXISTS _modo_migrations (\
34            version BIGINT PRIMARY KEY, \
35            description TEXT NOT NULL, \
36            executed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP\
37        )",
38    )
39    .await
40    .map_err(|e| modo::Error::internal(format!("Failed to bootstrap migrations table: {e}")))?;
41
42    // 2. Collect entities, optionally filtered by group
43    let (framework, user): (Vec<_>, Vec<_>) = inventory::iter::<EntityRegistration>
44        .into_iter()
45        .filter(|r| match group_filter {
46            Some(g) => r.group == g,
47            None => true,
48        })
49        .partition(|r| r.is_framework);
50
51    let backend = conn.get_database_backend();
52    let schema = Schema::new(backend);
53    let mut builder = schema.builder();
54
55    // Register framework entities first, then user entities
56    for reg in &framework {
57        builder = (reg.register_fn)(builder);
58    }
59    for reg in &user {
60        builder = (reg.register_fn)(builder);
61    }
62
63    // 3. Sync (addition-only — SeaORM handles topo sort)
64    builder
65        .sync(conn)
66        .await
67        .map_err(|e| modo::Error::internal(format!("Schema sync failed: {e}")))?;
68    info!("Schema sync complete");
69
70    // 4. Run extra SQL (composite indices, partial unique indices, etc.)
71    for reg in framework.iter().chain(user.iter()) {
72        for sql in reg.extra_sql {
73            if let Err(e) = conn.execute_unprepared(sql).await {
74                tracing::error!(
75                    table = reg.table_name,
76                    sql = sql,
77                    error = %e,
78                    "Failed to execute extra SQL for entity"
79                );
80                return Err(modo::Error::internal(format!(
81                    "Extra SQL for {} failed: {e}",
82                    reg.table_name
83                )));
84            }
85        }
86    }
87
88    // 5. Run pending migrations
89    run_pending_migrations(conn, group_filter).await?;
90
91    Ok(())
92}
93
94async fn run_pending_migrations(
95    db: &sea_orm::DatabaseConnection,
96    group_filter: Option<&str>,
97) -> Result<(), modo::Error> {
98    use crate::migration::migration_entity;
99    use sea_orm::EntityTrait;
100    use std::collections::HashSet;
101
102    let mut migrations: Vec<&MigrationRegistration> = inventory::iter::<MigrationRegistration>
103        .into_iter()
104        .filter(|m| match group_filter {
105            Some(g) => m.group == g,
106            None => true,
107        })
108        .collect();
109
110    if migrations.is_empty() {
111        return Ok(());
112    }
113
114    // Check for duplicate versions
115    let mut seen = HashSet::new();
116    for m in &migrations {
117        if !seen.insert(m.version) {
118            return Err(modo::Error::internal(format!(
119                "Duplicate migration version: {}",
120                m.version
121            )));
122        }
123    }
124
125    migrations.sort_by_key(|m| m.version);
126
127    // Query already-executed versions
128    let executed: Vec<migration_entity::Model> = migration_entity::Entity::find()
129        .all(db)
130        .await
131        .map_err(|e| modo::Error::internal(format!("Failed to query migrations: {e}")))?;
132    let executed_versions: HashSet<u64> = executed.iter().map(|m| m.version as u64).collect();
133
134    // Run pending
135    for migration in &migrations {
136        if executed_versions.contains(&migration.version) {
137            continue;
138        }
139        info!(
140            "Running migration v{}: {}",
141            migration.version, migration.description
142        );
143
144        (migration.handler)(db).await?;
145
146        // Record migration as executed
147        let version_i64 = i64::try_from(migration.version).map_err(|_| {
148            modo::Error::internal(format!(
149                "Migration version {} exceeds maximum ({})",
150                migration.version,
151                i64::MAX
152            ))
153        })?;
154        let record = migration_entity::ActiveModel {
155            version: sea_orm::Set(version_i64),
156            description: sea_orm::Set(migration.description.to_string()),
157            executed_at: sea_orm::Set(chrono::Utc::now().to_rfc3339()),
158        };
159        migration_entity::Entity::insert(record)
160            .exec(db)
161            .await
162            .map_err(|e| modo::Error::internal(format!("Failed to record migration: {e}")))?;
163        info!("Migration v{} complete", migration.version);
164    }
165
166    Ok(())
167}