spacetimedb_schema/
schema.rs

1//! Schema data structures.
2//! These are used at runtime by the vm to store the schema of the database.
3//! They are mirrored in the system tables -- see `spacetimedb_core::db::datastore::system_tables`.
4//! Types in this file are not public ABI or API and may be changed at any time; it's the system tables that cannot.
5
6// TODO(1.0): change all the `Box<str>`s in this file to `Identifier`.
7// This doesn't affect the ABI so can wait until 1.0.
8
9use core::mem;
10use itertools::Itertools;
11use spacetimedb_lib::db::auth::{StAccess, StTableType};
12use spacetimedb_lib::db::error::{DefType, SchemaError};
13use spacetimedb_lib::db::raw_def::v9::RawSql;
14use spacetimedb_lib::db::raw_def::{generate_cols_name, RawConstraintDefV8};
15use spacetimedb_lib::relation::{combine_constraints, Column, DbTable, FieldName, Header};
16use spacetimedb_lib::{AlgebraicType, ProductType, ProductTypeElement};
17use spacetimedb_primitives::*;
18use spacetimedb_sats::product_value::InvalidFieldError;
19use spacetimedb_sats::WithTypespace;
20use std::collections::BTreeMap;
21use std::sync::Arc;
22
23use crate::def::{
24    ColumnDef, ConstraintData, ConstraintDef, IndexAlgorithm, IndexDef, ModuleDef, ModuleDefLookup, ScheduleDef,
25    SequenceDef, TableDef, UniqueConstraintData,
26};
27use crate::identifier::Identifier;
28
29/// Helper trait documenting allowing schema entities to be built from a validated `ModuleDef`.
30pub trait Schema: Sized {
31    /// The `Def` type corresponding to this schema type.
32    type Def: ModuleDefLookup;
33    /// The `Id` type corresponding to this schema type.
34    type Id;
35    /// The `Id` type corresponding to the parent of this schema type.
36    /// Set to `()` if there is no parent.
37    type ParentId;
38
39    /// Construct a schema entity from a validated `ModuleDef`.
40    /// Panics if `module_def` does not contain `def`.
41    ///
42    /// If this schema entity contains children (e.g. if it is a table schema), they should be constructed with
43    /// IDs set to `ChildId::SENTINEL`.
44    ///
45    /// If this schema entity contains `AlgebraicType`s, they should be fully resolved by this function (via
46    /// `WithTypespace::resolve_refs`). This means they will no longer contain references to any typespace (and be non-recursive).
47    /// This is necessary because the database does not currently attempt to handle typespaces / recursive types.
48    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self;
49
50    /// Check that a schema entity is compatible with a definition.
51    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error>;
52}
53
54/// A data structure representing the schema of a database table.
55///
56/// This struct holds information about the table, including its identifier,
57/// name, columns, indexes, constraints, sequences, type, and access rights.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct TableSchema {
60    /// The unique identifier of the table within the database.
61    pub table_id: TableId,
62
63    /// The name of the table.
64    // TODO(perf): This should likely be an `Arc<str>`, not a `Box<str>`, as we `Clone` it somewhat frequently.
65    pub table_name: Box<str>,
66
67    /// The columns of the table.
68    /// Inaccessible to prevent mutation.
69    /// The ordering of the columns is significant. Columns are frequently identified by `ColId`, that is, position in this list.
70    columns: Vec<ColumnSchema>,
71
72    /// The primary key of the table, if present. Must refer to a valid column.
73    ///
74    /// Currently, there must be a unique constraint and an index corresponding to the primary key.
75    /// Eventually, we may remove the requirement for an index.
76    ///
77    /// The database engine does not actually care about this, but client code generation does.
78    pub primary_key: Option<ColId>,
79
80    /// The indexes on the table.
81    pub indexes: Vec<IndexSchema>,
82
83    /// The constraints on the table.
84    pub constraints: Vec<ConstraintSchema>,
85
86    /// The sequences on the table.
87    pub sequences: Vec<SequenceSchema>,
88
89    /// Whether the table was created by a user or by the system.
90    pub table_type: StTableType,
91
92    /// The visibility of the table.
93    pub table_access: StAccess,
94
95    /// The schedule for the table, if present.
96    pub schedule: Option<ScheduleSchema>,
97
98    /// Cache for `row_type_for_table` in the data store.
99    row_type: ProductType,
100}
101
102impl TableSchema {
103    /// Create a table schema.
104    #[allow(clippy::too_many_arguments)]
105    pub fn new(
106        table_id: TableId,
107        table_name: Box<str>,
108        columns: Vec<ColumnSchema>,
109        indexes: Vec<IndexSchema>,
110        constraints: Vec<ConstraintSchema>,
111        sequences: Vec<SequenceSchema>,
112        table_type: StTableType,
113        table_access: StAccess,
114        schedule: Option<ScheduleSchema>,
115        primary_key: Option<ColId>,
116    ) -> Self {
117        let row_type = ProductType::new(
118            columns
119                .iter()
120                .map(|c| ProductTypeElement {
121                    name: Some(c.col_name.clone()),
122                    algebraic_type: c.col_type.clone(),
123                })
124                .collect(),
125        );
126
127        Self {
128            table_id,
129            table_name,
130            columns,
131            indexes,
132            constraints,
133            sequences,
134            table_type,
135            table_access,
136            row_type,
137            schedule,
138            primary_key,
139        }
140    }
141
142    /// Create a `TableSchema` corresponding to a product type.
143    /// For use in tests.
144    #[cfg(feature = "test")]
145    pub fn from_product_type(ty: ProductType) -> TableSchema {
146        let columns = ty
147            .elements
148            .iter()
149            .enumerate()
150            .map(|(col_pos, element)| ColumnSchema {
151                table_id: TableId::SENTINEL,
152                col_pos: ColId(col_pos as _),
153                col_name: element.name.clone().unwrap_or_else(|| format!("col{}", col_pos).into()),
154                col_type: element.algebraic_type.clone(),
155            })
156            .collect();
157
158        TableSchema::new(
159            TableId::SENTINEL,
160            "TestTable".into(),
161            columns,
162            vec![],
163            vec![],
164            vec![],
165            StTableType::User,
166            StAccess::Public,
167            None,
168            None,
169        )
170    }
171
172    /// Update the table id of this schema.
173    /// For use by the core database engine after assigning a table id.
174    pub fn update_table_id(&mut self, id: TableId) {
175        self.table_id = id;
176        self.columns.iter_mut().for_each(|c| c.table_id = id);
177        self.indexes.iter_mut().for_each(|i| i.table_id = id);
178        self.constraints.iter_mut().for_each(|c| c.table_id = id);
179        self.sequences.iter_mut().for_each(|s| s.table_id = id);
180        if let Some(s) = self.schedule.as_mut() {
181            s.table_id = id;
182        }
183    }
184
185    /// Convert a table schema into a list of columns.
186    pub fn into_columns(self) -> Vec<ColumnSchema> {
187        self.columns
188    }
189
190    /// Get the columns of the table. Only immutable access to the columns is provided.
191    /// The ordering of the columns is significant. Columns are frequently identified by `ColId`, that is, position in this list.
192    pub fn columns(&self) -> &[ColumnSchema] {
193        &self.columns
194    }
195
196    /// Extracts all the [Self::indexes], [Self::sequences], and [Self::constraints].
197    pub fn take_adjacent_schemas(&mut self) -> (Vec<IndexSchema>, Vec<SequenceSchema>, Vec<ConstraintSchema>) {
198        (
199            mem::take(&mut self.indexes),
200            mem::take(&mut self.sequences),
201            mem::take(&mut self.constraints),
202        )
203    }
204
205    // Crud operation on adjacent schemas
206
207    /// Add OR replace the [SequenceSchema]
208    pub fn update_sequence(&mut self, of: SequenceSchema) {
209        if let Some(x) = self.sequences.iter_mut().find(|x| x.sequence_id == of.sequence_id) {
210            *x = of;
211        } else {
212            self.sequences.push(of);
213        }
214    }
215
216    /// Removes the given `sequence_id`
217    pub fn remove_sequence(&mut self, sequence_id: SequenceId) -> Option<SequenceSchema> {
218        find_remove(&mut self.sequences, |x| x.sequence_id == sequence_id)
219    }
220
221    /// Add OR replace the [IndexSchema]
222    pub fn update_index(&mut self, of: IndexSchema) {
223        if let Some(x) = self.indexes.iter_mut().find(|x| x.index_id == of.index_id) {
224            *x = of;
225        } else {
226            self.indexes.push(of);
227        }
228    }
229
230    /// Removes the given `index_id`
231    pub fn remove_index(&mut self, index_id: IndexId) -> Option<IndexSchema> {
232        find_remove(&mut self.indexes, |x| x.index_id == index_id)
233    }
234
235    /// Add OR replace the [ConstraintSchema]
236    pub fn update_constraint(&mut self, of: ConstraintSchema) {
237        if let Some(x) = self
238            .constraints
239            .iter_mut()
240            .find(|x| x.constraint_id == of.constraint_id)
241        {
242            *x = of;
243        } else {
244            self.constraints.push(of);
245        }
246    }
247
248    /// Removes the given `index_id`
249    pub fn remove_constraint(&mut self, constraint_id: ConstraintId) -> Option<ConstraintSchema> {
250        find_remove(&mut self.constraints, |x| x.constraint_id == constraint_id)
251    }
252
253    /// Concatenate the column names from the `columns`
254    ///
255    /// WARNING: If the `ColId` not exist, is skipped.
256    /// TODO(Tyler): This should return an error and not allow this to be constructed
257    /// if there is an invalid `ColId`
258    pub fn generate_cols_name(&self, columns: &ColList) -> String {
259        generate_cols_name(columns, |p| self.get_column(p.idx()).map(|c| &*c.col_name))
260    }
261
262    /// Check if the specified `field` exists in this [TableSchema].
263    ///
264    /// # Warning
265    ///
266    /// This function ignores the `table_id` when searching for a column.
267    pub fn get_column_by_field(&self, field: FieldName) -> Option<&ColumnSchema> {
268        self.get_column(field.col.idx())
269    }
270
271    /// Look up a list of columns by their positions in the table.
272    /// Invalid column positions are permitted.
273    pub fn get_columns<'a>(
274        &'a self,
275        columns: &'a ColList,
276    ) -> impl 'a + Iterator<Item = (ColId, Option<&'a ColumnSchema>)> {
277        columns.iter().map(|col| (col, self.columns.get(col.idx())))
278    }
279
280    /// Get a reference to a column by its position (`pos`) in the table.
281    pub fn get_column(&self, pos: usize) -> Option<&ColumnSchema> {
282        self.columns.get(pos)
283    }
284
285    /// Check if the `col_name` exist on this [TableSchema]
286    pub fn get_column_by_name(&self, col_name: &str) -> Option<&ColumnSchema> {
287        self.columns.iter().find(|x| &*x.col_name == col_name)
288    }
289
290    /// Check if the `col_name` exist on this [TableSchema]
291    ///
292    /// Warning: It ignores the `table_name`
293    pub fn get_column_id_by_name(&self, col_name: &str) -> Option<ColId> {
294        self.columns
295            .iter()
296            .position(|x| &*x.col_name == col_name)
297            .map(|x| x.into())
298    }
299
300    /// Retrieve the column ids for this index id
301    pub fn col_list_for_index_id(&self, index_id: IndexId) -> ColList {
302        self.indexes
303            .iter()
304            .find(|schema| schema.index_id == index_id)
305            .map(|schema| schema.index_algorithm.columns())
306            .map(|cols| ColList::from_iter(cols.iter()))
307            .unwrap_or_else(ColList::empty)
308    }
309
310    /// Is there a unique constraint for this set of columns?
311    pub fn is_unique(&self, cols: &ColList) -> bool {
312        self.constraints
313            .iter()
314            .filter_map(|cs| cs.data.unique_columns())
315            .any(|unique_cols| **unique_cols == *cols)
316    }
317
318    /// Project the fields from the supplied `indexes`.
319    pub fn project(&self, indexes: impl Iterator<Item = ColId>) -> Result<Vec<&ColumnSchema>, InvalidFieldError> {
320        indexes
321            .map(|index| self.get_column(index.0 as usize).ok_or_else(|| index.into()))
322            .collect()
323    }
324
325    /// Utility for project the fields from the supplied `indexes` that is a [ColList],
326    /// used for when the list of field indexes have at least one value.
327    pub fn project_not_empty(&self, indexes: ColList) -> Result<Vec<&ColumnSchema>, InvalidFieldError> {
328        self.project(indexes.iter())
329    }
330
331    /// IMPORTANT: Is required to have this cached to avoid a perf drop on datastore operations
332    pub fn get_row_type(&self) -> &ProductType {
333        &self.row_type
334    }
335
336    /// Utility to avoid cloning in `row_type_for_table`
337    pub fn into_row_type(self) -> ProductType {
338        self.row_type
339    }
340
341    /// Iterate over the constraints on sets of columns on this table.
342    fn backcompat_constraints_iter(&self) -> impl Iterator<Item = (ColList, Constraints)> + '_ {
343        self.constraints
344            .iter()
345            .map(|x| -> (ColList, Constraints) {
346                match &x.data {
347                    ConstraintData::Unique(unique) => (unique.columns.clone().into(), Constraints::unique()),
348                }
349            })
350            .chain(self.indexes.iter().map(|x| match &x.index_algorithm {
351                IndexAlgorithm::BTree(btree) => (btree.columns.clone(), Constraints::indexed()),
352                IndexAlgorithm::Direct(direct) => (direct.column.into(), Constraints::indexed()),
353            }))
354            .chain(
355                self.sequences
356                    .iter()
357                    .map(|x| (col_list![x.col_pos], Constraints::auto_inc())),
358            )
359            .chain(
360                self.primary_key
361                    .iter()
362                    .map(|x| (col_list![*x], Constraints::primary_key())),
363            )
364    }
365
366    /// Get backwards-compatible constraints for this table.
367    ///
368    /// This is closer to how `TableSchema` used to work.
369    pub fn backcompat_constraints(&self) -> BTreeMap<ColList, Constraints> {
370        combine_constraints(self.backcompat_constraints_iter())
371    }
372
373    /// Get backwards-compatible constraints for this table.
374    ///
375    /// Resolves the constraints per each column. If the column don't have one, auto-generate [Constraints::unset()].
376    /// This guarantee all columns can be queried for it constraints.
377    pub fn backcompat_column_constraints(&self) -> BTreeMap<ColList, Constraints> {
378        let mut result = self.backcompat_constraints();
379        for col in &self.columns {
380            result.entry(col_list![col.col_pos]).or_insert(Constraints::unset());
381        }
382        result
383    }
384
385    /// Get the column corresponding to the primary key, if any.
386    pub fn pk(&self) -> Option<&ColumnSchema> {
387        self.primary_key.and_then(|pk| self.get_column(pk.0 as usize))
388    }
389
390    /// Verify the definitions of this schema are valid:
391    /// - Check all names are not empty
392    /// - All columns exists
393    /// - Only 1 PK
394    /// - Only 1 sequence per column
395    /// - Only Btree Indexes
396    ///
397    /// Deprecated. This will eventually be replaced by the `schema` crate.
398    pub fn validated(self) -> Result<Self, Vec<SchemaError>> {
399        let mut errors = Vec::new();
400
401        if self.table_name.is_empty() {
402            errors.push(SchemaError::EmptyTableName {
403                table_id: self.table_id,
404            });
405        }
406
407        let columns_not_found = self
408            .sequences
409            .iter()
410            .map(|x| (DefType::Sequence, x.sequence_name.clone(), ColList::new(x.col_pos)))
411            .chain(self.indexes.iter().map(|x| {
412                let cols = match &x.index_algorithm {
413                    IndexAlgorithm::BTree(btree) => btree.columns.clone(),
414                    IndexAlgorithm::Direct(direct) => direct.column.into(),
415                };
416                (DefType::Index, x.index_name.clone(), cols)
417            }))
418            .chain(self.constraints.iter().map(|x| {
419                (
420                    DefType::Constraint,
421                    x.constraint_name.clone(),
422                    match &x.data {
423                        ConstraintData::Unique(unique) => unique.columns.clone().into(),
424                    },
425                )
426            }))
427            .filter_map(|(ty, name, cols)| {
428                let mut not_found_iter = self
429                    .get_columns(&cols)
430                    .filter(|(_, x)| x.is_none())
431                    .map(|(col, _)| col)
432                    .peekable();
433
434                if not_found_iter.peek().is_none() {
435                    None
436                } else {
437                    Some(SchemaError::ColumnsNotFound {
438                        name,
439                        table: self.table_name.clone(),
440                        columns: not_found_iter.collect(),
441                        ty,
442                    })
443                }
444            });
445
446        errors.extend(columns_not_found);
447
448        errors.extend(self.columns.iter().filter_map(|x| {
449            if x.col_name.is_empty() {
450                Some(SchemaError::EmptyName {
451                    table: self.table_name.clone(),
452                    ty: DefType::Column,
453                    id: x.col_pos.0 as _,
454                })
455            } else {
456                None
457            }
458        }));
459
460        errors.extend(self.indexes.iter().filter_map(|x| {
461            if x.index_name.is_empty() {
462                Some(SchemaError::EmptyName {
463                    table: self.table_name.clone(),
464                    ty: DefType::Index,
465                    id: x.index_id.0,
466                })
467            } else {
468                None
469            }
470        }));
471        errors.extend(self.constraints.iter().filter_map(|x| {
472            if x.constraint_name.is_empty() {
473                Some(SchemaError::EmptyName {
474                    table: self.table_name.clone(),
475                    ty: DefType::Constraint,
476                    id: x.constraint_id.0,
477                })
478            } else {
479                None
480            }
481        }));
482
483        errors.extend(self.sequences.iter().filter_map(|x| {
484            if x.sequence_name.is_empty() {
485                Some(SchemaError::EmptyName {
486                    table: self.table_name.clone(),
487                    ty: DefType::Sequence,
488                    id: x.sequence_id.0,
489                })
490            } else {
491                None
492            }
493        }));
494
495        // Verify we don't have more than 1 auto_inc for the same column
496        if let Some(err) = self
497            .sequences
498            .iter()
499            .group_by(|&seq| seq.col_pos)
500            .into_iter()
501            .find_map(|(key, group)| {
502                let count = group.count();
503                if count > 1 {
504                    Some(SchemaError::OneAutoInc {
505                        table: self.table_name.clone(),
506                        field: self.columns[key.idx()].col_name.clone(),
507                    })
508                } else {
509                    None
510                }
511            })
512        {
513            errors.push(err);
514        }
515
516        if errors.is_empty() {
517            Ok(self)
518        } else {
519            Err(errors)
520        }
521    }
522
523    /// The C# and Rust SDKs are inconsistent about whether v8 column defs store resolved or unresolved algebraic types.
524    /// This method works around this problem by copying the column types from the module def into the table schema.
525    /// It can be removed once v8 is removed, since v9 will reject modules with an inconsistency like this.
526    pub fn janky_fix_column_defs(&mut self, module_def: &ModuleDef) {
527        let table_name = Identifier::new(self.table_name.clone()).unwrap();
528        for col in &mut self.columns {
529            let def: &ColumnDef = module_def
530                .lookup((&table_name, &Identifier::new(col.col_name.clone()).unwrap()))
531                .unwrap();
532            col.col_type = def.ty.clone();
533        }
534        let table_def: &TableDef = module_def.expect_lookup(&table_name);
535        self.row_type = module_def.typespace()[table_def.product_type_ref]
536            .as_product()
537            .unwrap()
538            .clone();
539    }
540
541    /// Normalize a `TableSchema`.
542    /// The result is semantically equivalent, but may have reordered indexes, constraints, or sequences.
543    /// Columns will not be reordered.
544    pub fn normalize(&mut self) {
545        self.indexes.sort_by(|a, b| a.index_name.cmp(&b.index_name));
546        self.constraints
547            .sort_by(|a, b| a.constraint_name.cmp(&b.constraint_name));
548        self.sequences.sort_by(|a, b| a.sequence_name.cmp(&b.sequence_name));
549    }
550}
551
552/// Removes and returns the first element satisfying `predicate` in `vec`.
553fn find_remove<T>(vec: &mut Vec<T>, predicate: impl Fn(&T) -> bool) -> Option<T> {
554    let pos = vec.iter().position(predicate)?;
555    Some(vec.remove(pos))
556}
557
558/// Like `assert_eq!` for `anyhow`, but `$msg` is just a string, not a format string.
559macro_rules! ensure_eq {
560    ($a:expr, $b:expr, $msg:expr) => {
561        if $a != $b {
562            anyhow::bail!(
563                "{0}: expected {1} == {2}:\n   {1}: {3:?}\n   {2}: {4:?}",
564                $msg,
565                stringify!($a),
566                stringify!($b),
567                $a,
568                $b
569            );
570        }
571    };
572}
573
574impl Schema for TableSchema {
575    type Def = TableDef;
576    type Id = TableId;
577    type ParentId = ();
578
579    // N.B. This implementation gives all children ID 0 (the auto-inc sentinel value.)
580    fn from_module_def(
581        module_def: &ModuleDef,
582        def: &Self::Def,
583        _parent_id: Self::ParentId,
584        table_id: Self::Id,
585    ) -> Self {
586        module_def.expect_contains(def);
587
588        let TableDef {
589            name,
590            product_type_ref: _,
591            primary_key,
592            columns,
593            indexes,
594            constraints,
595            sequences,
596            schedule,
597            table_type,
598            table_access,
599        } = def;
600
601        let columns: Vec<ColumnSchema> = columns
602            .iter()
603            .enumerate()
604            .map(|(col_pos, def)| ColumnSchema::from_module_def(module_def, def, (), (table_id, col_pos.into())))
605            .collect();
606
607        // note: these Ids are fixed up somewhere else, so we can just use 0 here...
608        // but it would be nice to pass the correct values into this method.
609        let indexes = indexes
610            .values()
611            .map(|def| IndexSchema::from_module_def(module_def, def, table_id, IndexId::SENTINEL))
612            .collect();
613
614        let sequences = sequences
615            .values()
616            .map(|def| SequenceSchema::from_module_def(module_def, def, table_id, SequenceId::SENTINEL))
617            .collect();
618
619        let constraints = constraints
620            .values()
621            .map(|def| ConstraintSchema::from_module_def(module_def, def, table_id, ConstraintId::SENTINEL))
622            .collect();
623
624        let schedule = schedule
625            .as_ref()
626            .map(|schedule| ScheduleSchema::from_module_def(module_def, schedule, table_id, ScheduleId::SENTINEL));
627
628        TableSchema::new(
629            table_id,
630            (*name).clone().into(),
631            columns,
632            indexes,
633            constraints,
634            sequences,
635            (*table_type).into(),
636            (*table_access).into(),
637            schedule,
638            *primary_key,
639        )
640    }
641
642    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
643        ensure_eq!(&self.table_name[..], &def.name[..], "Table name mismatch");
644        ensure_eq!(self.primary_key, def.primary_key, "Primary key mismatch");
645        let def_table_access: StAccess = (def.table_access).into();
646        ensure_eq!(self.table_access, def_table_access, "Table access mismatch");
647        let def_table_type: StTableType = (def.table_type).into();
648        ensure_eq!(self.table_type, def_table_type, "Table type mismatch");
649
650        for col in &self.columns {
651            let col_def = def
652                .columns
653                .get(col.col_pos.0 as usize)
654                .ok_or_else(|| anyhow::anyhow!("Column {} not found in definition", col.col_pos.0))?;
655            col.check_compatible(module_def, col_def)?;
656        }
657        ensure_eq!(self.columns.len(), def.columns.len(), "Column count mismatch");
658
659        for index in &self.indexes {
660            let index_def = def
661                .indexes
662                .get(&index.index_name[..])
663                .ok_or_else(|| anyhow::anyhow!("Index {} not found in definition", index.index_id.0))?;
664            index.check_compatible(module_def, index_def)?;
665        }
666        ensure_eq!(self.indexes.len(), def.indexes.len(), "Index count mismatch");
667
668        for constraint in &self.constraints {
669            let constraint_def = def
670                .constraints
671                .get(&constraint.constraint_name[..])
672                .ok_or_else(|| anyhow::anyhow!("Constraint {} not found in definition", constraint.constraint_id.0))?;
673            constraint.check_compatible(module_def, constraint_def)?;
674        }
675        ensure_eq!(
676            self.constraints.len(),
677            def.constraints.len(),
678            "Constraint count mismatch"
679        );
680
681        for sequence in &self.sequences {
682            let sequence_def = def
683                .sequences
684                .get(&sequence.sequence_name[..])
685                .ok_or_else(|| anyhow::anyhow!("Sequence {} not found in definition", sequence.sequence_id.0))?;
686            sequence.check_compatible(module_def, sequence_def)?;
687        }
688        ensure_eq!(self.sequences.len(), def.sequences.len(), "Sequence count mismatch");
689
690        if let Some(schedule) = &self.schedule {
691            let schedule_def = def
692                .schedule
693                .as_ref()
694                .ok_or_else(|| anyhow::anyhow!("Schedule not found in definition"))?;
695            schedule.check_compatible(module_def, schedule_def)?;
696        }
697        ensure_eq!(
698            self.schedule.is_some(),
699            def.schedule.is_some(),
700            "Schedule presence mismatch"
701        );
702        Ok(())
703    }
704}
705
706impl From<&TableSchema> for ProductType {
707    fn from(value: &TableSchema) -> Self {
708        ProductType::new(
709            value
710                .columns
711                .iter()
712                .map(|c| ProductTypeElement {
713                    name: Some(c.col_name.clone()),
714                    algebraic_type: c.col_type.clone(),
715                })
716                .collect(),
717        )
718    }
719}
720
721impl From<&TableSchema> for DbTable {
722    fn from(value: &TableSchema) -> Self {
723        DbTable::new(
724            Arc::new(value.into()),
725            value.table_id,
726            value.table_type,
727            value.table_access,
728        )
729    }
730}
731
732impl From<&TableSchema> for Header {
733    fn from(value: &TableSchema) -> Self {
734        let fields = value
735            .columns
736            .iter()
737            .map(|x| Column::new(FieldName::new(value.table_id, x.col_pos), x.col_type.clone()))
738            .collect();
739
740        Header::new(
741            value.table_id,
742            value.table_name.clone(),
743            fields,
744            value.backcompat_constraints(),
745        )
746    }
747}
748
749impl From<TableSchema> for Header {
750    fn from(schema: TableSchema) -> Self {
751        // TODO: optimize.
752        Header::from(&schema)
753    }
754}
755
756/// A struct representing the schema of a database column.
757#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
758pub struct ColumnSchema {
759    /// The ID of the table this column is attached to.
760    pub table_id: TableId,
761    /// The position of the column within the table.
762    pub col_pos: ColId,
763    /// The name of the column. Unique within the table.
764    pub col_name: Box<str>,
765    /// The type of the column. This will never contain any `AlgebraicTypeRef`s,
766    /// that is, it will be resolved.
767    pub col_type: AlgebraicType,
768}
769
770impl ColumnSchema {
771    pub fn for_test(pos: impl Into<ColId>, name: impl Into<Box<str>>, ty: AlgebraicType) -> Self {
772        Self {
773            table_id: TableId::SENTINEL,
774            col_pos: pos.into(),
775            col_name: name.into(),
776            col_type: ty,
777        }
778    }
779}
780
781impl Schema for ColumnSchema {
782    type Def = ColumnDef;
783    type ParentId = ();
784    // This is not like the other ID types: it's a tuple of the table ID and the column position.
785    // A `ColId` alone does NOT suffice to identify a column!
786    type Id = (TableId, ColId);
787
788    fn from_module_def(
789        module_def: &ModuleDef,
790        def: &ColumnDef,
791        _parent_id: (),
792        (table_id, col_pos): (TableId, ColId),
793    ) -> Self {
794        let col_type = WithTypespace::new(module_def.typespace(), &def.ty)
795            .resolve_refs()
796            .expect("validated module should have all types resolve");
797        ColumnSchema {
798            table_id,
799            col_pos,
800            col_name: (*def.name).into(),
801            col_type,
802        }
803    }
804
805    fn check_compatible(&self, module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
806        ensure_eq!(&self.col_name[..], &def.name[..], "Column name mismatch");
807        let resolved_def_ty = WithTypespace::new(module_def.typespace(), &def.ty).resolve_refs()?;
808        ensure_eq!(self.col_type, resolved_def_ty, "Column type mismatch");
809        ensure_eq!(self.col_pos, def.col_id, "Columnh ID mismatch");
810        Ok(())
811    }
812}
813
814impl From<&ColumnSchema> for ProductTypeElement {
815    fn from(value: &ColumnSchema) -> Self {
816        Self {
817            name: Some(value.col_name.clone()),
818            algebraic_type: value.col_type.clone(),
819        }
820    }
821}
822
823impl From<ColumnSchema> for Column {
824    fn from(schema: ColumnSchema) -> Self {
825        Column {
826            field: FieldName {
827                table: schema.table_id,
828                col: schema.col_pos,
829            },
830            algebraic_type: schema.col_type,
831        }
832    }
833}
834
835/// Contextualizes a reference to a [ColumnSchema] with the name of the table the column is attached to.
836#[derive(Debug, Clone)]
837pub struct ColumnSchemaRef<'a> {
838    /// The column we are referring to.
839    pub column: &'a ColumnSchema,
840    /// The name of the table the column is attached to.
841    pub table_name: &'a str,
842}
843
844impl From<ColumnSchemaRef<'_>> for ProductTypeElement {
845    fn from(value: ColumnSchemaRef) -> Self {
846        ProductTypeElement::new(value.column.col_type.clone(), Some(value.column.col_name.clone()))
847    }
848}
849
850/// Represents a schema definition for a database sequence.
851#[derive(Debug, Clone, PartialEq, Eq)]
852pub struct SequenceSchema {
853    /// The unique identifier for the sequence within a database.
854    pub sequence_id: SequenceId,
855    /// The name of the sequence.
856    /// Deprecated. In the future, sequences will be identified by col_pos.
857    pub sequence_name: Box<str>,
858    /// The ID of the table associated with the sequence.
859    pub table_id: TableId,
860    /// The position of the column associated with this sequence.
861    pub col_pos: ColId,
862    /// The increment value for the sequence.
863    pub increment: i128,
864    /// The starting value for the sequence.
865    pub start: i128,
866    /// The minimum value for the sequence.
867    pub min_value: i128,
868    /// The maximum value for the sequence.
869    pub max_value: i128,
870    /// How many values have already been allocated for the sequence.
871    pub allocated: i128,
872}
873
874impl Schema for SequenceSchema {
875    type Def = SequenceDef;
876    type Id = SequenceId;
877    type ParentId = TableId;
878
879    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
880        module_def.expect_contains(def);
881
882        SequenceSchema {
883            sequence_id: id,
884            sequence_name: (*def.name).into(),
885            table_id: parent_id,
886            col_pos: def.column,
887            increment: def.increment,
888            start: def.start.unwrap_or(1),
889            min_value: def.min_value.unwrap_or(1),
890            max_value: def.max_value.unwrap_or(i128::MAX),
891            allocated: 0, // TODO: information not available in the `Def`s anymore, which is correct, but this may need to be overridden later.
892        }
893    }
894
895    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
896        ensure_eq!(&self.sequence_name[..], &def.name[..], "Sequence name mismatch");
897        ensure_eq!(self.col_pos, def.column, "Sequence column mismatch");
898        ensure_eq!(self.increment, def.increment, "Sequence increment mismatch");
899        if let Some(start) = &def.start {
900            ensure_eq!(self.start, *start, "Sequence start mismatch");
901        }
902        if let Some(min_value) = &def.min_value {
903            ensure_eq!(self.min_value, *min_value, "Sequence min_value mismatch");
904        }
905        if let Some(max_value) = &def.max_value {
906            ensure_eq!(self.max_value, *max_value, "Sequence max_value mismatch");
907        }
908        Ok(())
909    }
910}
911
912/// Marks a table as a timer table for a scheduled reducer.
913#[derive(Debug, Clone, PartialEq, Eq)]
914pub struct ScheduleSchema {
915    /// The identifier of the table.
916    pub table_id: TableId,
917
918    /// The identifier of the schedule.
919    pub schedule_id: ScheduleId,
920
921    /// The name of the schedule.
922    pub schedule_name: Box<str>,
923
924    /// The name of the reducer to call.
925    pub reducer_name: Box<str>,
926
927    /// The column containing the `ScheduleAt` enum.
928    pub at_column: ColId,
929}
930
931impl ScheduleSchema {
932    pub fn for_test(name: impl Into<Box<str>>, reducer: impl Into<Box<str>>, at: impl Into<ColId>) -> Self {
933        Self {
934            table_id: TableId::SENTINEL,
935            schedule_id: ScheduleId::SENTINEL,
936            schedule_name: name.into(),
937            reducer_name: reducer.into(),
938            at_column: at.into(),
939        }
940    }
941}
942
943impl Schema for ScheduleSchema {
944    type Def = ScheduleDef;
945
946    type Id = ScheduleId;
947
948    type ParentId = TableId;
949
950    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
951        module_def.expect_contains(def);
952
953        ScheduleSchema {
954            table_id: parent_id,
955            schedule_id: id,
956            schedule_name: (*def.name).into(),
957            reducer_name: (*def.reducer_name).into(),
958            at_column: def.at_column,
959            // Ignore def.at_column and id_column. Those are recovered at runtime.
960        }
961    }
962
963    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
964        ensure_eq!(&self.schedule_name[..], &def.name[..], "Schedule name mismatch");
965        ensure_eq!(
966            &self.reducer_name[..],
967            &def.reducer_name[..],
968            "Schedule reducer name mismatch"
969        );
970        Ok(())
971    }
972}
973
974/// A struct representing the schema of a database index.
975#[derive(Debug, Clone, PartialEq, Eq)]
976pub struct IndexSchema {
977    /// The unique ID of the index within the schema.
978    pub index_id: IndexId,
979    /// The ID of the table associated with the index.
980    pub table_id: TableId,
981    /// The name of the index. This should not be assumed to follow any particular format.
982    /// Unique within the database.
983    pub index_name: Box<str>,
984    /// The data for the schema.
985    pub index_algorithm: IndexAlgorithm,
986}
987
988impl IndexSchema {
989    pub fn for_test(name: impl Into<Box<str>>, algo: impl Into<IndexAlgorithm>) -> Self {
990        Self {
991            index_id: IndexId::SENTINEL,
992            table_id: TableId::SENTINEL,
993            index_name: name.into(),
994            index_algorithm: algo.into(),
995        }
996    }
997}
998
999impl Schema for IndexSchema {
1000    type Def = IndexDef;
1001    type Id = IndexId;
1002    type ParentId = TableId;
1003
1004    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
1005        module_def.expect_contains(def);
1006
1007        let index_algorithm = def.algorithm.clone();
1008        IndexSchema {
1009            index_id: id,
1010            table_id: parent_id,
1011            index_name: (*def.name).into(),
1012            index_algorithm,
1013        }
1014    }
1015
1016    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
1017        ensure_eq!(&self.index_name[..], &def.name[..], "Index name mismatch");
1018        ensure_eq!(&self.index_algorithm, &def.algorithm, "Index algorithm mismatch");
1019        Ok(())
1020    }
1021}
1022
1023/// A struct representing the schema of a database constraint.
1024///
1025/// This struct holds information about a database constraint, including its unique identifier,
1026/// name, the table it belongs to, and the columns it is associated with.
1027#[derive(Debug, Clone, PartialEq, Eq)]
1028pub struct ConstraintSchema {
1029    /// The ID of the table the constraint applies to.
1030    pub table_id: TableId,
1031    /// The unique ID of the constraint within the database.
1032    pub constraint_id: ConstraintId,
1033    /// The name of the constraint.
1034    pub constraint_name: Box<str>,
1035    /// The data for the constraint.
1036    pub data: ConstraintData, // this reuses the type from Def, which is fine, neither of `schema` nor `def` are ABI modules.
1037}
1038
1039impl ConstraintSchema {
1040    pub fn unique_for_test(name: impl Into<Box<str>>, cols: impl Into<ColSet>) -> Self {
1041        Self {
1042            table_id: TableId::SENTINEL,
1043            constraint_id: ConstraintId::SENTINEL,
1044            constraint_name: name.into(),
1045            data: ConstraintData::Unique(UniqueConstraintData { columns: cols.into() }),
1046        }
1047    }
1048
1049    /// Constructs a `ConstraintSchema` from a given `ConstraintDef` and table identifier.
1050    ///
1051    /// # Parameters
1052    ///
1053    /// * `table_id`: Identifier of the table to which the constraint belongs.
1054    /// * `constraint`: The `ConstraintDef` containing constraint information.
1055    #[deprecated(note = "Use TableSchema::from_module_def instead")]
1056    pub fn from_def(table_id: TableId, constraint: RawConstraintDefV8) -> Option<Self> {
1057        if constraint.constraints.has_unique() {
1058            Some(ConstraintSchema {
1059                constraint_id: ConstraintId::SENTINEL, // Set to 0 as it may be assigned later.
1060                constraint_name: constraint.constraint_name.trim().into(),
1061                table_id,
1062                data: ConstraintData::Unique(UniqueConstraintData {
1063                    columns: constraint.columns.into(),
1064                }),
1065            })
1066        } else {
1067            None
1068        }
1069    }
1070}
1071
1072impl Schema for ConstraintSchema {
1073    type Def = ConstraintDef;
1074    type Id = ConstraintId;
1075    type ParentId = TableId;
1076
1077    fn from_module_def(module_def: &ModuleDef, def: &Self::Def, parent_id: Self::ParentId, id: Self::Id) -> Self {
1078        module_def.expect_contains(def);
1079
1080        ConstraintSchema {
1081            constraint_id: id,
1082            constraint_name: (*def.name).into(),
1083            table_id: parent_id,
1084            data: def.data.clone(),
1085        }
1086    }
1087
1088    fn check_compatible(&self, _module_def: &ModuleDef, def: &Self::Def) -> Result<(), anyhow::Error> {
1089        ensure_eq!(&self.constraint_name[..], &def.name[..], "Constraint name mismatch");
1090        ensure_eq!(&self.data, &def.data, "Constraint data mismatch");
1091        Ok(())
1092    }
1093}
1094
1095/// A struct representing the schema of a row-level security policy.
1096#[derive(Debug, Clone, PartialEq, Eq)]
1097pub struct RowLevelSecuritySchema {
1098    pub table_id: TableId,
1099    pub sql: RawSql,
1100}