spacetimedb_schema/
schema.rs

1//! Schema data structures.
2//! These are used at runtime by the vm to store the schema of the database.
3//! They are mirrored in the system tables -- see `spacetimedb_core::db::datastore::system_tables`.
4//! Types in this file are not public ABI or API and may be changed at any time; it's the system tables that cannot.
5
6// TODO(1.0): change all the `Box<str>`s in this file to `Identifier`.
7// This doesn't affect the ABI so can wait until 1.0.
8
9use crate::def::error::{DefType, SchemaError};
10use crate::relation::{combine_constraints, Column, DbTable, FieldName, Header};
11use core::mem;
12use itertools::Itertools;
13use spacetimedb_lib::db::auth::{StAccess, StTableType};
14use spacetimedb_lib::db::raw_def::v9::RawSql;
15use spacetimedb_lib::db::raw_def::{generate_cols_name, RawConstraintDefV8};
16use spacetimedb_primitives::*;
17use spacetimedb_sats::product_value::InvalidFieldError;
18use spacetimedb_sats::{AlgebraicType, ProductType, ProductTypeElement, WithTypespace};
19use std::collections::BTreeMap;
20use std::sync::Arc;
21
22use crate::def::{
23    ColumnDef, ConstraintData, ConstraintDef, IndexAlgorithm, IndexDef, ModuleDef, ModuleDefLookup, ScheduleDef,
24    SequenceDef, TableDef, UniqueConstraintData,
25};
26use crate::identifier::Identifier;
27
28/// Helper trait documenting allowing schema entities to be built from a validated `ModuleDef`.
29pub trait Schema: Sized {
30    /// The `Def` type corresponding to this schema type.
31    type Def: ModuleDefLookup;
32    /// The `Id` type corresponding to this schema type.
33    type Id;
34    /// The `Id` type corresponding to the parent of this schema type.
35    /// Set to `()` if there is no parent.
36    type ParentId;
37
38    /// Construct a schema entity from a validated `ModuleDef`.
39    /// Panics if `module_def` does not contain `def`.
40    ///
41    /// If this schema entity contains children (e.g. if it is a table schema), they should be constructed with
42    /// IDs set to `ChildId::SENTINEL`.
43    ///
44    /// If this schema entity contains `AlgebraicType`s, they should be fully resolved by this function (via
45    /// `WithTypespace::resolve_refs`). This means they will no longer contain references to any typespace (and be non-recursive).
46    /// This is necessary because the database does not currently attempt to handle typespaces / recursive types.
47    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self;
48
49    /// Check that a schema entity is compatible with a definition.
50    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error>;
51}
52
53/// A data structure representing the schema of a database table.
54///
55/// This struct holds information about the table, including its identifier,
56/// name, columns, indexes, constraints, sequences, type, and access rights.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TableSchema {
59    /// The unique identifier of the table within the database.
60    pub table_id: TableId,
61
62    /// The name of the table.
63    // TODO(perf): This should likely be an `Arc<str>`, not a `Box<str>`, as we `Clone` it somewhat frequently.
64    pub table_name: Box<str>,
65
66    /// The columns of the table.
67    /// The ordering of the columns is significant. Columns are frequently identified by `ColId`, that is, position in this list.
68    pub columns: Vec<ColumnSchema>,
69
70    /// The primary key of the table, if present. Must refer to a valid column.
71    ///
72    /// Currently, there must be a unique constraint and an index corresponding to the primary key.
73    /// Eventually, we may remove the requirement for an index.
74    ///
75    /// The database engine does not actually care about this, but client code generation does.
76    pub primary_key: Option<ColId>,
77
78    /// The indexes on the table.
79    pub indexes: Vec<IndexSchema>,
80
81    /// The constraints on the table.
82    pub constraints: Vec<ConstraintSchema>,
83
84    /// The sequences on the table.
85    pub sequences: Vec<SequenceSchema>,
86
87    /// Whether the table was created by a user or by the system.
88    pub table_type: StTableType,
89
90    /// The visibility of the table.
91    pub table_access: StAccess,
92
93    /// The schedule for the table, if present.
94    pub schedule: Option<ScheduleSchema>,
95
96    /// Cache for `row_type_for_table` in the data store.
97    pub row_type: ProductType,
98}
99
100/// Converts a list of columns to a table's row type.
101pub fn columns_to_row_type(columns: &[ColumnSchema]) -> ProductType {
102    ProductType::new(columns.iter().map(ProductTypeElement::from).collect())
103}
104
105impl TableSchema {
106    /// Create a table schema.
107    #[allow(clippy::too_many_arguments)]
108    pub fn new(
109        table_id: TableId,
110        table_name: Box<str>,
111        columns: Vec<ColumnSchema>,
112        indexes: Vec<IndexSchema>,
113        constraints: Vec<ConstraintSchema>,
114        sequences: Vec<SequenceSchema>,
115        table_type: StTableType,
116        table_access: StAccess,
117        schedule: Option<ScheduleSchema>,
118        primary_key: Option<ColId>,
119    ) -> Self {
120        Self {
121            row_type: columns_to_row_type(&columns),
122            table_id,
123            table_name,
124            columns,
125            indexes,
126            constraints,
127            sequences,
128            table_type,
129            table_access,
130            schedule,
131            primary_key,
132        }
133    }
134
135    /// Create a `TableSchema` corresponding to a product type.
136    /// For use in tests.
137    #[cfg(feature = "test")]
138    pub fn from_product_type(ty: ProductType) -> TableSchema {
139        let columns = ty
140            .elements
141            .iter()
142            .enumerate()
143            .map(|(col_pos, element)| ColumnSchema {
144                table_id: TableId::SENTINEL,
145                col_pos: ColId(col_pos as _),
146                col_name: element.name.clone().unwrap_or_else(|| format!("col{col_pos}").into()),
147                col_type: element.algebraic_type.clone(),
148            })
149            .collect();
150
151        TableSchema::new(
152            TableId::SENTINEL,
153            "TestTable".into(),
154            columns,
155            vec![],
156            vec![],
157            vec![],
158            StTableType::User,
159            StAccess::Public,
160            None,
161            None,
162        )
163    }
164
165    /// Update the table id of this schema.
166    /// For use by the core database engine after assigning a table id.
167    pub fn update_table_id(&mut self, id: TableId) {
168        self.table_id = id;
169        self.columns.iter_mut().for_each(|c| c.table_id = id);
170        self.indexes.iter_mut().for_each(|i| i.table_id = id);
171        self.constraints.iter_mut().for_each(|c| c.table_id = id);
172        self.sequences.iter_mut().for_each(|s| s.table_id = id);
173        if let Some(s) = self.schedule.as_mut() {
174            s.table_id = id;
175        }
176    }
177
178    /// Convert a table schema into a list of columns.
179    pub fn into_columns(self) -> Vec<ColumnSchema> {
180        self.columns
181    }
182
183    /// Get the columns of the table. Only immutable access to the columns is provided.
184    /// The ordering of the columns is significant. Columns are frequently identified by `ColId`, that is, position in this list.
185    pub fn columns(&self) -> &[ColumnSchema] {
186        &self.columns
187    }
188
189    /// Extracts all the [Self::indexes], [Self::sequences], and [Self::constraints].
190    pub fn take_adjacent_schemas(&mut self) -> (Vec<IndexSchema>, Vec<SequenceSchema>, Vec<ConstraintSchema>) {
191        (
192            mem::take(&mut self.indexes),
193            mem::take(&mut self.sequences),
194            mem::take(&mut self.constraints),
195        )
196    }
197
198    // Crud operation on adjacent schemas
199
200    /// Add OR replace the [SequenceSchema]
201    pub fn update_sequence(&mut self, of: SequenceSchema) {
202        if let Some(x) = self.sequences.iter_mut().find(|x| x.sequence_id == of.sequence_id) {
203            *x = of;
204        } else {
205            self.sequences.push(of);
206        }
207    }
208
209    /// Removes the given `sequence_id`
210    pub fn remove_sequence(&mut self, sequence_id: SequenceId) -> Option<SequenceSchema> {
211        find_remove(&mut self.sequences, |x| x.sequence_id == sequence_id)
212    }
213
214    /// Add OR replace the [IndexSchema]
215    pub fn update_index(&mut self, of: IndexSchema) {
216        if let Some(x) = self.indexes.iter_mut().find(|x| x.index_id == of.index_id) {
217            *x = of;
218        } else {
219            self.indexes.push(of);
220        }
221    }
222
223    /// Removes the given `index_id`
224    pub fn remove_index(&mut self, index_id: IndexId) -> Option<IndexSchema> {
225        find_remove(&mut self.indexes, |x| x.index_id == index_id)
226    }
227
228    /// Add OR replace the [ConstraintSchema]
229    pub fn update_constraint(&mut self, of: ConstraintSchema) {
230        if let Some(x) = self
231            .constraints
232            .iter_mut()
233            .find(|x| x.constraint_id == of.constraint_id)
234        {
235            *x = of;
236        } else {
237            self.constraints.push(of);
238        }
239    }
240
241    /// Removes the given `index_id`
242    pub fn remove_constraint(&mut self, constraint_id: ConstraintId) -> Option<ConstraintSchema> {
243        find_remove(&mut self.constraints, |x| x.constraint_id == constraint_id)
244    }
245
246    /// Concatenate the column names from the `columns`
247    ///
248    /// WARNING: If the `ColId` not exist, is skipped.
249    /// TODO(Tyler): This should return an error and not allow this to be constructed
250    /// if there is an invalid `ColId`
251    pub fn generate_cols_name(&self, columns: &ColList) -> String {
252        generate_cols_name(columns, |p| self.get_column(p.idx()).map(|c| &*c.col_name))
253    }
254
255    /// Check if the specified `field` exists in this [TableSchema].
256    ///
257    /// # Warning
258    ///
259    /// This function ignores the `table_id` when searching for a column.
260    pub fn get_column_by_field(&self, field: FieldName) -> Option<&ColumnSchema> {
261        self.get_column(field.col.idx())
262    }
263
264    /// Look up a list of columns by their positions in the table.
265    /// Invalid column positions are permitted.
266    pub fn get_columns<'a>(
267        &'a self,
268        columns: &'a ColList,
269    ) -> impl 'a + Iterator<Item = (ColId, Option<&'a ColumnSchema>)> {
270        columns.iter().map(|col| (col, self.columns.get(col.idx())))
271    }
272
273    /// Get a reference to a column by its position (`pos`) in the table.
274    pub fn get_column(&self, pos: usize) -> Option<&ColumnSchema> {
275        self.columns.get(pos)
276    }
277
278    /// Check if the `col_name` exist on this [TableSchema]
279    pub fn get_column_by_name(&self, col_name: &str) -> Option<&ColumnSchema> {
280        self.columns.iter().find(|x| &*x.col_name == col_name)
281    }
282
283    /// Check if the `col_name` exist on this [TableSchema]
284    ///
285    /// Warning: It ignores the `table_name`
286    pub fn get_column_id_by_name(&self, col_name: &str) -> Option<ColId> {
287        self.columns
288            .iter()
289            .position(|x| &*x.col_name == col_name)
290            .map(|x| x.into())
291    }
292
293    /// Retrieve the column ids for this index id
294    pub fn col_list_for_index_id(&self, index_id: IndexId) -> ColList {
295        self.indexes
296            .iter()
297            .find(|schema| schema.index_id == index_id)
298            .map(|schema| schema.index_algorithm.columns())
299            .map(|cols| ColList::from_iter(cols.iter()))
300            .unwrap_or_else(ColList::empty)
301    }
302
303    /// Is there a unique constraint for this set of columns?
304    pub fn is_unique(&self, cols: &ColList) -> bool {
305        self.constraints
306            .iter()
307            .filter_map(|cs| cs.data.unique_columns())
308            .any(|unique_cols| **unique_cols == *cols)
309    }
310
311    /// Project the fields from the supplied `indexes`.
312    pub fn project(&self, indexes: impl Iterator<Item = ColId>) -> Result<Vec<&ColumnSchema>, InvalidFieldError> {
313        indexes
314            .map(|index| self.get_column(index.0 as usize).ok_or_else(|| index.into()))
315            .collect()
316    }
317
318    /// Utility for project the fields from the supplied `indexes` that is a [ColList],
319    /// used for when the list of field indexes have at least one value.
320    pub fn project_not_empty(&self, indexes: ColList) -> Result<Vec<&ColumnSchema>, InvalidFieldError> {
321        self.project(indexes.iter())
322    }
323
324    /// IMPORTANT: Is required to have this cached to avoid a perf drop on datastore operations
325    pub fn get_row_type(&self) -> &ProductType {
326        &self.row_type
327    }
328
329    /// Utility to avoid cloning in `row_type_for_table`
330    pub fn into_row_type(self) -> ProductType {
331        self.row_type
332    }
333
334    /// Iterate over the constraints on sets of columns on this table.
335    fn backcompat_constraints_iter(&self) -> impl Iterator<Item = (ColList, Constraints)> + '_ {
336        self.constraints
337            .iter()
338            .map(|x| -> (ColList, Constraints) {
339                match &x.data {
340                    ConstraintData::Unique(unique) => (unique.columns.clone().into(), Constraints::unique()),
341                }
342            })
343            .chain(self.indexes.iter().map(|x| match &x.index_algorithm {
344                IndexAlgorithm::BTree(btree) => (btree.columns.clone(), Constraints::indexed()),
345                IndexAlgorithm::Direct(direct) => (direct.column.into(), Constraints::indexed()),
346            }))
347            .chain(
348                self.sequences
349                    .iter()
350                    .map(|x| (col_list![x.col_pos], Constraints::auto_inc())),
351            )
352            .chain(
353                self.primary_key
354                    .iter()
355                    .map(|x| (col_list![*x], Constraints::primary_key())),
356            )
357    }
358
359    /// Get backwards-compatible constraints for this table.
360    ///
361    /// This is closer to how `TableSchema` used to work.
362    pub fn backcompat_constraints(&self) -> BTreeMap<ColList, Constraints> {
363        combine_constraints(self.backcompat_constraints_iter())
364    }
365
366    /// Get backwards-compatible constraints for this table.
367    ///
368    /// Resolves the constraints per each column. If the column don't have one, auto-generate [Constraints::unset()].
369    /// This guarantee all columns can be queried for it constraints.
370    pub fn backcompat_column_constraints(&self) -> BTreeMap<ColList, Constraints> {
371        let mut result = self.backcompat_constraints();
372        for col in &self.columns {
373            result.entry(col_list![col.col_pos]).or_insert(Constraints::unset());
374        }
375        result
376    }
377
378    /// Get the column corresponding to the primary key, if any.
379    pub fn pk(&self) -> Option<&ColumnSchema> {
380        self.primary_key.and_then(|pk| self.get_column(pk.0 as usize))
381    }
382
383    /// Verify the definitions of this schema are valid:
384    /// - Check all names are not empty
385    /// - All columns exists
386    /// - Only 1 PK
387    /// - Only 1 sequence per column
388    /// - Only Btree Indexes
389    ///
390    /// Deprecated. This will eventually be replaced by the `schema` crate.
391    pub fn validated(self) -> Result<Self, Vec<SchemaError>> {
392        let mut errors = Vec::new();
393
394        if self.table_name.is_empty() {
395            errors.push(SchemaError::EmptyTableName {
396                table_id: self.table_id,
397            });
398        }
399
400        let columns_not_found = self
401            .sequences
402            .iter()
403            .map(|x| (DefType::Sequence, x.sequence_name.clone(), ColList::new(x.col_pos)))
404            .chain(self.indexes.iter().map(|x| {
405                let cols = match &x.index_algorithm {
406                    IndexAlgorithm::BTree(btree) => btree.columns.clone(),
407                    IndexAlgorithm::Direct(direct) => direct.column.into(),
408                };
409                (DefType::Index, x.index_name.clone(), cols)
410            }))
411            .chain(self.constraints.iter().map(|x| {
412                (
413                    DefType::Constraint,
414                    x.constraint_name.clone(),
415                    match &x.data {
416                        ConstraintData::Unique(unique) => unique.columns.clone().into(),
417                    },
418                )
419            }))
420            .filter_map(|(ty, name, cols)| {
421                let mut not_found_iter = self
422                    .get_columns(&cols)
423                    .filter(|(_, x)| x.is_none())
424                    .map(|(col, _)| col)
425                    .peekable();
426
427                if not_found_iter.peek().is_none() {
428                    None
429                } else {
430                    Some(SchemaError::ColumnsNotFound {
431                        name,
432                        table: self.table_name.clone(),
433                        columns: not_found_iter.collect(),
434                        ty,
435                    })
436                }
437            });
438
439        errors.extend(columns_not_found);
440
441        errors.extend(self.columns.iter().filter_map(|x| {
442            if x.col_name.is_empty() {
443                Some(SchemaError::EmptyName {
444                    table: self.table_name.clone(),
445                    ty: DefType::Column,
446                    id: x.col_pos.0 as _,
447                })
448            } else {
449                None
450            }
451        }));
452
453        errors.extend(self.indexes.iter().filter_map(|x| {
454            if x.index_name.is_empty() {
455                Some(SchemaError::EmptyName {
456                    table: self.table_name.clone(),
457                    ty: DefType::Index,
458                    id: x.index_id.0,
459                })
460            } else {
461                None
462            }
463        }));
464        errors.extend(self.constraints.iter().filter_map(|x| {
465            if x.constraint_name.is_empty() {
466                Some(SchemaError::EmptyName {
467                    table: self.table_name.clone(),
468                    ty: DefType::Constraint,
469                    id: x.constraint_id.0,
470                })
471            } else {
472                None
473            }
474        }));
475
476        errors.extend(self.sequences.iter().filter_map(|x| {
477            if x.sequence_name.is_empty() {
478                Some(SchemaError::EmptyName {
479                    table: self.table_name.clone(),
480                    ty: DefType::Sequence,
481                    id: x.sequence_id.0,
482                })
483            } else {
484                None
485            }
486        }));
487
488        // Verify we don't have more than 1 auto_inc for the same column
489        if let Some(err) = self
490            .sequences
491            .iter()
492            .group_by(|&seq| seq.col_pos)
493            .into_iter()
494            .find_map(|(key, group)| {
495                let count = group.count();
496                if count > 1 {
497                    Some(SchemaError::OneAutoInc {
498                        table: self.table_name.clone(),
499                        field: self.columns[key.idx()].col_name.clone(),
500                    })
501                } else {
502                    None
503                }
504            })
505        {
506            errors.push(err);
507        }
508
509        if errors.is_empty() {
510            Ok(self)
511        } else {
512            Err(errors)
513        }
514    }
515
516    /// The C# and Rust SDKs are inconsistent about whether v8 column defs store resolved or unresolved algebraic types.
517    /// This method works around this problem by copying the column types from the module def into the table schema.
518    /// It can be removed once v8 is removed, since v9 will reject modules with an inconsistency like this.
519    pub fn janky_fix_column_defs(&mut self, module_def: &ModuleDef) {
520        let table_name = Identifier::new(self.table_name.clone()).unwrap();
521        for col in &mut self.columns {
522            let def: &ColumnDef = module_def
523                .lookup((&table_name, &Identifier::new(col.col_name.clone()).unwrap()))
524                .unwrap();
525            col.col_type = def.ty.clone();
526        }
527        let table_def: &TableDef = module_def.expect_lookup(&table_name);
528        self.row_type = module_def.typespace()[table_def.product_type_ref]
529            .as_product()
530            .unwrap()
531            .clone();
532    }
533
534    /// Normalize a `TableSchema`.
535    /// The result is semantically equivalent, but may have reordered indexes, constraints, or sequences.
536    /// Columns will not be reordered.
537    pub fn normalize(&mut self) {
538        self.indexes.sort_by(|a, b| a.index_name.cmp(&b.index_name));
539        self.constraints
540            .sort_by(|a, b| a.constraint_name.cmp(&b.constraint_name));
541        self.sequences.sort_by(|a, b| a.sequence_name.cmp(&b.sequence_name));
542    }
543}
544
545/// Removes and returns the first element satisfying `predicate` in `vec`.
546fn find_remove<T>(vec: &mut Vec<T>, predicate: impl Fn(&T) -> bool) -> Option<T> {
547    let pos = vec.iter().position(predicate)?;
548    Some(vec.remove(pos))
549}
550
551/// Like `assert_eq!` for `anyhow`, but `$msg` is just a string, not a format string.
552macro_rules! ensure_eq {
553    ($a:expr, $b:expr, $msg:expr) => {
554        if $a != $b {
555            anyhow::bail!(
556                "{0}: expected {1} == {2}:\n   {1}: {3:?}\n   {2}: {4:?}",
557                $msg,
558                stringify!($a),
559                stringify!($b),
560                $a,
561                $b
562            );
563        }
564    };
565}
566
567/// Returns the list of [`ColumnSchema`]s for a certain list of [`ColumnDef`]s.
568pub fn column_schemas_from_defs(module_def: &ModuleDef, columns: &[ColumnDef], table_id: TableId) -> Vec<ColumnSchema> {
569    columns
570        .iter()
571        .enumerate()
572        .map(|(col_pos, def)| ColumnSchema::from_module_def(module_def, def, (), (table_id, col_pos.into())))
573        .collect()
574}
575
576impl Schema for TableSchema {
577    type Def = TableDef;
578    type Id = TableId;
579    type ParentId = ();
580
581    // N.B. This implementation gives all children ID 0 (the auto-inc sentinel value.)
582    fn from_module_def(
583        module_def: &ModuleDef,
584        def: &Self::Def,
585        _parent_id: Self::ParentId,
586        table_id: Self::Id,
587    ) -> Self {
588        module_def.expect_contains(def);
589
590        let TableDef {
591            name,
592            product_type_ref: _,
593            primary_key,
594            columns,
595            indexes,
596            constraints,
597            sequences,
598            schedule,
599            table_type,
600            table_access,
601        } = def;
602
603        let columns = column_schemas_from_defs(module_def, columns, table_id);
604
605        // note: these Ids are fixed up somewhere else, so we can just use 0 here...
606        // but it would be nice to pass the correct values into this method.
607        let indexes = indexes
608            .values()
609            .map(|def| IndexSchema::from_module_def(module_def, def, table_id, IndexId::SENTINEL))
610            .collect();
611
612        let sequences = sequences
613            .values()
614            .map(|def| SequenceSchema::from_module_def(module_def, def, table_id, SequenceId::SENTINEL))
615            .collect();
616
617        let constraints = constraints
618            .values()
619            .map(|def| ConstraintSchema::from_module_def(module_def, def, table_id, ConstraintId::SENTINEL))
620            .collect();
621
622        let schedule = schedule
623            .as_ref()
624            .map(|schedule| ScheduleSchema::from_module_def(module_def, schedule, table_id, ScheduleId::SENTINEL));
625
626        TableSchema::new(
627            table_id,
628            (*name).clone().into(),
629            columns,
630            indexes,
631            constraints,
632            sequences,
633            (*table_type).into(),
634            (*table_access).into(),
635            schedule,
636            *primary_key,
637        )
638    }
639
640    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
641        ensure_eq!(&self.table_name[..], &def.name[..], "Table name mismatch");
642        ensure_eq!(self.primary_key, def.primary_key, "Primary key mismatch");
643        let def_table_access: StAccess = (def.table_access).into();
644        ensure_eq!(self.table_access, def_table_access, "Table access mismatch");
645        let def_table_type: StTableType = (def.table_type).into();
646        ensure_eq!(self.table_type, def_table_type, "Table type mismatch");
647
648        for col in &self.columns {
649            let col_def = def
650                .columns
651                .get(col.col_pos.0 as usize)
652                .ok_or_else(|| anyhow::anyhow!("Column {} not found in definition", col.col_pos.0))?;
653            col.check_compatible(module_def, col_def)?;
654        }
655        ensure_eq!(self.columns.len(), def.columns.len(), "Column count mismatch");
656
657        for index in &self.indexes {
658            let index_def = def
659                .indexes
660                .get(&index.index_name[..])
661                .ok_or_else(|| anyhow::anyhow!("Index {} not found in definition", index.index_id.0))?;
662            index.check_compatible(module_def, index_def)?;
663        }
664        ensure_eq!(self.indexes.len(), def.indexes.len(), "Index count mismatch");
665
666        for constraint in &self.constraints {
667            let constraint_def = def
668                .constraints
669                .get(&constraint.constraint_name[..])
670                .ok_or_else(|| anyhow::anyhow!("Constraint {} not found in definition", constraint.constraint_id.0))?;
671            constraint.check_compatible(module_def, constraint_def)?;
672        }
673        ensure_eq!(
674            self.constraints.len(),
675            def.constraints.len(),
676            "Constraint count mismatch"
677        );
678
679        for sequence in &self.sequences {
680            let sequence_def = def
681                .sequences
682                .get(&sequence.sequence_name[..])
683                .ok_or_else(|| anyhow::anyhow!("Sequence {} not found in definition", sequence.sequence_id.0))?;
684            sequence.check_compatible(module_def, sequence_def)?;
685        }
686        ensure_eq!(self.sequences.len(), def.sequences.len(), "Sequence count mismatch");
687
688        if let Some(schedule) = &self.schedule {
689            let schedule_def = def
690                .schedule
691                .as_ref()
692                .ok_or_else(|| anyhow::anyhow!("Schedule not found in definition"))?;
693            schedule.check_compatible(module_def, schedule_def)?;
694        }
695        ensure_eq!(
696            self.schedule.is_some(),
697            def.schedule.is_some(),
698            "Schedule presence mismatch"
699        );
700        Ok(())
701    }
702}
703
704impl From<&TableSchema> for ProductType {
705    fn from(value: &TableSchema) -> Self {
706        value.row_type.clone()
707    }
708}
709
710impl From<&TableSchema> for DbTable {
711    fn from(value: &TableSchema) -> Self {
712        DbTable::new(
713            Arc::new(value.into()),
714            value.table_id,
715            value.table_type,
716            value.table_access,
717        )
718    }
719}
720
721impl From<&TableSchema> for Header {
722    fn from(value: &TableSchema) -> Self {
723        let fields = value
724            .columns
725            .iter()
726            .map(|x| Column::new(FieldName::new(value.table_id, x.col_pos), x.col_type.clone()))
727            .collect();
728
729        Header::new(
730            value.table_id,
731            value.table_name.clone(),
732            fields,
733            value.backcompat_constraints(),
734        )
735    }
736}
737
738impl From<TableSchema> for Header {
739    fn from(schema: TableSchema) -> Self {
740        // TODO: optimize.
741        Header::from(&schema)
742    }
743}
744
745/// A struct representing the schema of a database column.
746#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
747pub struct ColumnSchema {
748    /// The ID of the table this column is attached to.
749    pub table_id: TableId,
750    /// The position of the column within the table.
751    pub col_pos: ColId,
752    /// The name of the column. Unique within the table.
753    pub col_name: Box<str>,
754    /// The type of the column. This will never contain any `AlgebraicTypeRef`s,
755    /// that is, it will be resolved.
756    pub col_type: AlgebraicType,
757}
758
759impl ColumnSchema {
760    pub fn for_test(pos: impl Into<ColId>, name: impl Into<Box<str>>, ty: AlgebraicType) -> Self {
761        Self {
762            table_id: TableId::SENTINEL,
763            col_pos: pos.into(),
764            col_name: name.into(),
765            col_type: ty,
766        }
767    }
768}
769
770impl Schema for ColumnSchema {
771    type Def = ColumnDef;
772    type ParentId = ();
773    // This is not like the other ID types: it's a tuple of the table ID and the column position.
774    // A `ColId` alone does NOT suffice to identify a column!
775    type Id = (TableId, ColId);
776
777    fn from_module_def(
778        module_def: &ModuleDef,
779        def: &ColumnDef,
780        _parent_id: (),
781        (table_id, col_pos): (TableId, ColId),
782    ) -> Self {
783        let col_type = WithTypespace::new(module_def.typespace(), &def.ty)
784            .resolve_refs()
785            .expect("validated module should have all types resolve");
786        ColumnSchema {
787            table_id,
788            col_pos,
789            col_name: (*def.name).into(),
790            col_type,
791        }
792    }
793
794    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
795        ensure_eq!(&self.col_name[..], &def.name[..], "Column name mismatch");
796        let resolved_def_ty = WithTypespace::new(module_def.typespace(), &def.ty).resolve_refs()?;
797        ensure_eq!(self.col_type, resolved_def_ty, "Column type mismatch");
798        ensure_eq!(self.col_pos, def.col_id, "Column ID mismatch");
799        Ok(())
800    }
801}
802
803impl From<&ColumnSchema> for ProductTypeElement {
804    fn from(value: &ColumnSchema) -> Self {
805        Self {
806            name: Some(value.col_name.clone()),
807            algebraic_type: value.col_type.clone(),
808        }
809    }
810}
811
812impl From<ColumnSchema> for Column {
813    fn from(schema: ColumnSchema) -> Self {
814        Column {
815            field: FieldName {
816                table: schema.table_id,
817                col: schema.col_pos,
818            },
819            algebraic_type: schema.col_type,
820        }
821    }
822}
823
824/// Contextualizes a reference to a [ColumnSchema] with the name of the table the column is attached to.
825#[derive(Debug, Clone)]
826pub struct ColumnSchemaRef<'a> {
827    /// The column we are referring to.
828    pub column: &'a ColumnSchema,
829    /// The name of the table the column is attached to.
830    pub table_name: &'a str,
831}
832
833impl From<ColumnSchemaRef<'_>> for ProductTypeElement {
834    fn from(value: ColumnSchemaRef) -> Self {
835        ProductTypeElement::new(value.column.col_type.clone(), Some(value.column.col_name.clone()))
836    }
837}
838
839/// Represents a schema definition for a database sequence.
840#[derive(Debug, Clone, PartialEq, Eq)]
841pub struct SequenceSchema {
842    /// The unique identifier for the sequence within a database.
843    pub sequence_id: SequenceId,
844    /// The name of the sequence.
845    /// Deprecated. In the future, sequences will be identified by col_pos.
846    pub sequence_name: Box<str>,
847    /// The ID of the table associated with the sequence.
848    pub table_id: TableId,
849    /// The position of the column associated with this sequence.
850    pub col_pos: ColId,
851    /// The increment value for the sequence.
852    pub increment: i128,
853    /// The starting value for the sequence.
854    pub start: i128,
855    /// The minimum value for the sequence.
856    pub min_value: i128,
857    /// The maximum value for the sequence.
858    pub max_value: i128,
859    /// How many values have already been allocated for the sequence.
860    pub allocated: i128,
861}
862
863impl Schema for SequenceSchema {
864    type Def = SequenceDef;
865    type Id = SequenceId;
866    type ParentId = TableId;
867
868    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
869        module_def.expect_contains(def);
870
871        SequenceSchema {
872            sequence_id: id,
873            sequence_name: (*def.name).into(),
874            table_id: parent_id,
875            col_pos: def.column,
876            increment: def.increment,
877            start: def.start.unwrap_or(1),
878            min_value: def.min_value.unwrap_or(1),
879            max_value: def.max_value.unwrap_or(i128::MAX),
880            allocated: 0, // TODO: information not available in the `Def`s anymore, which is correct, but this may need to be overridden later.
881        }
882    }
883
884    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
885        ensure_eq!(&self.sequence_name[..], &def.name[..], "Sequence name mismatch");
886        ensure_eq!(self.col_pos, def.column, "Sequence column mismatch");
887        ensure_eq!(self.increment, def.increment, "Sequence increment mismatch");
888        if let Some(start) = &def.start {
889            ensure_eq!(self.start, *start, "Sequence start mismatch");
890        }
891        if let Some(min_value) = &def.min_value {
892            ensure_eq!(self.min_value, *min_value, "Sequence min_value mismatch");
893        }
894        if let Some(max_value) = &def.max_value {
895            ensure_eq!(self.max_value, *max_value, "Sequence max_value mismatch");
896        }
897        Ok(())
898    }
899}
900
901/// Marks a table as a timer table for a scheduled reducer.
902#[derive(Debug, Clone, PartialEq, Eq)]
903pub struct ScheduleSchema {
904    /// The identifier of the table.
905    pub table_id: TableId,
906
907    /// The identifier of the schedule.
908    pub schedule_id: ScheduleId,
909
910    /// The name of the schedule.
911    pub schedule_name: Box<str>,
912
913    /// The name of the reducer to call.
914    pub reducer_name: Box<str>,
915
916    /// The column containing the `ScheduleAt` enum.
917    pub at_column: ColId,
918}
919
920impl ScheduleSchema {
921    pub fn for_test(name: impl Into<Box<str>>, reducer: impl Into<Box<str>>, at: impl Into<ColId>) -> Self {
922        Self {
923            table_id: TableId::SENTINEL,
924            schedule_id: ScheduleId::SENTINEL,
925            schedule_name: name.into(),
926            reducer_name: reducer.into(),
927            at_column: at.into(),
928        }
929    }
930}
931
932impl Schema for ScheduleSchema {
933    type Def = ScheduleDef;
934
935    type Id = ScheduleId;
936
937    type ParentId = TableId;
938
939    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
940        module_def.expect_contains(def);
941
942        ScheduleSchema {
943            table_id: parent_id,
944            schedule_id: id,
945            schedule_name: (*def.name).into(),
946            reducer_name: (*def.reducer_name).into(),
947            at_column: def.at_column,
948            // Ignore def.at_column and id_column. Those are recovered at runtime.
949        }
950    }
951
952    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
953        ensure_eq!(&self.schedule_name[..], &def.name[..], "Schedule name mismatch");
954        ensure_eq!(
955            &self.reducer_name[..],
956            &def.reducer_name[..],
957            "Schedule reducer name mismatch"
958        );
959        Ok(())
960    }
961}
962
963/// A struct representing the schema of a database index.
964#[derive(Debug, Clone, PartialEq, Eq)]
965pub struct IndexSchema {
966    /// The unique ID of the index within the schema.
967    pub index_id: IndexId,
968    /// The ID of the table associated with the index.
969    pub table_id: TableId,
970    /// The name of the index. This should not be assumed to follow any particular format.
971    /// Unique within the database.
972    pub index_name: Box<str>,
973    /// The data for the schema.
974    pub index_algorithm: IndexAlgorithm,
975}
976
977impl IndexSchema {
978    pub fn for_test(name: impl Into<Box<str>>, algo: impl Into<IndexAlgorithm>) -> Self {
979        Self {
980            index_id: IndexId::SENTINEL,
981            table_id: TableId::SENTINEL,
982            index_name: name.into(),
983            index_algorithm: algo.into(),
984        }
985    }
986}
987
988impl Schema for IndexSchema {
989    type Def = IndexDef;
990    type Id = IndexId;
991    type ParentId = TableId;
992
993    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
994        module_def.expect_contains(def);
995
996        let index_algorithm = def.algorithm.clone();
997        IndexSchema {
998            index_id: id,
999            table_id: parent_id,
1000            index_name: (*def.name).into(),
1001            index_algorithm,
1002        }
1003    }
1004
1005    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
1006        ensure_eq!(&self.index_name[..], &def.name[..], "Index name mismatch");
1007        ensure_eq!(&self.index_algorithm, &def.algorithm, "Index algorithm mismatch");
1008        Ok(())
1009    }
1010}
1011
1012/// A struct representing the schema of a database constraint.
1013///
1014/// This struct holds information about a database constraint, including its unique identifier,
1015/// name, the table it belongs to, and the columns it is associated with.
1016#[derive(Debug, Clone, PartialEq, Eq)]
1017pub struct ConstraintSchema {
1018    /// The ID of the table the constraint applies to.
1019    pub table_id: TableId,
1020    /// The unique ID of the constraint within the database.
1021    pub constraint_id: ConstraintId,
1022    /// The name of the constraint.
1023    pub constraint_name: Box<str>,
1024    /// The data for the constraint.
1025    pub data: ConstraintData, // this reuses the type from Def, which is fine, neither of `schema` nor `def` are ABI modules.
1026}
1027
1028impl ConstraintSchema {
1029    pub fn unique_for_test(name: impl Into<Box<str>>, cols: impl Into<ColSet>) -> Self {
1030        Self {
1031            table_id: TableId::SENTINEL,
1032            constraint_id: ConstraintId::SENTINEL,
1033            constraint_name: name.into(),
1034            data: ConstraintData::Unique(UniqueConstraintData { columns: cols.into() }),
1035        }
1036    }
1037
1038    /// Constructs a `ConstraintSchema` from a given `ConstraintDef` and table identifier.
1039    ///
1040    /// # Parameters
1041    ///
1042    /// * `table_id`: Identifier of the table to which the constraint belongs.
1043    /// * `constraint`: The `ConstraintDef` containing constraint information.
1044    #[deprecated(note = "Use TableSchema::from_module_def instead")]
1045    pub fn from_def(table_id: TableId, constraint: RawConstraintDefV8) -> Option<Self> {
1046        if constraint.constraints.has_unique() {
1047            Some(ConstraintSchema {
1048                constraint_id: ConstraintId::SENTINEL, // Set to 0 as it may be assigned later.
1049                constraint_name: constraint.constraint_name.trim().into(),
1050                table_id,
1051                data: ConstraintData::Unique(UniqueConstraintData {
1052                    columns: constraint.columns.into(),
1053                }),
1054            })
1055        } else {
1056            None
1057        }
1058    }
1059}
1060
1061impl Schema for ConstraintSchema {
1062    type Def = ConstraintDef;
1063    type Id = ConstraintId;
1064    type ParentId = TableId;
1065
1066    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
1067        module_def.expect_contains(def);
1068
1069        ConstraintSchema {
1070            constraint_id: id,
1071            constraint_name: (*def.name).into(),
1072            table_id: parent_id,
1073            data: def.data.clone(),
1074        }
1075    }
1076
1077    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
1078        ensure_eq!(&self.constraint_name[..], &def.name[..], "Constraint name mismatch");
1079        ensure_eq!(&self.data, &def.data, "Constraint data mismatch");
1080        Ok(())
1081    }
1082}
1083
1084/// A struct representing the schema of a row-level security policy.
1085#[derive(Debug, Clone, PartialEq, Eq)]
1086pub struct RowLevelSecuritySchema {
1087    pub table_id: TableId,
1088    pub sql: RawSql,
1089}