1use std::collections::BTreeMap;
4
5use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
6use crate::error::{DbError, SchemaError};
7use crate::index::{encode_index_payload, IndexEntry, IndexOp};
8use crate::record::RowValue;
9use crate::schema::CollectionId;
10use crate::schema::{classify_schema_update, FieldDef, SchemaChange, SchemaVersion};
11use crate::storage::Store;
12
13use super::{helpers, row_value_at_path_segments, scalar_at_path, Database};
14
15impl<S: Store> Database<S> {
16 pub fn register_model<T: crate::schema::DbModel>(
18 &mut self,
19 ) -> Result<(CollectionId, SchemaVersion), DbError> {
20 self.register_collection_with_indexes(
21 T::collection_name(),
22 T::fields(),
23 T::indexes(),
24 T::primary_field(),
25 )
26 }
27
28 pub fn register_collection(
33 &mut self,
34 name: &str,
35 fields: Vec<FieldDef>,
36 primary_field: &str,
37 ) -> Result<(CollectionId, SchemaVersion), DbError> {
38 self.register_collection_with_indexes(name, fields, vec![], primary_field)
39 }
40
41 pub fn register_collection_with_indexes(
42 &mut self,
43 name: &str,
44 fields: Vec<FieldDef>,
45 indexes: Vec<crate::schema::IndexDef>,
46 primary_field: &str,
47 ) -> Result<(CollectionId, SchemaVersion), DbError> {
48 let name = helpers::normalize_collection_name(name)?;
49 let pk = primary_field.trim();
50 if pk.is_empty() {
51 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
52 }
53 if !Catalog::has_top_level_field(&fields, pk) {
54 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
55 name: pk.to_string(),
56 }));
57 }
58 if let Some(st) = &mut self.txn_staging {
59 let id = st.shadow_catalog.next_collection_id().0;
60 let wire = CatalogRecordWire::CreateCollection {
61 collection_id: id,
62 name: name.clone(),
63 schema_version: 1,
64 fields,
65 indexes,
66 primary_field: Some(pk.to_string()),
67 };
68 let payload = encode_catalog_payload(&wire);
69 st.shadow_catalog.apply_record(wire)?;
70 st.pending
71 .push((crate::segments::header::SegmentType::Schema, payload));
72 return Ok((CollectionId(id), SchemaVersion(1)));
73 }
74 let id = self.catalog.next_collection_id().0;
75 let wire = CatalogRecordWire::CreateCollection {
76 collection_id: id,
77 name: name.clone(),
78 schema_version: 1,
79 fields,
80 indexes,
81 primary_field: Some(pk.to_string()),
82 };
83 let payload = encode_catalog_payload(&wire);
84 let tid = self.next_txn_id();
85 self.commit_write_batch(
86 tid,
87 &[(
88 crate::segments::header::SegmentType::Schema,
89 payload.as_slice(),
90 )],
91 )?;
92 self.apply_catalog_record(wire)?;
93 self.push_shared_mirror();
94 Ok((CollectionId(id), SchemaVersion(1)))
95 }
96
97 pub fn register_schema_version(
101 &mut self,
102 id: CollectionId,
103 fields: Vec<FieldDef>,
104 ) -> Result<SchemaVersion, DbError> {
105 self.register_schema_version_with_indexes(id, fields, vec![])
106 }
107
108 pub fn register_schema_version_with_indexes(
109 &mut self,
110 id: CollectionId,
111 fields: Vec<FieldDef>,
112 indexes: Vec<crate::schema::IndexDef>,
113 ) -> Result<SchemaVersion, DbError> {
114 let current = self
115 .catalog_for_read()
116 .get(id)
117 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
118 match classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)? {
120 SchemaChange::Safe => {}
121 SchemaChange::NeedsMigration { reason, .. } => {
122 return Err(DbError::Schema(SchemaError::MigrationRequired {
123 message: reason,
124 }));
125 }
126 SchemaChange::Breaking { reason } => {
127 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
128 message: reason,
129 }));
130 }
131 }
132 let next_v = current
133 .current_version
134 .0
135 .checked_add(1)
136 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
137 let wire = CatalogRecordWire::NewSchemaVersion {
138 collection_id: id.0,
139 schema_version: next_v,
140 fields,
141 indexes,
142 };
143 let payload = encode_catalog_payload(&wire);
144 if let Some(st) = &mut self.txn_staging {
145 st.shadow_catalog.apply_record(wire.clone())?;
146 st.pending
147 .push((crate::segments::header::SegmentType::Schema, payload));
148 self.rewrite_collection_rows_at_current_version(id)?;
149 return Ok(SchemaVersion(next_v));
150 }
151 self.begin_transaction()?;
152 if let Some(st) = &mut self.txn_staging {
153 st.shadow_catalog.apply_record(wire.clone())?;
154 st.pending
155 .push((crate::segments::header::SegmentType::Schema, payload));
156 }
157 self.rewrite_collection_rows_at_current_version(id)?;
158 self.commit_transaction()?;
159 Ok(SchemaVersion(next_v))
160 }
161
162 pub fn plan_schema_version_with_indexes(
164 &self,
165 id: CollectionId,
166 fields: Vec<FieldDef>,
167 indexes: Vec<crate::schema::IndexDef>,
168 ) -> Result<crate::MigrationPlan, DbError> {
169 let current = self
170 .catalog_for_read()
171 .get(id)
172 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
173 let change = classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)?;
175 let mut steps = Vec::new();
176 match &change {
177 SchemaChange::Safe => {}
178 SchemaChange::Breaking { .. } => {}
179 SchemaChange::NeedsMigration {
180 reason,
181 backfill_top_level_field,
182 backfill_field_path,
183 } => {
184 if let Some(field) = backfill_top_level_field {
185 steps.push(crate::MigrationStep::BackfillTopLevelField {
186 field: field.clone(),
187 });
188 } else if let Some(path) = backfill_field_path {
189 steps.push(crate::MigrationStep::BackfillFieldAtPath { path: path.clone() });
190 } else if reason.contains("unique index") {
191 steps.push(crate::MigrationStep::RebuildIndexes);
192 }
193 }
194 }
195 Ok(crate::MigrationPlan { change, steps })
196 }
197
198 pub fn backfill_top_level_field_with_value(
202 &mut self,
203 collection_id: CollectionId,
204 field: &str,
205 value: RowValue,
206 ) -> Result<(), DbError> {
207 let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
208 self.backfill_field_at_path_with_value(collection_id, &path, value)
209 }
210
211 pub fn backfill_field_at_path_with_value(
213 &mut self,
214 collection_id: CollectionId,
215 path: &crate::schema::FieldPath,
216 value: RowValue,
217 ) -> Result<(), DbError> {
218 let col = self
219 .catalog_for_read()
220 .get(collection_id)
221 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
222 id: collection_id.0,
223 }))?;
224 let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
225 DbError::Schema(SchemaError::RowUnknownField {
226 name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
227 })
228 })?;
229
230 let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
231 for ((cid, _), row) in self.latest_for_read().iter() {
232 if *cid != collection_id.0 {
233 continue;
234 }
235 rows.push(row.clone());
236 }
237
238 self.transaction(|db| {
239 for mut row in rows {
240 if row_value_at_path_segments(&row, &path.0).is_some() {
241 continue;
242 }
243 crate::record::insert_value_at_path(&mut row, path, value.clone())?;
244 db.insert(collection_id, row)?;
245 }
246 Ok(())
247 })
248 }
249
250 pub fn rebuild_indexes_for_collection(
252 &mut self,
253 collection_id: CollectionId,
254 ) -> Result<(), DbError> {
255 let col = self
256 .catalog_for_read()
257 .get(collection_id)
258 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
259 id: collection_id.0,
260 }))?;
261 let pk_name =
262 col.primary_field
263 .as_deref()
264 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
265 collection_id: collection_id.0,
266 }))?;
267 let pk_def = col
268 .fields
269 .iter()
270 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
271 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
272 name: pk_name.to_string(),
273 }))?;
274
275 let mut entries: Vec<IndexEntry> = Vec::new();
276 for e in self.indexes_for_read().entries_for_checkpoint() {
277 if e.collection_id != collection_id.0 {
278 continue;
279 }
280 entries.push(IndexEntry {
281 collection_id: e.collection_id,
282 index_name: e.index_name.clone(),
283 kind: e.kind,
284 op: IndexOp::Delete,
285 index_key: e.index_key.clone(),
286 pk_key: e.pk_key.clone(),
287 });
288 }
289 for ((cid, _), row) in self.latest_for_read().iter() {
290 if *cid != collection_id.0 {
291 continue;
292 }
293 let Some(pk_cell) = row.get(pk_name) else {
294 continue;
295 };
296 let pk_scalar = pk_cell.clone().into_scalar()?;
297 if !pk_scalar.ty_matches(&pk_def.ty) {
298 continue;
299 }
300 for idx in &col.indexes {
301 let Some(v) = scalar_at_path(row, &idx.path) else {
302 continue;
303 };
304 entries.push(IndexEntry {
305 collection_id: collection_id.0,
306 index_name: idx.name.clone(),
307 kind: idx.kind,
308 op: IndexOp::Insert,
309 index_key: v.canonical_key_bytes(),
310 pk_key: pk_scalar.canonical_key_bytes(),
311 });
312 }
313 }
314
315 self.transaction(|db| {
316 if entries.is_empty() {
317 return Ok(());
318 }
319 let st = db
322 .txn_staging
323 .as_mut()
324 .expect("transaction staging must be active");
325 let b = encode_index_payload(&entries);
326 st.pending
327 .push((crate::segments::header::SegmentType::Index, b));
328 for e in entries {
329 st.shadow_indexes.apply(e)?;
330 }
331 Ok(())
332 })
333 }
334
335 pub fn register_schema_version_with_indexes_force(
340 &mut self,
341 id: CollectionId,
342 fields: Vec<FieldDef>,
343 indexes: Vec<crate::schema::IndexDef>,
344 ) -> Result<SchemaVersion, DbError> {
345 let current = self
346 .catalog_for_read()
347 .get(id)
348 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
349 let next_v = current
350 .current_version
351 .0
352 .checked_add(1)
353 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
354 let wire = CatalogRecordWire::NewSchemaVersion {
355 collection_id: id.0,
356 schema_version: next_v,
357 fields,
358 indexes,
359 };
360 let payload = encode_catalog_payload(&wire);
361 if let Some(st) = &mut self.txn_staging {
362 st.shadow_catalog.apply_record(wire.clone())?;
363 st.pending
364 .push((crate::segments::header::SegmentType::Schema, payload));
365 return Ok(SchemaVersion(next_v));
366 }
367 let tid = self.next_txn_id();
368 self.commit_write_batch(
369 tid,
370 &[(
371 crate::segments::header::SegmentType::Schema,
372 payload.as_slice(),
373 )],
374 )?;
375 self.apply_catalog_record(wire)?;
376 self.push_shared_mirror();
377 Ok(SchemaVersion(next_v))
378 }
379
380 pub(crate) fn rewrite_collection_rows_at_current_version(
381 &mut self,
382 collection_id: CollectionId,
383 ) -> Result<(), DbError> {
384 let rows: Vec<BTreeMap<String, RowValue>> = self
385 .latest_for_read()
386 .iter()
387 .filter(|((cid, _), _)| *cid == collection_id.0)
388 .map(|(_, row)| row.clone())
389 .collect();
390 for row in rows {
391 self.insert(collection_id, row)?;
392 }
393 Ok(())
394 }
395}