spacetimedb/db/
update.rs

1use super::relational_db::RelationalDB;
2use crate::database_logger::SystemLogger;
3use crate::sql::parser::RowLevelExpr;
4use spacetimedb_data_structures::map::HashMap;
5use spacetimedb_datastore::locking_tx_datastore::MutTxId;
6use spacetimedb_lib::db::auth::StTableType;
7use spacetimedb_lib::identity::AuthCtx;
8use spacetimedb_lib::AlgebraicValue;
9use spacetimedb_primitives::{ColSet, TableId};
10use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan};
11use spacetimedb_schema::def::TableDef;
12use spacetimedb_schema::schema::{column_schemas_from_defs, IndexSchema, Schema, SequenceSchema, TableSchema};
13use std::sync::Arc;
14
15/// The logger used for by [`update_database`] and friends.
16pub trait UpdateLogger {
17    fn info(&self, msg: &str);
18}
19
20impl UpdateLogger for SystemLogger {
21    fn info(&self, msg: &str) {
22        self.info(msg);
23    }
24}
25
26/// Update the database according to the migration plan.
27///
28/// The update is performed within the transactional context `tx`.
29// NOTE: Manual migration support is predicated on the transactionality of
30// dropping database objects (tables, indexes, etc.).
31// Currently, none of the drop_* methods are transactional.
32// This is safe because the __update__ reducer is no longer supported,
33// and the auto plan guarantees that the migration can't fail.
34// But when implementing manual migrations, we need to make sure that
35// drop_* become transactional.
36pub fn update_database(
37    stdb: &RelationalDB,
38    tx: &mut MutTxId,
39    auth_ctx: AuthCtx,
40    plan: MigratePlan,
41    logger: &dyn UpdateLogger,
42) -> anyhow::Result<()> {
43    let existing_tables = stdb.get_all_tables_mut(tx)?;
44
45    // TODO: consider using `ErrorStream` here.
46    let old_module_def = plan.old_def();
47    for table in existing_tables
48        .iter()
49        .filter(|table| table.table_type != StTableType::System)
50    {
51        let old_def = old_module_def
52            .table(&table.table_name[..])
53            .ok_or_else(|| anyhow::anyhow!("table {} not found in old_module_def", table.table_name))?;
54
55        table.check_compatible(old_module_def, old_def)?;
56    }
57
58    match plan {
59        MigratePlan::Manual(plan) => manual_migrate_database(stdb, tx, plan, logger, existing_tables),
60        MigratePlan::Auto(plan) => auto_migrate_database(stdb, tx, auth_ctx, plan, logger, existing_tables),
61    }
62}
63
64/// Manually migrate a database.
65fn manual_migrate_database(
66    _stdb: &RelationalDB,
67    _tx: &mut MutTxId,
68    _plan: ManualMigratePlan,
69    _logger: &dyn UpdateLogger,
70    _existing_tables: Vec<Arc<TableSchema>>,
71) -> anyhow::Result<()> {
72    unimplemented!("Manual database migrations are not yet implemented")
73}
74
75/// Logs with `info` level to `$logger` as well as via the `log` crate.
76macro_rules! log {
77    ($logger:expr, $($tokens:tt)*) => {
78        $logger.info(&format!($($tokens)*));
79        log::info!($($tokens)*);
80    };
81}
82
83/// Automatically migrate a database.
84fn auto_migrate_database(
85    stdb: &RelationalDB,
86    tx: &mut MutTxId,
87    auth_ctx: AuthCtx,
88    plan: AutoMigratePlan,
89    logger: &dyn UpdateLogger,
90    existing_tables: Vec<Arc<TableSchema>>,
91) -> anyhow::Result<()> {
92    // We have already checked in `migrate_database` that `existing_tables` are compatible with the `old` definition in `plan`.
93    // So we can look up tables in there using unwrap.
94
95    let table_schemas_by_name = existing_tables
96        .into_iter()
97        .map(|table| (table.table_name.clone(), table))
98        .collect::<HashMap<_, _>>();
99
100    log::info!("Running database update prechecks: {}", stdb.database_identity());
101
102    for precheck in plan.prechecks {
103        match precheck {
104            spacetimedb_schema::auto_migrate::AutoMigratePrecheck::CheckAddSequenceRangeValid(sequence_name) => {
105                let table_def = plan.new.stored_in_table_def(sequence_name).unwrap();
106                let sequence_def = &table_def.sequences[sequence_name];
107
108                let table_schema = &table_schemas_by_name[&table_def.name[..]];
109
110                let min: AlgebraicValue = sequence_def.min_value.unwrap_or(1).into();
111                let max: AlgebraicValue = sequence_def.max_value.unwrap_or(i128::MAX).into();
112
113                let range = min..max;
114
115                if stdb
116                    .iter_by_col_range_mut(tx, table_schema.table_id, sequence_def.column, range)?
117                    .next()
118                    .is_some()
119                {
120                    anyhow::bail!(
121                        "Precheck failed: added sequence {} already has values in range",
122                        sequence_name,
123                    );
124                }
125            }
126        }
127    }
128
129    log::info!("Running database update steps: {}", stdb.database_identity());
130
131    for step in plan.steps {
132        match step {
133            spacetimedb_schema::auto_migrate::AutoMigrateStep::AddTable(table_name) => {
134                let table_def: &TableDef = plan.new.expect_lookup(table_name);
135
136                // Recursively sets IDs to 0.
137                // They will be initialized by the database when the table is created.
138                let table_schema = TableSchema::from_module_def(plan.new, table_def, (), TableId::SENTINEL);
139
140                log!(logger, "Creating table `{table_name}`");
141
142                stdb.create_table(tx, table_schema)?;
143            }
144            spacetimedb_schema::auto_migrate::AutoMigrateStep::AddIndex(index_name) => {
145                let table_def = plan.new.stored_in_table_def(index_name).unwrap();
146                let index_def = table_def.indexes.get(index_name).unwrap();
147                let table_id = table_schemas_by_name[&table_def.name[..]].table_id;
148
149                let index_cols = ColSet::from(index_def.algorithm.columns());
150
151                let is_unique = table_def
152                    .constraints
153                    .iter()
154                    .filter_map(|(_, c)| c.data.unique_columns())
155                    .any(|unique_cols| unique_cols == &index_cols);
156
157                log!(logger, "Creating index `{}` on table `{}`", index_name, table_def.name);
158
159                let index_schema = IndexSchema::from_module_def(plan.new, index_def, table_id, 0.into());
160
161                stdb.create_index(tx, index_schema, is_unique)?;
162            }
163            spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveIndex(index_name) => {
164                let table_def = plan.old.stored_in_table_def(index_name).unwrap();
165
166                let table_schema = &table_schemas_by_name[&table_def.name[..]];
167                let index_schema = table_schema
168                    .indexes
169                    .iter()
170                    .find(|index| index.index_name[..] == index_name[..])
171                    .unwrap();
172
173                log!(logger, "Dropping index `{}` on table `{}`", index_name, table_def.name);
174                stdb.drop_index(tx, index_schema.index_id)?;
175            }
176            spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveConstraint(constraint_name) => {
177                let table_def = plan.old.stored_in_table_def(constraint_name).unwrap();
178                let table_schema = &table_schemas_by_name[&table_def.name[..]];
179                let constraint_schema = table_schema
180                    .constraints
181                    .iter()
182                    .find(|constraint| constraint.constraint_name[..] == constraint_name[..])
183                    .unwrap();
184
185                log!(
186                    logger,
187                    "Dropping constraint `{}` on table `{}`",
188                    constraint_name,
189                    table_def.name
190                );
191                stdb.drop_constraint(tx, constraint_schema.constraint_id)?;
192            }
193            spacetimedb_schema::auto_migrate::AutoMigrateStep::AddSequence(sequence_name) => {
194                let table_def = plan.new.stored_in_table_def(sequence_name).unwrap();
195                let sequence_def = table_def.sequences.get(sequence_name).unwrap();
196                let table_schema = &table_schemas_by_name[&table_def.name[..]];
197
198                log!(
199                    logger,
200                    "Adding sequence `{}` to table `{}`",
201                    sequence_name,
202                    table_def.name
203                );
204                let sequence_schema =
205                    SequenceSchema::from_module_def(plan.new, sequence_def, table_schema.table_id, 0.into());
206                stdb.create_sequence(tx, sequence_schema)?;
207            }
208            spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveSequence(sequence_name) => {
209                let table_def = plan.old.stored_in_table_def(sequence_name).unwrap();
210                let table_schema = &table_schemas_by_name[&table_def.name[..]];
211                let sequence_schema = table_schema
212                    .sequences
213                    .iter()
214                    .find(|sequence| sequence.sequence_name[..] == sequence_name[..])
215                    .unwrap();
216
217                log!(
218                    logger,
219                    "Dropping sequence `{}` from table `{}`",
220                    sequence_name,
221                    table_def.name
222                );
223                stdb.drop_sequence(tx, sequence_schema.sequence_id)?;
224            }
225            spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeColumns(table_name) => {
226                let table_def = plan.new.stored_in_table_def(table_name).unwrap();
227                let table_id = stdb.table_id_from_name_mut(tx, table_name).unwrap().unwrap();
228                let column_schemas = column_schemas_from_defs(plan.new, &table_def.columns, table_id);
229
230                log!(logger, "Changing columns of table `{}`", table_name);
231
232                stdb.alter_table_row_type(tx, table_id, column_schemas)?;
233            }
234            spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeAccess(table_name) => {
235                let table_def = plan.new.stored_in_table_def(table_name).unwrap();
236                stdb.alter_table_access(tx, table_name, table_def.table_access.into())?;
237            }
238            spacetimedb_schema::auto_migrate::AutoMigrateStep::AddSchedule(_) => {
239                anyhow::bail!("Adding schedules is not yet implemented");
240            }
241            spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveSchedule(_) => {
242                anyhow::bail!("Removing schedules is not yet implemented");
243            }
244            spacetimedb_schema::auto_migrate::AutoMigrateStep::AddRowLevelSecurity(sql_rls) => {
245                log!(logger, "Adding row-level security `{sql_rls}`");
246                let rls = plan.new.lookup_expect(sql_rls);
247                let rls = RowLevelExpr::build_row_level_expr(tx, &auth_ctx, rls)?;
248
249                stdb.create_row_level_security(tx, rls.def)?;
250            }
251            spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveRowLevelSecurity(sql_rls) => {
252                log!(logger, "Removing-row level security `{sql_rls}`");
253                stdb.drop_row_level_security(tx, sql_rls.clone())?;
254            }
255        }
256    }
257
258    log::info!("Database update complete");
259    Ok(())
260}
261
262#[cfg(test)]
263mod test {
264    use super::*;
265    use crate::{
266        db::relational_db::tests_utils::{begin_mut_tx, insert, TestDB},
267        host::module_host::create_table_from_def,
268    };
269    use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange;
270    use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess};
271    use spacetimedb_sats::{product, AlgebraicType::U64};
272    use spacetimedb_schema::{auto_migrate::ponder_migrate, def::ModuleDef};
273
274    struct TestLogger;
275    impl UpdateLogger for TestLogger {
276        fn info(&self, _: &str) {}
277    }
278
279    #[test]
280    fn update_db_repro_2761() -> anyhow::Result<()> {
281        let auth_ctx = AuthCtx::for_testing();
282        let stdb = TestDB::durable()?;
283
284        // Define the old and new modules, the latter with the index on `b`.
285        let define_p = |builder: &mut RawModuleDefV9Builder| {
286            builder
287                .build_table_with_new_type("p", [("x", U64), ("y", U64)], true)
288                .with_unique_constraint(0)
289                .with_unique_constraint(1)
290                .with_index(btree(0), "idx_x")
291                .with_index(btree(1), "idx_y")
292                .with_access(TableAccess::Public)
293                .finish()
294        };
295        let define_t = |builder: &mut RawModuleDefV9Builder, with_index| {
296            let builder = builder
297                .build_table_with_new_type("t", [("a", U64), ("b", U64)], true)
298                .with_access(TableAccess::Public);
299
300            let builder = if with_index {
301                builder.with_index(btree(1), "idx_b")
302            } else {
303                builder
304            };
305
306            builder.finish()
307        };
308        let module_def = |with_index| -> ModuleDef {
309            let mut builder = RawModuleDefV9Builder::new();
310            define_p(&mut builder);
311            define_t(&mut builder, with_index);
312            builder
313                .finish()
314                .try_into()
315                .expect("builder should create a valid database definition")
316        };
317
318        let old = module_def(false);
319        let new = module_def(true);
320
321        // Create tables for `old`.
322        let mut tx = begin_mut_tx(&stdb);
323        for def in old.tables() {
324            create_table_from_def(&stdb, &mut tx, &old, def)?;
325        }
326
327        // Write two rows to `t`
328        // that would cause a unique constraint violation if `idx_b` was unique.
329        let t_id = stdb
330            .table_id_from_name_mut(&tx, "t")?
331            .expect("there should be a table with name `t`");
332        insert(&stdb, &mut tx, t_id, &product![0u64, 42u64])?;
333        insert(&stdb, &mut tx, t_id, &product![1u64, 42u64])?;
334        stdb.commit_tx(tx)?;
335
336        // Try to update the db.
337        let mut tx = begin_mut_tx(&stdb);
338        let plan = ponder_migrate(&old, &new)?;
339        update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
340
341        // Expect the schema change.
342        let idx_b_id = stdb
343            .index_id_from_name(&tx, "t_b_idx_btree")?
344            .expect("there should be an index named `idx_b`");
345        assert_eq!(
346            tx.pending_schema_changes(),
347            [PendingSchemaChange::IndexAdded(t_id, idx_b_id, None)]
348        );
349
350        Ok(())
351    }
352}