Skip to main content

modelvault_core/
schema_compat.rs

1use crate::error::{DbError, FormatError, SchemaError};
2use crate::schema::{IndexDef, IndexKind, SchemaChange, Type};
3use crate::{schema, validation};
4
5/// Classify a schema update from `(old_fields, old_indexes)` to `(new_fields, new_indexes)`.
6///
7/// Policy (v0.9.0, conservative):
8/// - Existing fields must remain present and type-compatible.
9/// - Enum fields may add variants (superset) but may not remove variants.
10/// - Constraints must be identical for existing fields (tightening is treated as breaking).
11/// - New fields are `Safe` only if they are top-level-absent-compatible (`Optional`); otherwise `NeedsMigration`.
12/// - Existing indexes must remain unchanged; adding indexes is `Safe` for `NonUnique` and `NeedsMigration` for `Unique`.
13pub fn classify_schema_update(
14    old_fields: &[schema::FieldDef],
15    old_indexes: &[IndexDef],
16    new_fields: &[schema::FieldDef],
17    new_indexes: &[IndexDef],
18) -> Result<SchemaChange, DbError> {
19    // Build path->def maps.
20    let mut old_map: std::collections::HashMap<&schema::FieldPath, &schema::FieldDef> =
21        std::collections::HashMap::new();
22    for f in old_fields {
23        old_map.insert(&f.path, f);
24    }
25    let mut new_map: std::collections::HashMap<&schema::FieldPath, &schema::FieldDef> =
26        std::collections::HashMap::new();
27    for f in new_fields {
28        new_map.insert(&f.path, f);
29    }
30
31    // Existing fields must exist with compatible type and same constraints.
32    for (path, old_def) in &old_map {
33        let Some(new_def) = new_map.get(path) else {
34            return Ok(SchemaChange::Breaking {
35                reason: format!("field removed: {:?}", path.0),
36            });
37        };
38        if old_def.constraints != new_def.constraints {
39            return Ok(SchemaChange::Breaking {
40                reason: format!("constraints changed for field {:?}", path.0),
41            });
42        }
43        if !type_is_compatible(&old_def.ty, &new_def.ty) {
44            return Ok(SchemaChange::Breaking {
45                reason: format!("type changed for field {:?}", path.0),
46            });
47        }
48    }
49
50    // Field reorder breaks positional v2/v3 record encoding.
51    let old_existing: Vec<_> = old_fields
52        .iter()
53        .filter(|f| new_map.contains_key(&f.path))
54        .map(|f| &f.path)
55        .collect();
56    let new_existing: Vec<_> = new_fields
57        .iter()
58        .filter(|f| old_map.contains_key(&f.path))
59        .map(|f| &f.path)
60        .collect();
61    if old_existing != new_existing {
62        return Ok(SchemaChange::Breaking {
63            reason: "field reorder breaks positional record encoding".into(),
64        });
65    }
66
67    // New fields: safe only if optional-at-root; otherwise migration required.
68    for (path, new_def) in &new_map {
69        if old_map.contains_key(path) {
70            continue;
71        }
72        if validation::allows_absent_root(&new_def.ty) {
73            continue;
74        }
75        return Ok(SchemaChange::NeedsMigration {
76            reason: format!("new required field {:?} needs backfill", path.0),
77            backfill_top_level_field: if path.0.len() == 1 {
78                Some(path.0[0].to_string())
79            } else {
80                None
81            },
82            backfill_field_path: Some((*path).clone()),
83        });
84    }
85
86    // Index rules: existing indexes must remain identical.
87    let mut old_idx_map: std::collections::HashMap<&str, &IndexDef> =
88        std::collections::HashMap::new();
89    for idx in old_indexes {
90        old_idx_map.insert(idx.name.as_str(), idx);
91    }
92    let mut new_idx_map: std::collections::HashMap<&str, &IndexDef> =
93        std::collections::HashMap::new();
94    for idx in new_indexes {
95        new_idx_map.insert(idx.name.as_str(), idx);
96    }
97
98    for (name, old_idx) in &old_idx_map {
99        // Dropping an index is allowed: it only affects planning/unique enforcement going forward.
100        let Some(new_idx) = new_idx_map.get(name) else {
101            continue;
102        };
103        // Use `|` so both comparisons run; `||` leaves one side unevaluated and shows as a line miss under llvm-cov.
104        if (old_idx.kind != new_idx.kind) | (old_idx.path != new_idx.path) {
105            return Ok(SchemaChange::Breaking {
106                reason: format!("index changed: {name:?}"),
107            });
108        }
109    }
110
111    // Added indexes.
112    for (name, new_idx) in &new_idx_map {
113        if old_idx_map.contains_key(name) {
114            continue;
115        }
116        if new_idx.kind == IndexKind::Unique {
117            return Ok(SchemaChange::NeedsMigration {
118                reason: format!("new unique index {name:?} needs rebuild/validation"),
119                backfill_top_level_field: None,
120                backfill_field_path: None,
121            });
122        }
123    }
124
125    Ok(SchemaChange::Safe)
126}
127
128/// Validate that a model's declared fields (and optional indexes) are compatible with an
129/// existing collection catalog entry.
130///
131/// - Every model field must exist in the catalog with the same type.
132/// - Primary key name must match.
133/// - If the model declares the full field set, index definitions must match the catalog.
134/// - Subset models may omit catalog fields; index defs on subset models are optional but must
135///   match when present.
136pub fn validate_model_fields_against_catalog(
137    col: &crate::catalog::CollectionInfo,
138    primary_field: &str,
139    model_fields: &[schema::FieldDef],
140    model_indexes: &[IndexDef],
141) -> Result<(), DbError> {
142    let Some(pk) = col.primary_field.as_deref() else {
143        return Err(DbError::Schema(SchemaError::NoPrimaryKey {
144            collection_id: col.id.0,
145        }));
146    };
147    if pk != primary_field {
148        return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
149            name: primary_field.to_string(),
150        }));
151    }
152    for mf in model_fields {
153        let Some(cf) = col.fields.iter().find(|f| f.path == mf.path) else {
154            return Err(DbError::Schema(SchemaError::RowUnknownField {
155                name: mf.path.0.last().map(|s| s.to_string()).unwrap_or_default(),
156            }));
157        };
158        if cf.ty != mf.ty {
159            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
160        }
161    }
162
163    let model_paths: std::collections::BTreeSet<_> = model_fields.iter().map(|f| &f.path).collect();
164    let catalog_paths: std::collections::BTreeSet<_> = col.fields.iter().map(|f| &f.path).collect();
165    let is_full_schema = model_paths == catalog_paths;
166
167    if is_full_schema && !indexes_match(&col.indexes, model_indexes) {
168        return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
169            message: "model index definitions do not match collection catalog".into(),
170        }));
171    }
172    if !is_full_schema {
173        for mi in model_indexes {
174            let Some(ci) = col.indexes.iter().find(|i| i.name == mi.name) else {
175                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
176                    message: format!("unknown index {:?}", mi.name),
177                }));
178            };
179            if ci.kind != mi.kind || ci.path != mi.path {
180                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
181                    message: format!("index {:?} does not match catalog", mi.name),
182                }));
183            }
184        }
185    }
186    Ok(())
187}
188
189fn indexes_match(a: &[IndexDef], b: &[IndexDef]) -> bool {
190    if a.len() != b.len() {
191        return false;
192    }
193    a.iter().all(|ia| {
194        b.iter()
195            .any(|ib| ib.name == ia.name && ib.kind == ia.kind && ib.path == ia.path)
196    })
197}
198
199fn type_is_compatible(old: &Type, new: &Type) -> bool {
200    match (old, new) {
201        (Type::Enum(old_vars), Type::Enum(new_vars)) => {
202            // New must be a superset (no removals).
203            old_vars.iter().all(|v| new_vars.contains(v))
204        }
205        _ => old == new,
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use std::borrow::Cow;
    use std::collections::BTreeMap;

    use crate::catalog::CollectionInfo;
    use crate::error::{DbError, SchemaError};
    use crate::schema::SchemaVersion;
    use crate::schema::{FieldDef, FieldPath, IndexDef, IndexKind, Type};
    use crate::CollectionId;

    /// Builds a field path from its segments.
    fn path(segs: &[&str]) -> FieldPath {
        FieldPath(segs.iter().map(|s| Cow::Owned((*s).to_string())).collect())
    }

    /// Builds an unconstrained top-level field.
    fn field(name: &str, ty: Type) -> FieldDef {
        FieldDef {
            path: path(&[name]),
            ty,
            constraints: vec![],
        }
    }

    /// Builds an index definition over the path `on`.
    fn index(name: &'static str, on: &[&str], kind: IndexKind) -> IndexDef {
        IndexDef {
            name: name.into(),
            path: path(on),
            kind,
        }
    }

    /// Builds a catalog entry named "t" (id 1, version 1) whose primary key is "id".
    fn catalog(fields: Vec<FieldDef>, indexes: Vec<IndexDef>) -> CollectionInfo {
        CollectionInfo {
            id: CollectionId(1),
            name: "t".into(),
            current_version: SchemaVersion(1),
            fields,
            indexes,
            primary_field: Some("id".into()),
            version_history: BTreeMap::new(),
        }
    }

    #[test]
    fn validate_model_fields_rejects_full_schema_index_mismatch() {
        let col = catalog(
            vec![field("id", Type::Int64), field("x", Type::Int64)],
            vec![index("x_idx", &["x"], IndexKind::NonUnique)],
        );
        // Same path and kind as the catalog index, but under a different name.
        let renamed = [index("other", &["x"], IndexKind::NonUnique)];

        let err =
            validate_model_fields_against_catalog(&col, "id", &col.fields, &renamed).unwrap_err();
        assert!(matches!(
            err,
            DbError::Schema(SchemaError::IncompatibleSchemaChange { .. })
        ));
    }

    #[test]
    fn validate_model_fields_subset_index_errors() {
        let col = catalog(
            vec![field("id", Type::Int64), field("note", Type::String)],
            vec![index("id_idx", &["id"], IndexKind::NonUnique)],
        );
        let subset = vec![field("id", Type::Int64)];

        // An index name the catalog does not know.
        let unknown = [index("missing", &["id"], IndexKind::NonUnique)];
        let err =
            validate_model_fields_against_catalog(&col, "id", &subset, &unknown).unwrap_err();
        assert!(matches!(
            err,
            DbError::Schema(SchemaError::IncompatibleSchemaChange { message }) if message.contains("unknown index")
        ));

        // A known index name declared with a different kind.
        let wrong_kind = [index("id_idx", &["id"], IndexKind::Unique)];
        let err2 =
            validate_model_fields_against_catalog(&col, "id", &subset, &wrong_kind).unwrap_err();
        assert!(matches!(
            err2,
            DbError::Schema(SchemaError::IncompatibleSchemaChange { message }) if message.contains("does not match catalog")
        ));
    }
}