1use crate::error::{DbError, FormatError, SchemaError};
2use crate::schema::{IndexDef, IndexKind, SchemaChange, Type};
3use crate::{schema, validation};
4
5pub fn classify_schema_update(
14 old_fields: &[schema::FieldDef],
15 old_indexes: &[IndexDef],
16 new_fields: &[schema::FieldDef],
17 new_indexes: &[IndexDef],
18) -> Result<SchemaChange, DbError> {
19 let mut old_map: std::collections::HashMap<&schema::FieldPath, &schema::FieldDef> =
21 std::collections::HashMap::new();
22 for f in old_fields {
23 old_map.insert(&f.path, f);
24 }
25 let mut new_map: std::collections::HashMap<&schema::FieldPath, &schema::FieldDef> =
26 std::collections::HashMap::new();
27 for f in new_fields {
28 new_map.insert(&f.path, f);
29 }
30
31 for (path, old_def) in &old_map {
33 let Some(new_def) = new_map.get(path) else {
34 return Ok(SchemaChange::Breaking {
35 reason: format!("field removed: {:?}", path.0),
36 });
37 };
38 if old_def.constraints != new_def.constraints {
39 return Ok(SchemaChange::Breaking {
40 reason: format!("constraints changed for field {:?}", path.0),
41 });
42 }
43 if !type_is_compatible(&old_def.ty, &new_def.ty) {
44 return Ok(SchemaChange::Breaking {
45 reason: format!("type changed for field {:?}", path.0),
46 });
47 }
48 }
49
50 let old_existing: Vec<_> = old_fields
52 .iter()
53 .filter(|f| new_map.contains_key(&f.path))
54 .map(|f| &f.path)
55 .collect();
56 let new_existing: Vec<_> = new_fields
57 .iter()
58 .filter(|f| old_map.contains_key(&f.path))
59 .map(|f| &f.path)
60 .collect();
61 if old_existing != new_existing {
62 return Ok(SchemaChange::Breaking {
63 reason: "field reorder breaks positional record encoding".into(),
64 });
65 }
66
67 for (path, new_def) in &new_map {
69 if old_map.contains_key(path) {
70 continue;
71 }
72 if validation::allows_absent_root(&new_def.ty) {
73 continue;
74 }
75 return Ok(SchemaChange::NeedsMigration {
76 reason: format!("new required field {:?} needs backfill", path.0),
77 backfill_top_level_field: if path.0.len() == 1 {
78 Some(path.0[0].to_string())
79 } else {
80 None
81 },
82 backfill_field_path: Some((*path).clone()),
83 });
84 }
85
86 let mut old_idx_map: std::collections::HashMap<&str, &IndexDef> =
88 std::collections::HashMap::new();
89 for idx in old_indexes {
90 old_idx_map.insert(idx.name.as_str(), idx);
91 }
92 let mut new_idx_map: std::collections::HashMap<&str, &IndexDef> =
93 std::collections::HashMap::new();
94 for idx in new_indexes {
95 new_idx_map.insert(idx.name.as_str(), idx);
96 }
97
98 for (name, old_idx) in &old_idx_map {
99 let Some(new_idx) = new_idx_map.get(name) else {
101 continue;
102 };
103 if (old_idx.kind != new_idx.kind) | (old_idx.path != new_idx.path) {
105 return Ok(SchemaChange::Breaking {
106 reason: format!("index changed: {name:?}"),
107 });
108 }
109 }
110
111 for (name, new_idx) in &new_idx_map {
113 if old_idx_map.contains_key(name) {
114 continue;
115 }
116 if new_idx.kind == IndexKind::Unique {
117 return Ok(SchemaChange::NeedsMigration {
118 reason: format!("new unique index {name:?} needs rebuild/validation"),
119 backfill_top_level_field: None,
120 backfill_field_path: None,
121 });
122 }
123 }
124
125 Ok(SchemaChange::Safe)
126}
127
128pub fn validate_model_fields_against_catalog(
137 col: &crate::catalog::CollectionInfo,
138 primary_field: &str,
139 model_fields: &[schema::FieldDef],
140 model_indexes: &[IndexDef],
141) -> Result<(), DbError> {
142 let Some(pk) = col.primary_field.as_deref() else {
143 return Err(DbError::Schema(SchemaError::NoPrimaryKey {
144 collection_id: col.id.0,
145 }));
146 };
147 if pk != primary_field {
148 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
149 name: primary_field.to_string(),
150 }));
151 }
152 for mf in model_fields {
153 let Some(cf) = col.fields.iter().find(|f| f.path == mf.path) else {
154 return Err(DbError::Schema(SchemaError::RowUnknownField {
155 name: mf.path.0.last().map(|s| s.to_string()).unwrap_or_default(),
156 }));
157 };
158 if cf.ty != mf.ty {
159 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
160 }
161 }
162
163 let model_paths: std::collections::BTreeSet<_> = model_fields.iter().map(|f| &f.path).collect();
164 let catalog_paths: std::collections::BTreeSet<_> = col.fields.iter().map(|f| &f.path).collect();
165 let is_full_schema = model_paths == catalog_paths;
166
167 if is_full_schema && !indexes_match(&col.indexes, model_indexes) {
168 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
169 message: "model index definitions do not match collection catalog".into(),
170 }));
171 }
172 if !is_full_schema {
173 for mi in model_indexes {
174 let Some(ci) = col.indexes.iter().find(|i| i.name == mi.name) else {
175 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
176 message: format!("unknown index {:?}", mi.name),
177 }));
178 };
179 if ci.kind != mi.kind || ci.path != mi.path {
180 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
181 message: format!("index {:?} does not match catalog", mi.name),
182 }));
183 }
184 }
185 }
186 Ok(())
187}
188
189fn indexes_match(a: &[IndexDef], b: &[IndexDef]) -> bool {
190 if a.len() != b.len() {
191 return false;
192 }
193 a.iter().all(|ia| {
194 b.iter()
195 .any(|ib| ib.name == ia.name && ib.kind == ia.kind && ib.path == ia.path)
196 })
197}
198
199fn type_is_compatible(old: &Type, new: &Type) -> bool {
200 match (old, new) {
201 (Type::Enum(old_vars), Type::Enum(new_vars)) => {
202 old_vars.iter().all(|v| new_vars.contains(v))
204 }
205 _ => old == new,
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use std::borrow::Cow;
213 use std::collections::BTreeMap;
214
215 use crate::catalog::CollectionInfo;
216 use crate::error::{DbError, SchemaError};
217 use crate::schema::SchemaVersion;
218 use crate::schema::{FieldDef, FieldPath, IndexDef, IndexKind, Type};
219 use crate::CollectionId;
220
221 fn field(name: &str, ty: Type) -> FieldDef {
222 FieldDef {
223 path: FieldPath(vec![Cow::Owned(name.to_string())]),
224 ty,
225 constraints: vec![],
226 }
227 }
228
229 fn path(segs: &[&str]) -> FieldPath {
230 FieldPath(segs.iter().map(|s| Cow::Owned((*s).to_string())).collect())
231 }
232
233 #[test]
234 fn validate_model_fields_rejects_full_schema_index_mismatch() {
235 let col = CollectionInfo {
236 id: CollectionId(1),
237 name: "t".into(),
238 current_version: SchemaVersion(1),
239 fields: vec![field("id", Type::Int64), field("x", Type::Int64)],
240 indexes: vec![IndexDef {
241 name: "x_idx".into(),
242 path: path(&["x"]),
243 kind: IndexKind::NonUnique,
244 }],
245 primary_field: Some("id".into()),
246 version_history: BTreeMap::new(),
247 };
248 let err = validate_model_fields_against_catalog(
249 &col,
250 "id",
251 &col.fields,
252 &[IndexDef {
253 name: "other".into(),
254 path: path(&["x"]),
255 kind: IndexKind::NonUnique,
256 }],
257 )
258 .unwrap_err();
259 assert!(matches!(
260 err,
261 DbError::Schema(SchemaError::IncompatibleSchemaChange { .. })
262 ));
263 }
264
265 #[test]
266 fn validate_model_fields_subset_index_errors() {
267 let col = CollectionInfo {
268 id: CollectionId(1),
269 name: "t".into(),
270 current_version: SchemaVersion(1),
271 fields: vec![field("id", Type::Int64), field("note", Type::String)],
272 indexes: vec![IndexDef {
273 name: "id_idx".into(),
274 path: path(&["id"]),
275 kind: IndexKind::NonUnique,
276 }],
277 primary_field: Some("id".into()),
278 version_history: BTreeMap::new(),
279 };
280 let subset = vec![field("id", Type::Int64)];
281
282 let err = validate_model_fields_against_catalog(
283 &col,
284 "id",
285 &subset,
286 &[IndexDef {
287 name: "missing".into(),
288 path: path(&["id"]),
289 kind: IndexKind::NonUnique,
290 }],
291 )
292 .unwrap_err();
293 assert!(matches!(
294 err,
295 DbError::Schema(SchemaError::IncompatibleSchemaChange { message }) if message.contains("unknown index")
296 ));
297
298 let err2 = validate_model_fields_against_catalog(
299 &col,
300 "id",
301 &subset,
302 &[IndexDef {
303 name: "id_idx".into(),
304 path: path(&["id"]),
305 kind: IndexKind::Unique,
306 }],
307 )
308 .unwrap_err();
309 assert!(matches!(
310 err2,
311 DbError::Schema(SchemaError::IncompatibleSchemaChange { message }) if message.contains("does not match catalog")
312 ));
313 }
314}