Skip to main content

modelvault_core/db/
catalog_ops.rs

//! Catalog registration, schema-version migration planning, and index rebuild helpers.
2
3use std::collections::BTreeMap;
4
5use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
6use crate::error::{DbError, SchemaError};
7use crate::index::{encode_index_payload, IndexEntry, IndexOp};
8use crate::record::RowValue;
9use crate::schema::CollectionId;
10use crate::schema::{classify_schema_update, FieldDef, SchemaChange, SchemaVersion};
11use crate::storage::Store;
12
13use super::{helpers, row_value_at_path_segments, scalar_at_path, Database};
14
15impl<S: Store> Database<S> {
16    /// Register the collection schema defined by `T` (schema version 1).
17    pub fn register_model<T: crate::schema::DbModel>(
18        &mut self,
19    ) -> Result<(CollectionId, SchemaVersion), DbError> {
20        self.register_collection_with_indexes(
21            T::collection_name(),
22            T::fields(),
23            T::indexes(),
24            T::primary_field(),
25        )
26    }
27
28    /// Create a new collection at schema version `1`.
29    ///
30    /// `primary_field` must name a **single-segment** (top-level) field present in `fields`.
31    /// Appends a catalog segment and updates the in-memory catalog.
32    pub fn register_collection(
33        &mut self,
34        name: &str,
35        fields: Vec<FieldDef>,
36        primary_field: &str,
37    ) -> Result<(CollectionId, SchemaVersion), DbError> {
38        self.register_collection_with_indexes(name, fields, vec![], primary_field)
39    }
40
41    pub fn register_collection_with_indexes(
42        &mut self,
43        name: &str,
44        fields: Vec<FieldDef>,
45        indexes: Vec<crate::schema::IndexDef>,
46        primary_field: &str,
47    ) -> Result<(CollectionId, SchemaVersion), DbError> {
48        let name = helpers::normalize_collection_name(name)?;
49        let pk = primary_field.trim();
50        if pk.is_empty() {
51            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
52        }
53        if !Catalog::has_top_level_field(&fields, pk) {
54            return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
55                name: pk.to_string(),
56            }));
57        }
58        if let Some(st) = &mut self.txn_staging {
59            let id = st.shadow_catalog.next_collection_id().0;
60            let wire = CatalogRecordWire::CreateCollection {
61                collection_id: id,
62                name: name.clone(),
63                schema_version: 1,
64                fields,
65                indexes,
66                primary_field: Some(pk.to_string()),
67            };
68            let payload = encode_catalog_payload(&wire);
69            st.shadow_catalog.apply_record(wire)?;
70            st.pending
71                .push((crate::segments::header::SegmentType::Schema, payload));
72            return Ok((CollectionId(id), SchemaVersion(1)));
73        }
74        let id = self.catalog.next_collection_id().0;
75        let wire = CatalogRecordWire::CreateCollection {
76            collection_id: id,
77            name: name.clone(),
78            schema_version: 1,
79            fields,
80            indexes,
81            primary_field: Some(pk.to_string()),
82        };
83        let payload = encode_catalog_payload(&wire);
84        let tid = self.next_txn_id();
85        self.commit_write_batch(
86            tid,
87            &[(
88                crate::segments::header::SegmentType::Schema,
89                payload.as_slice(),
90            )],
91        )?;
92        self.apply_catalog_record(wire)?;
93        self.push_shared_mirror();
94        Ok((CollectionId(id), SchemaVersion(1)))
95    }
96
97    /// Bump the schema version for `id` to `current + 1` with a new field set.
98    ///
99    /// The primary-key field must remain present as a top-level field (see catalog rules).
100    pub fn register_schema_version(
101        &mut self,
102        id: CollectionId,
103        fields: Vec<FieldDef>,
104    ) -> Result<SchemaVersion, DbError> {
105        self.register_schema_version_with_indexes(id, fields, vec![])
106    }
107
108    pub fn register_schema_version_with_indexes(
109        &mut self,
110        id: CollectionId,
111        fields: Vec<FieldDef>,
112        indexes: Vec<crate::schema::IndexDef>,
113    ) -> Result<SchemaVersion, DbError> {
114        let current = self
115            .catalog_for_read()
116            .get(id)
117            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
118        // `classify_schema_update` only returns `Ok(...)` variants today; keep it infallible here.
119        match classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)? {
120            SchemaChange::Safe => {}
121            SchemaChange::NeedsMigration { reason, .. } => {
122                return Err(DbError::Schema(SchemaError::MigrationRequired {
123                    message: reason,
124                }));
125            }
126            SchemaChange::Breaking { reason } => {
127                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
128                    message: reason,
129                }));
130            }
131        }
132        let next_v = current
133            .current_version
134            .0
135            .checked_add(1)
136            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
137        let wire = CatalogRecordWire::NewSchemaVersion {
138            collection_id: id.0,
139            schema_version: next_v,
140            fields,
141            indexes,
142        };
143        let payload = encode_catalog_payload(&wire);
144        if let Some(st) = &mut self.txn_staging {
145            st.shadow_catalog.apply_record(wire.clone())?;
146            st.pending
147                .push((crate::segments::header::SegmentType::Schema, payload));
148            self.rewrite_collection_rows_at_current_version(id)?;
149            return Ok(SchemaVersion(next_v));
150        }
151        self.begin_transaction()?;
152        if let Some(st) = &mut self.txn_staging {
153            st.shadow_catalog.apply_record(wire.clone())?;
154            st.pending
155                .push((crate::segments::header::SegmentType::Schema, payload));
156        }
157        self.rewrite_collection_rows_at_current_version(id)?;
158        self.commit_transaction()?;
159        Ok(SchemaVersion(next_v))
160    }
161
162    /// Plan a schema version bump and return the required migration steps, if any.
163    pub fn plan_schema_version_with_indexes(
164        &self,
165        id: CollectionId,
166        fields: Vec<FieldDef>,
167        indexes: Vec<crate::schema::IndexDef>,
168    ) -> Result<crate::MigrationPlan, DbError> {
169        let current = self
170            .catalog_for_read()
171            .get(id)
172            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
173        // Same infallibility contract as `register_schema_version_with_indexes` above.
174        let change = classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)?;
175        let mut steps = Vec::new();
176        match &change {
177            SchemaChange::Safe => {}
178            SchemaChange::Breaking { .. } => {}
179            SchemaChange::NeedsMigration {
180                reason,
181                backfill_top_level_field,
182                backfill_field_path,
183            } => {
184                if let Some(field) = backfill_top_level_field {
185                    steps.push(crate::MigrationStep::BackfillTopLevelField {
186                        field: field.clone(),
187                    });
188                } else if let Some(path) = backfill_field_path {
189                    steps.push(crate::MigrationStep::BackfillFieldAtPath { path: path.clone() });
190                } else if reason.contains("unique index") {
191                    steps.push(crate::MigrationStep::RebuildIndexes);
192                }
193            }
194        }
195        Ok(crate::MigrationPlan { change, steps })
196    }
197
198    /// Backfill a missing top-level field with a fixed value for all rows in a collection.
199    ///
200    /// This helper is intentionally simple so it can be bound to other languages.
201    pub fn backfill_top_level_field_with_value(
202        &mut self,
203        collection_id: CollectionId,
204        field: &str,
205        value: RowValue,
206    ) -> Result<(), DbError> {
207        let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
208        self.backfill_field_at_path_with_value(collection_id, &path, value)
209    }
210
211    /// Backfill a missing field (any segment path) with a fixed value for all rows.
212    pub fn backfill_field_at_path_with_value(
213        &mut self,
214        collection_id: CollectionId,
215        path: &crate::schema::FieldPath,
216        value: RowValue,
217    ) -> Result<(), DbError> {
218        let col = self
219            .catalog_for_read()
220            .get(collection_id)
221            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
222                id: collection_id.0,
223            }))?;
224        let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
225            DbError::Schema(SchemaError::RowUnknownField {
226                name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
227            })
228        })?;
229
230        let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
231        for ((cid, _), row) in self.latest_for_read().iter() {
232            if *cid != collection_id.0 {
233                continue;
234            }
235            rows.push(row.clone());
236        }
237
238        self.transaction(|db| {
239            for mut row in rows {
240                if row_value_at_path_segments(&row, &path.0).is_some() {
241                    continue;
242                }
243                crate::record::insert_value_at_path(&mut row, path, value.clone())?;
244                db.insert(collection_id, row)?;
245            }
246            Ok(())
247        })
248    }
249
250    /// Rebuild index entries for all rows in `collection_id` using the current schema’s index defs.
251    pub fn rebuild_indexes_for_collection(
252        &mut self,
253        collection_id: CollectionId,
254    ) -> Result<(), DbError> {
255        let col = self
256            .catalog_for_read()
257            .get(collection_id)
258            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
259                id: collection_id.0,
260            }))?;
261        let pk_name =
262            col.primary_field
263                .as_deref()
264                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
265                    collection_id: collection_id.0,
266                }))?;
267        let pk_def = col
268            .fields
269            .iter()
270            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
271            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
272                name: pk_name.to_string(),
273            }))?;
274
275        let mut entries: Vec<IndexEntry> = Vec::new();
276        for e in self.indexes_for_read().entries_for_checkpoint() {
277            if e.collection_id != collection_id.0 {
278                continue;
279            }
280            entries.push(IndexEntry {
281                collection_id: e.collection_id,
282                index_name: e.index_name.clone(),
283                kind: e.kind,
284                op: IndexOp::Delete,
285                index_key: e.index_key.clone(),
286                pk_key: e.pk_key.clone(),
287            });
288        }
289        for ((cid, _), row) in self.latest_for_read().iter() {
290            if *cid != collection_id.0 {
291                continue;
292            }
293            let Some(pk_cell) = row.get(pk_name) else {
294                continue;
295            };
296            let pk_scalar = pk_cell.clone().into_scalar()?;
297            if !pk_scalar.ty_matches(&pk_def.ty) {
298                continue;
299            }
300            for idx in &col.indexes {
301                let Some(v) = scalar_at_path(row, &idx.path) else {
302                    continue;
303                };
304                entries.push(IndexEntry {
305                    collection_id: collection_id.0,
306                    index_name: idx.name.clone(),
307                    kind: idx.kind,
308                    op: IndexOp::Insert,
309                    index_key: v.canonical_key_bytes(),
310                    pk_key: pk_scalar.canonical_key_bytes(),
311                });
312            }
313        }
314
315        self.transaction(|db| {
316            if entries.is_empty() {
317                return Ok(());
318            }
319            // Apply in-memory + persist as one index segment batch.
320            // `begin_transaction` always installs `txn_staging` before this closure runs.
321            let st = db
322                .txn_staging
323                .as_mut()
324                .expect("transaction staging must be active");
325            let b = encode_index_payload(&entries);
326            st.pending
327                .push((crate::segments::header::SegmentType::Index, b));
328            for e in entries {
329                st.shadow_indexes.apply(e)?;
330            }
331            Ok(())
332        })
333    }
334
335    /// Force-register a new schema version, bypassing compatibility checks.
336    ///
337    /// This is an escape hatch for advanced workflows where the caller performs an out-of-band
338    /// data rewrite (or accepts inconsistent index/query behavior until a rebuild).
339    pub fn register_schema_version_with_indexes_force(
340        &mut self,
341        id: CollectionId,
342        fields: Vec<FieldDef>,
343        indexes: Vec<crate::schema::IndexDef>,
344    ) -> Result<SchemaVersion, DbError> {
345        let current = self
346            .catalog_for_read()
347            .get(id)
348            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
349        let next_v = current
350            .current_version
351            .0
352            .checked_add(1)
353            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
354        let wire = CatalogRecordWire::NewSchemaVersion {
355            collection_id: id.0,
356            schema_version: next_v,
357            fields,
358            indexes,
359        };
360        let payload = encode_catalog_payload(&wire);
361        if let Some(st) = &mut self.txn_staging {
362            st.shadow_catalog.apply_record(wire.clone())?;
363            st.pending
364                .push((crate::segments::header::SegmentType::Schema, payload));
365            return Ok(SchemaVersion(next_v));
366        }
367        let tid = self.next_txn_id();
368        self.commit_write_batch(
369            tid,
370            &[(
371                crate::segments::header::SegmentType::Schema,
372                payload.as_slice(),
373            )],
374        )?;
375        self.apply_catalog_record(wire)?;
376        self.push_shared_mirror();
377        Ok(SchemaVersion(next_v))
378    }
379
380    pub(crate) fn rewrite_collection_rows_at_current_version(
381        &mut self,
382        collection_id: CollectionId,
383    ) -> Result<(), DbError> {
384        let rows: Vec<BTreeMap<String, RowValue>> = self
385            .latest_for_read()
386            .iter()
387            .filter(|((cid, _), _)| *cid == collection_id.0)
388            .map(|(_, row)| row.clone())
389            .collect();
390        for row in rows {
391            self.insert(collection_id, row)?;
392        }
393        Ok(())
394    }
395}