1mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12mod writer_registry;
13pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
14mod row_merge;
15pub(crate) mod row_paths;
16pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
17mod write;
18
19use std::collections::{BTreeMap, HashMap};
20use std::marker::PhantomData;
21use std::path::{Path, PathBuf};
22
23use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
24use crate::config::{OpenMode, OpenOptions};
25use crate::error::{DbError, FormatError, SchemaError, TransactionError};
26use crate::index::IndexState;
27use crate::index::{encode_index_payload, IndexEntry, IndexOp};
28use crate::record::{
29 encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
30 encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
31 OP_REPLACE,
32};
33use crate::schema::{classify_schema_update, SchemaChange};
34use crate::schema::{CollectionId, FieldDef, SchemaVersion};
35use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
36use crate::segments::writer::SegmentWriter;
37use crate::storage::{FileStore, Store, VecStore};
38use crate::validation;
39use crate::{checkpoint, publish};
40use crate::{MigrationPlan, MigrationStep};
41
42use self::fs_ops::{FsOps, StdFsOps};
43
44#[cfg(unix)]
46fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
47 let Some(parent) = dest_path.parent() else {
48 return;
49 };
50 let Ok(dir_f) = fs.open_dir(parent) else {
51 return;
52 };
53 let _ = dir_f.sync_all();
54}
55
56pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
57
58type PlannedInsert = (
59 Vec<u8>,
60 (Vec<u8>, BTreeMap<String, RowValue>),
61 Vec<IndexEntry>,
62 ScalarValue,
63);
64
65fn plan_insert_row(
66 catalog: &Catalog,
67 collection_id: CollectionId,
68 mut row: BTreeMap<String, RowValue>,
69) -> Result<PlannedInsert, DbError> {
70 let col =
71 catalog
72 .get(collection_id)
73 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
74 id: collection_id.0,
75 }))?;
76 let pk_name =
77 col.primary_field
78 .as_deref()
79 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
80 collection_id: collection_id.0,
81 }))?;
82 let pk_def = col
83 .fields
84 .iter()
85 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
86 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
87 name: pk_name.to_string(),
88 }))?;
89 let pk_ty = &pk_def.ty;
90 validation::ensure_pk_type_primitive(pk_ty)?;
91 let mut pk_path = vec![pk_name.to_string()];
92 let pk_cell = row
93 .get(pk_name)
94 .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
95 name: pk_name.to_string(),
96 }))?;
97 validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
98 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
101 if !has_multi_segment_schema {
102 validation::validate_top_level_row(&col.fields, pk_name, &row)?;
103 } else {
104 validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
105 }
106
107 let pk_val = row.remove(pk_name).unwrap();
109 let pk_scalar = pk_val.clone().into_scalar()?;
110
111 let non_pk_defs = if has_multi_segment_schema {
115 col.fields
116 .iter()
117 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
118 .collect::<Vec<_>>()
119 } else {
120 non_pk_defs_in_order(&col.fields, pk_name)
121 };
122 let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
123
124 let payload = if has_multi_segment_schema {
125 encode_record_payload_v3(
126 collection_id.0,
127 col.current_version.0,
128 &pk_scalar,
129 pk_ty,
130 &non_pk,
131 )?
132 } else {
133 encode_record_payload_v2(
134 collection_id.0,
135 col.current_version.0,
136 &pk_scalar,
137 pk_ty,
138 &non_pk,
139 )?
140 };
141
142 let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
143 full_map.insert(pk_name.to_string(), pk_val);
144 for (def, v) in &non_pk {
145 let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
146 if parts.len() == 1 {
147 full_map.insert(parts[0].clone(), v.clone());
148 } else {
149 debug_assert!(parts.len() >= 2);
150 row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
151 }
152 }
153 let mut index_entries: Vec<IndexEntry> = Vec::new();
154 for idx in &col.indexes {
155 let Some(v) = scalar_at_path(&full_map, &idx.path) else {
156 continue;
157 };
158 index_entries.push(IndexEntry {
159 collection_id: collection_id.0,
160 index_name: idx.name.clone(),
161 kind: idx.kind,
162 op: IndexOp::Insert,
163 index_key: v.canonical_key_bytes(),
164 pk_key: pk_scalar.canonical_key_bytes(),
165 });
166 }
167 let pk_key = pk_scalar.canonical_key_bytes();
168 Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
169}
170
171fn index_deletes_for_existing_row(
172 collection_id: CollectionId,
173 pk_scalar: &ScalarValue,
174 indexes: &[crate::schema::IndexDef],
175 existing_row: &BTreeMap<String, RowValue>,
176) -> Vec<IndexEntry> {
177 let mut out = Vec::new();
178 for idx in indexes {
179 let Some(v) = scalar_at_path(existing_row, &idx.path) else {
180 continue;
181 };
182 out.push(IndexEntry {
183 collection_id: collection_id.0,
184 index_name: idx.name.clone(),
185 kind: idx.kind,
186 op: IndexOp::Delete,
187 index_key: v.canonical_key_bytes(),
188 pk_key: pk_scalar.canonical_key_bytes(),
189 });
190 }
191 out
192}
193
194pub(crate) struct TxnStaging {
196 pub(crate) txn_id: u64,
197 pub(crate) shadow_catalog: Catalog,
198 pub(crate) shadow_latest: LatestMap,
199 pub(crate) shadow_indexes: IndexState,
200 pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
201}
202
/// An open database handle over a [`Store`] (file-backed by default).
///
/// Committed state lives in `catalog`, `latest` and `indexes`. While a
/// transaction is open, the `*_for_read` accessors and all writes use the
/// shadow copies held in `txn_staging` instead.
pub struct Database<S: Store = FileStore> {
    /// Path of the database; `query_iter` also passes it as the spill path.
    path: PathBuf,
    /// Backing byte store that segments are committed to.
    store: S,
    /// Committed collection/schema catalog.
    catalog: Catalog,
    /// Offset where segments start; passed to commit and checkpoint writers.
    segment_start: u64,
    /// Header format minor version; the `write::ensure_header_*` helpers may bump it.
    format_minor: u16,
    /// Committed latest row per `(collection id, pk key bytes)`.
    latest: LatestMap,
    /// Committed index state.
    indexes: IndexState,
    /// Last transaction id handed out by `next_txn_id`.
    txn_seq: u64,
    /// Staging area of the open transaction, if any.
    txn_staging: Option<TxnStaging>,
    // Never read in this file; presumably held only so its drop releases the
    // writer registration — TODO confirm in `writer_registry`.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    /// Test hook: one-shot mutation of the planned row on the replace path of `insert`.
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Test hook: one-shot mutation of the primary key encoded by `delete`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
236
237impl<S: Store> Database<S> {
238 fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
239 let mut out = Database::<VecStore>::open_in_memory()?;
240
241 let mut cols = self.catalog_for_read().collections();
243 cols.sort_by_key(|c| c.id.0);
244 for c in &cols {
245 let pk =
246 c.primary_field
247 .as_deref()
248 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
249 collection_id: c.id.0,
250 }))?;
251 let (new_id, _v1) = out.register_collection_with_indexes(
252 &c.name,
253 c.fields.clone(),
254 c.indexes.clone(),
255 pk,
256 )?;
257 for _ in 2..=c.current_version.0 {
259 let _ = out.register_schema_version_with_indexes_force(
260 new_id,
261 c.fields.clone(),
262 c.indexes.clone(),
263 )?;
264 }
265 }
266
267 for ((cid, _), row) in self.latest_for_read().iter() {
269 let collection_id = CollectionId(*cid);
270 out.insert(collection_id, row.clone())?;
271 }
272
273 Ok(out.into_snapshot_bytes())
274 }
275
276 pub(crate) fn open_with_store(
277 path: PathBuf,
278 store: S,
279 opts: OpenOptions,
280 ) -> Result<Self, DbError> {
281 open::open_with_store(path, store, opts)
282 }
283
284 fn next_txn_id(&mut self) -> u64 {
285 self.txn_seq = self.txn_seq.saturating_add(1);
286 self.txn_seq
287 }
288
289 #[inline]
290 fn commit_write_batch(
291 &mut self,
292 txn_id: u64,
293 body: &[(crate::segments::header::SegmentType, &[u8])],
294 ) -> Result<(), DbError> {
295 write::commit_write_txn_v6(
296 &mut self.store,
297 self.segment_start,
298 &mut self.format_minor,
299 txn_id,
300 body,
301 )
302 }
303
304 #[inline]
305 fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
306 self.catalog.apply_record(wire)
307 }
308
309 pub fn transaction<R>(
313 &mut self,
314 f: impl FnOnce(&mut Self) -> Result<R, DbError>,
315 ) -> Result<R, DbError> {
316 self.begin_transaction()?;
317 match f(self) {
318 Ok(v) => match self.commit_transaction() {
319 Ok(()) => Ok(v),
320 Err(e) => {
321 self.rollback_transaction();
322 Err(e)
323 }
324 },
325 Err(e) => {
326 self.rollback_transaction();
327 Err(e)
328 }
329 }
330 }
331
332 pub fn begin_transaction(&mut self) -> Result<(), DbError> {
335 if self.txn_staging.is_some() {
336 return Err(DbError::Transaction(TransactionError::NestedTransaction));
337 }
338 let tid = self.next_txn_id();
339 self.txn_staging = Some(TxnStaging {
340 txn_id: tid,
341 shadow_catalog: self.catalog.clone(),
342 shadow_latest: self.latest.clone(),
343 shadow_indexes: self.indexes.clone(),
344 pending: Vec::new(),
345 });
346 Ok(())
347 }
348
349 pub fn commit_transaction(&mut self) -> Result<(), DbError> {
351 self.commit_txn_staging()
352 }
353
354 pub fn rollback_transaction(&mut self) {
356 self.txn_staging = None;
357 }
358
359 fn commit_txn_staging(&mut self) -> Result<(), DbError> {
360 let Some(st) = self.txn_staging.take() else {
361 return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
362 };
363 if st.pending.is_empty() {
364 self.catalog = st.shadow_catalog;
365 self.latest = st.shadow_latest;
366 self.indexes = st.shadow_indexes;
367 return Ok(());
368 }
369 let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
370 st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
371 self.commit_write_batch(st.txn_id, &batch)?;
372 self.catalog = st.shadow_catalog;
373 self.latest = st.shadow_latest;
374 self.indexes = st.shadow_indexes;
375 Ok(())
376 }
377
378 fn catalog_for_read(&self) -> &Catalog {
379 if let Some(ref st) = self.txn_staging {
380 &st.shadow_catalog
381 } else {
382 &self.catalog
383 }
384 }
385
386 fn indexes_for_read(&self) -> &IndexState {
387 if let Some(ref st) = self.txn_staging {
388 &st.shadow_indexes
389 } else {
390 &self.indexes
391 }
392 }
393
394 fn latest_for_read(&self) -> &LatestMap {
395 if let Some(ref st) = self.txn_staging {
396 &st.shadow_latest
397 } else {
398 &self.latest
399 }
400 }
401
402 pub fn path(&self) -> &Path {
404 &self.path
405 }
406
407 pub fn catalog(&self) -> &Catalog {
409 self.catalog_for_read()
410 }
411
412 pub fn collection_names(&self) -> Vec<String> {
414 self.catalog_for_read().collection_names()
415 }
416
417 pub fn index_state(&self) -> &IndexState {
419 self.indexes_for_read()
420 }
421
422 pub fn query(
424 &self,
425 q: &crate::query::Query,
426 ) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
427 crate::query::execute_query(
428 self.catalog_for_read(),
429 self.indexes_for_read(),
430 self.latest_for_read(),
431 q,
432 )
433 }
434
435 pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
437 crate::query::explain_query(self.catalog_for_read(), q)
438 }
439
440 pub fn query_iter(
445 &self,
446 q: &crate::query::Query,
447 ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
448 crate::query::execute_query_iter_with_spill_path(
449 self.catalog_for_read(),
450 self.indexes_for_read(),
451 self.latest_for_read(),
452 q,
453 Some(self.path.as_path()),
454 )
455 }
456
457 pub fn register_model<T: crate::schema::DbModel>(
459 &mut self,
460 ) -> Result<(CollectionId, SchemaVersion), DbError> {
461 self.register_collection_with_indexes(
462 T::collection_name(),
463 T::fields(),
464 T::indexes(),
465 T::primary_field(),
466 )
467 }
468
469 pub fn collection<'a, T: crate::schema::DbModel>(
471 &'a self,
472 ) -> Result<Collection<'a, S, T>, DbError> {
473 let cid = self.collection_id_named(T::collection_name())?;
474 let col = self.catalog_for_read().get(cid).ok_or(DbError::Schema(
475 SchemaError::UnknownCollection { id: cid.0 },
476 ))?;
477 validate_subset_model::<T>(col)?;
478 Ok(Collection {
479 db: self,
480 collection_id: cid,
481 _marker: PhantomData,
482 })
483 }
484
485 pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
489 self.catalog_for_read()
490 .lookup_name(name)
491 .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
492 name: name.trim().to_string(),
493 }))
494 }
495
496 pub fn register_collection(
501 &mut self,
502 name: &str,
503 fields: Vec<FieldDef>,
504 primary_field: &str,
505 ) -> Result<(CollectionId, SchemaVersion), DbError> {
506 self.register_collection_with_indexes(name, fields, vec![], primary_field)
507 }
508
509 pub fn register_collection_with_indexes(
510 &mut self,
511 name: &str,
512 fields: Vec<FieldDef>,
513 indexes: Vec<crate::schema::IndexDef>,
514 primary_field: &str,
515 ) -> Result<(CollectionId, SchemaVersion), DbError> {
516 let name = helpers::normalize_collection_name(name)?;
517 let pk = primary_field.trim();
518 if pk.is_empty() {
519 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
520 }
521 if !Catalog::has_top_level_field(&fields, pk) {
522 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
523 name: pk.to_string(),
524 }));
525 }
526 if let Some(st) = &mut self.txn_staging {
527 let id = st.shadow_catalog.next_collection_id().0;
528 let wire = CatalogRecordWire::CreateCollection {
529 collection_id: id,
530 name: name.clone(),
531 schema_version: 1,
532 fields,
533 indexes,
534 primary_field: Some(pk.to_string()),
535 };
536 let payload = encode_catalog_payload(&wire);
537 st.shadow_catalog.apply_record(wire)?;
538 st.pending
539 .push((crate::segments::header::SegmentType::Schema, payload));
540 return Ok((CollectionId(id), SchemaVersion(1)));
541 }
542 let id = self.catalog.next_collection_id().0;
543 let wire = CatalogRecordWire::CreateCollection {
544 collection_id: id,
545 name: name.clone(),
546 schema_version: 1,
547 fields,
548 indexes,
549 primary_field: Some(pk.to_string()),
550 };
551 let payload = encode_catalog_payload(&wire);
552 let tid = self.next_txn_id();
553 self.commit_write_batch(
554 tid,
555 &[(
556 crate::segments::header::SegmentType::Schema,
557 payload.as_slice(),
558 )],
559 )?;
560 self.apply_catalog_record(wire)?;
561 Ok((CollectionId(id), SchemaVersion(1)))
562 }
563
564 pub fn register_schema_version(
568 &mut self,
569 id: CollectionId,
570 fields: Vec<FieldDef>,
571 ) -> Result<SchemaVersion, DbError> {
572 self.register_schema_version_with_indexes(id, fields, vec![])
573 }
574
575 pub fn register_schema_version_with_indexes(
576 &mut self,
577 id: CollectionId,
578 fields: Vec<FieldDef>,
579 indexes: Vec<crate::schema::IndexDef>,
580 ) -> Result<SchemaVersion, DbError> {
581 let current = self
582 .catalog_for_read()
583 .get(id)
584 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
585 match classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)? {
587 SchemaChange::Safe => {}
588 SchemaChange::NeedsMigration { reason, .. } => {
589 return Err(DbError::Schema(SchemaError::MigrationRequired {
590 message: reason,
591 }));
592 }
593 SchemaChange::Breaking { reason } => {
594 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
595 message: reason,
596 }));
597 }
598 }
599 let next_v = current
600 .current_version
601 .0
602 .checked_add(1)
603 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
604 let wire = CatalogRecordWire::NewSchemaVersion {
605 collection_id: id.0,
606 schema_version: next_v,
607 fields,
608 indexes,
609 };
610 let payload = encode_catalog_payload(&wire);
611 if let Some(st) = &mut self.txn_staging {
612 st.shadow_catalog.apply_record(wire.clone())?;
613 st.pending
614 .push((crate::segments::header::SegmentType::Schema, payload));
615 return Ok(SchemaVersion(next_v));
616 }
617 let tid = self.next_txn_id();
618 self.commit_write_batch(
619 tid,
620 &[(
621 crate::segments::header::SegmentType::Schema,
622 payload.as_slice(),
623 )],
624 )?;
625 self.apply_catalog_record(wire)?;
626 Ok(SchemaVersion(next_v))
627 }
628
629 pub fn plan_schema_version_with_indexes(
631 &self,
632 id: CollectionId,
633 fields: Vec<FieldDef>,
634 indexes: Vec<crate::schema::IndexDef>,
635 ) -> Result<MigrationPlan, DbError> {
636 let current = self
637 .catalog_for_read()
638 .get(id)
639 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
640 let change = classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)?;
642 let mut steps = Vec::new();
643 match &change {
644 SchemaChange::Safe => {}
645 SchemaChange::Breaking { .. } => {}
646 SchemaChange::NeedsMigration {
647 reason,
648 backfill_top_level_field,
649 backfill_field_path,
650 } => {
651 if let Some(field) = backfill_top_level_field {
652 steps.push(MigrationStep::BackfillTopLevelField {
653 field: field.clone(),
654 });
655 } else if let Some(path) = backfill_field_path {
656 steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
657 } else if reason.contains("unique index") {
658 steps.push(MigrationStep::RebuildIndexes);
659 }
660 }
661 }
662 Ok(MigrationPlan { change, steps })
663 }
664
665 pub fn backfill_top_level_field_with_value(
669 &mut self,
670 collection_id: CollectionId,
671 field: &str,
672 value: RowValue,
673 ) -> Result<(), DbError> {
674 let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
675 self.backfill_field_at_path_with_value(collection_id, &path, value)
676 }
677
678 pub fn backfill_field_at_path_with_value(
680 &mut self,
681 collection_id: CollectionId,
682 path: &crate::schema::FieldPath,
683 value: RowValue,
684 ) -> Result<(), DbError> {
685 let col = self
686 .catalog_for_read()
687 .get(collection_id)
688 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
689 id: collection_id.0,
690 }))?;
691 let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
692 DbError::Schema(SchemaError::RowUnknownField {
693 name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
694 })
695 })?;
696
697 let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
698 for ((cid, _), row) in self.latest_for_read().iter() {
699 if *cid != collection_id.0 {
700 continue;
701 }
702 rows.push(row.clone());
703 }
704
705 self.transaction(|db| {
706 for mut row in rows {
707 if row_value_at_path_segments(&row, &path.0).is_some() {
708 continue;
709 }
710 crate::record::insert_value_at_path(&mut row, path, value.clone())?;
711 db.insert(collection_id, row)?;
712 }
713 Ok(())
714 })
715 }
716
717 pub fn rebuild_indexes_for_collection(
719 &mut self,
720 collection_id: CollectionId,
721 ) -> Result<(), DbError> {
722 let col = self
723 .catalog_for_read()
724 .get(collection_id)
725 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
726 id: collection_id.0,
727 }))?;
728 let pk_name =
729 col.primary_field
730 .as_deref()
731 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
732 collection_id: collection_id.0,
733 }))?;
734 let pk_def = col
735 .fields
736 .iter()
737 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
738 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
739 name: pk_name.to_string(),
740 }))?;
741
742 let mut entries: Vec<IndexEntry> = Vec::new();
743 for ((cid, _), row) in self.latest_for_read().iter() {
744 if *cid != collection_id.0 {
745 continue;
746 }
747 let Some(pk_cell) = row.get(pk_name) else {
748 continue;
749 };
750 let pk_scalar = pk_cell.clone().into_scalar()?;
751 if !pk_scalar.ty_matches(&pk_def.ty) {
752 continue;
753 }
754 for idx in &col.indexes {
755 let Some(v) = scalar_at_path(row, &idx.path) else {
756 continue;
757 };
758 entries.push(IndexEntry {
759 collection_id: collection_id.0,
760 index_name: idx.name.clone(),
761 kind: idx.kind,
762 op: IndexOp::Insert,
763 index_key: v.canonical_key_bytes(),
764 pk_key: pk_scalar.canonical_key_bytes(),
765 });
766 }
767 }
768
769 self.transaction(|db| {
770 if entries.is_empty() {
771 return Ok(());
772 }
773 let st = db
776 .txn_staging
777 .as_mut()
778 .expect("transaction staging must be active");
779 let b = encode_index_payload(&entries);
780 st.pending
781 .push((crate::segments::header::SegmentType::Index, b));
782 for e in entries {
783 st.shadow_indexes.apply(e)?;
784 }
785 Ok(())
786 })
787 }
788
789 pub fn register_schema_version_with_indexes_force(
794 &mut self,
795 id: CollectionId,
796 fields: Vec<FieldDef>,
797 indexes: Vec<crate::schema::IndexDef>,
798 ) -> Result<SchemaVersion, DbError> {
799 let current = self
800 .catalog_for_read()
801 .get(id)
802 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
803 let next_v = current
804 .current_version
805 .0
806 .checked_add(1)
807 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
808 let wire = CatalogRecordWire::NewSchemaVersion {
809 collection_id: id.0,
810 schema_version: next_v,
811 fields,
812 indexes,
813 };
814 let payload = encode_catalog_payload(&wire);
815 if let Some(st) = &mut self.txn_staging {
816 st.shadow_catalog.apply_record(wire.clone())?;
817 st.pending
818 .push((crate::segments::header::SegmentType::Schema, payload));
819 return Ok(SchemaVersion(next_v));
820 }
821 let tid = self.next_txn_id();
822 self.commit_write_batch(
823 tid,
824 &[(
825 crate::segments::header::SegmentType::Schema,
826 payload.as_slice(),
827 )],
828 )?;
829 self.apply_catalog_record(wire)?;
830 Ok(SchemaVersion(next_v))
831 }
832
833 pub fn insert(
838 &mut self,
839 collection_id: CollectionId,
840 row: BTreeMap<String, RowValue>,
841 ) -> Result<(), DbError> {
842 write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
843 let (mut payload, full, mut index_entries, pk_scalar) =
844 plan_insert_row(self.catalog_for_read(), collection_id, row)?;
845 #[cfg(test)]
846 let mut full = full;
847 let existing = self
848 .latest_for_read()
849 .get(&(collection_id.0, full.0.clone()))
850 .cloned();
851 if existing.is_some() {
852 #[cfg(test)]
853 if let Some(poison) = self.test_poison_planned_replace_row.take() {
854 poison(collection_id, &mut full.1);
855 }
856 let col = self
858 .catalog_for_read()
859 .get(collection_id)
860 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
861 id: collection_id.0,
862 }))?;
863 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
864 let pk_name =
865 col.primary_field
866 .as_deref()
867 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
868 collection_id: collection_id.0,
869 }))?;
870 let pk_def = col
871 .fields
872 .iter()
873 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
874 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
875 name: pk_name.to_string(),
876 }))?;
877
878 let non_pk_defs = if has_multi_segment_schema {
879 col.fields
880 .iter()
881 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
882 .collect::<Vec<_>>()
883 } else {
884 non_pk_defs_in_order(&col.fields, pk_name)
885 };
886 let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
887 for def in &non_pk_defs {
888 let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
889 non_pk.push(((*def).clone(), v));
890 }
891 payload = (if has_multi_segment_schema {
892 encode_record_payload_v3_op(
893 collection_id.0,
894 col.current_version.0,
895 OP_REPLACE,
896 &pk_scalar,
897 &pk_def.ty,
898 &non_pk,
899 )
900 } else {
901 encode_record_payload_v2_op(
902 collection_id.0,
903 col.current_version.0,
904 OP_REPLACE,
905 &pk_scalar,
906 &pk_def.ty,
907 &non_pk,
908 )
909 })?;
910 if let Some(ref old_row) = existing {
912 let mut deletes = index_deletes_for_existing_row(
913 collection_id,
914 &pk_scalar,
915 &col.indexes,
916 old_row,
917 );
918 deletes.append(&mut index_entries);
919 index_entries = deletes;
920 }
921 }
922 for e in &index_entries {
923 if e.kind != crate::schema::IndexKind::Unique {
924 continue;
925 }
926 let Some(existing) =
927 self.indexes_for_read()
928 .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
929 else {
930 continue;
931 };
932 if e.op != IndexOp::Insert {
933 continue;
934 }
935 if existing == e.pk_key.as_slice() {
936 continue;
937 }
938 return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
939 }
940 if let Some(st) = &mut self.txn_staging {
941 if !index_entries.is_empty() {
942 let b = encode_index_payload(&index_entries);
943 st.pending
944 .push((crate::segments::header::SegmentType::Index, b));
945 }
946 st.pending.push((
947 crate::segments::header::SegmentType::Record,
948 payload.clone(),
949 ));
950 st.shadow_latest
951 .insert((collection_id.0, full.0.clone()), full.1.clone());
952 for e in index_entries {
953 st.shadow_indexes.apply(e)?;
954 }
955 return Ok(());
956 }
957 let tid = self.next_txn_id();
958 let index_bytes = if index_entries.is_empty() {
959 None
960 } else {
961 Some(encode_index_payload(&index_entries))
962 };
963 let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
964 if let Some(ref b) = index_bytes {
965 batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
966 }
967 batch.push((
968 crate::segments::header::SegmentType::Record,
969 payload.as_slice(),
970 ));
971 self.commit_write_batch(tid, &batch)?;
972 self.latest.insert((collection_id.0, full.0), full.1);
973 for e in index_entries {
974 self.indexes.apply(e)?;
975 }
976 Ok(())
977 }
978
979 pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
981 write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
982 let col = self
983 .catalog_for_read()
984 .get(collection_id)
985 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
986 id: collection_id.0,
987 }))?;
988 let pk_name =
989 col.primary_field
990 .as_deref()
991 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
992 collection_id: collection_id.0,
993 }))?;
994 let pk_def = col
995 .fields
996 .iter()
997 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
998 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
999 name: pk_name.to_string(),
1000 }))?;
1001 if !pk.ty_matches(&pk_def.ty) {
1002 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
1003 }
1004 let pk_key = pk.canonical_key_bytes();
1005 let existing = self
1006 .latest_for_read()
1007 .get(&(collection_id.0, pk_key.clone()))
1008 .cloned();
1009 let Some(old_row) = existing else {
1010 return Ok(());
1011 };
1012 let indexes = col.indexes.clone();
1013 let schema_ver = col.current_version.0;
1014 let pk_ty = pk_def.ty.clone();
1015 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
1016
1017 let mut index_entries =
1018 index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
1019 #[cfg(not(test))]
1020 let pk_for_record = pk.clone();
1021 #[cfg(test)]
1022 let pk_for_record = {
1023 let mut p = pk.clone();
1024 if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
1025 p = poison(p);
1026 }
1027 p
1028 };
1029 let record_payload = (if has_multi_segment_schema {
1030 encode_record_payload_v3_op(
1031 collection_id.0,
1032 schema_ver,
1033 OP_DELETE,
1034 &pk_for_record,
1035 &pk_ty,
1036 &[],
1037 )
1038 } else {
1039 encode_record_payload_v2_op(
1040 collection_id.0,
1041 schema_ver,
1042 OP_DELETE,
1043 &pk_for_record,
1044 &pk_ty,
1045 &[],
1046 )
1047 })?;
1048
1049 if let Some(st) = &mut self.txn_staging {
1050 if !index_entries.is_empty() {
1051 let b = encode_index_payload(&index_entries);
1052 st.pending
1053 .push((crate::segments::header::SegmentType::Index, b));
1054 }
1055 st.pending.push((
1056 crate::segments::header::SegmentType::Record,
1057 record_payload.clone(),
1058 ));
1059 st.shadow_latest.remove(&(collection_id.0, pk_key));
1060 for e in index_entries.drain(..) {
1061 st.shadow_indexes.apply(e)?;
1062 }
1063 return Ok(());
1064 }
1065
1066 let tid = self.next_txn_id();
1067 let index_bytes = if index_entries.is_empty() {
1068 None
1069 } else {
1070 Some(encode_index_payload(&index_entries))
1071 };
1072 let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
1073 if let Some(ref b) = index_bytes {
1074 batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
1075 }
1076 batch.push((
1077 crate::segments::header::SegmentType::Record,
1078 record_payload.as_slice(),
1079 ));
1080 self.commit_write_batch(tid, &batch)?;
1081 self.latest.remove(&(collection_id.0, pk_key));
1082 for e in index_entries {
1083 self.indexes.apply(e)?;
1084 }
1085 Ok(())
1086 }
1087
1088 pub fn get(
1092 &self,
1093 collection_id: CollectionId,
1094 pk: &ScalarValue,
1095 ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1096 let col = self
1097 .catalog_for_read()
1098 .get(collection_id)
1099 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1100 id: collection_id.0,
1101 }))?;
1102 let pk_name =
1103 col.primary_field
1104 .as_deref()
1105 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1106 collection_id: collection_id.0,
1107 }))?;
1108 let pk_ty = col
1109 .fields
1110 .iter()
1111 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1112 .map(|f| &f.ty)
1113 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1114 name: pk_name.to_string(),
1115 }))?;
1116 if !pk.ty_matches(pk_ty) {
1117 return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
1118 collection_id: collection_id.0,
1119 }));
1120 }
1121 let key = (collection_id.0, pk.canonical_key_bytes());
1122 Ok(self.latest_for_read().get(&key).cloned())
1123 }
1124
1125 pub fn checkpoint(&mut self) -> Result<(), DbError> {
1131 #[cfg(feature = "tracing")]
1132 let _span = tracing::info_span!("database_checkpoint").entered();
1133 if self.txn_staging.is_some() {
1134 return Err(DbError::Transaction(TransactionError::NestedTransaction));
1135 }
1136
1137 write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
1138
1139 let mut cp = checkpoint::checkpoint_from_state(
1140 self.catalog_for_read(),
1141 self.latest_for_read(),
1142 self.indexes_for_read(),
1143 )?;
1144
1145 let file_len = self.store.len()?;
1146 let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
1147 let checkpoint_offset = writer.offset();
1148
1149 let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
1150 let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
1151 cp.replay_from_offset = replay_from;
1152 let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
1153
1154 let hdr = SegmentHeader {
1155 segment_type: SegmentType::Checkpoint,
1156 payload_len: 0,
1157 payload_crc32c: 0,
1158 };
1159 writer.append(hdr, &payload)?;
1160
1161 publish::append_manifest_and_publish_with_checkpoint(
1162 &mut self.store,
1163 self.segment_start,
1164 Some((checkpoint_offset, payload.len() as u32)),
1165 )?;
1166 self.store.sync()?;
1167 #[cfg(feature = "tracing")]
1168 tracing::info!(
1169 checkpoint_offset,
1170 replay_from,
1171 payload_bytes = payload.len(),
1172 "database_checkpoint_ok"
1173 );
1174 Ok(())
1175 }
1176
1177 #[cfg(test)]
1179 #[doc(hidden)]
1180 pub(crate) fn test_arm_replace_encode_poison_once(
1181 &mut self,
1182 poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
1183 ) {
1184 self.test_poison_planned_replace_row = Some(poison);
1185 }
1186
1187 #[cfg(test)]
1188 #[doc(hidden)]
1189 pub(crate) fn test_arm_delete_encode_poison_once(
1190 &mut self,
1191 poison: fn(ScalarValue) -> ScalarValue,
1192 ) {
1193 self.test_poison_delete_encode_scalar = Some(poison);
1194 }
1195
1196 #[cfg(test)]
1197 #[doc(hidden)]
1198 pub(crate) fn test_catalog_mut(&mut self) -> &mut Catalog {
1199 &mut self.catalog
1200 }
1201
1202 #[cfg(test)]
1204 #[doc(hidden)]
1205 pub(crate) fn test_write_latest_cell_unchecked(
1206 &mut self,
1207 collection_id: CollectionId,
1208 pk: &ScalarValue,
1209 field: &str,
1210 value: RowValue,
1211 ) {
1212 let pk_key = pk.canonical_key_bytes();
1213 let row = self
1214 .latest
1215 .get_mut(&(collection_id.0, pk_key))
1216 .expect("test_write_latest_cell_unchecked: unknown row key");
1217 row.insert(field.to_string(), value);
1218 }
1219}
1220
1221impl Database<FileStore> {
1222 pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1226 self.compact_to_with_fsops(&StdFsOps, dest_path)
1227 }
1228
1229 pub(crate) fn compact_to_with_fsops(
1230 &self,
1231 fs: &dyn FsOps,
1232 dest_path: impl AsRef<Path>,
1233 ) -> Result<(), DbError> {
1234 #[cfg(feature = "tracing")]
1235 let _span = tracing::info_span!(
1236 "database_compact_to",
1237 dest = %dest_path.as_ref().display()
1238 )
1239 .entered();
1240 let bytes = self.compact_snapshot_bytes()?;
1241 let path = dest_path.as_ref();
1242 let file = fs
1243 .open_read_write_create_truncate(path)
1244 .map_err(DbError::Io)?;
1245 let mut store = FileStore::new(file);
1246 store.write_all_at(0, &bytes)?;
1247 store.truncate(bytes.len() as u64)?;
1248 store.sync()?;
1249 #[cfg(feature = "tracing")]
1250 tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1251 Ok(())
1252 }
1253 pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1254 self.compact_in_place_with_fsops(&StdFsOps)
1255 }
1256
1257 pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1258 #[cfg(feature = "tracing")]
1259 let _span = tracing::info_span!("database_compact_in_place").entered();
1260 let bytes = self.compact_snapshot_bytes()?;
1264 let live_path = self.path.clone();
1265 let parent = live_path
1266 .parent()
1267 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1268
1269 let pid = std::process::id();
1271 let nanos = std::time::SystemTime::now()
1272 .duration_since(std::time::UNIX_EPOCH)
1273 .map(|d| d.as_nanos())
1274 .unwrap_or(0);
1275 let base = live_path
1276 .file_name()
1277 .and_then(|s| s.to_str())
1278 .unwrap_or("db.modelvault");
1279 let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1280 let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1281
1282 {
1284 let file = fs
1285 .open_read_write_create_new(&tmp_path)
1286 .map_err(DbError::Io)?;
1287 let mut store = FileStore::new(file);
1288 store.write_all_at(0, &bytes)?;
1289 store.truncate(bytes.len() as u64)?;
1290 store.sync()?;
1291 }
1292
1293 let _ = fs.remove_file(&bak_path);
1303 fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1304 let replace_res = fs.rename(&tmp_path, &live_path);
1305 if let Err(e) = replace_res {
1306 let _ = fs.rename(&bak_path, &live_path);
1308 let _ = fs.remove_file(&tmp_path);
1310 return Err(DbError::Io(e));
1311 }
1312
1313 #[cfg(unix)]
1315 {
1316 if let Ok(dir_f) = fs.open_dir(parent) {
1320 let _ = dir_f.sync_all();
1321 }
1322 }
1323
1324 let _ = fs.remove_file(&bak_path);
1325
1326 self.writer_registry = None;
1328 let reopened = match (|| {
1329 let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
1330 Self::open_with_store(live_path.clone(), store, OpenOptions::default())
1331 })() {
1332 Ok(db) => db,
1333 Err(e) => {
1334 let _ = fs.rename(&bak_path, &live_path);
1335 self.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1336 live_path.clone(),
1337 )?);
1338 return Err(e);
1339 }
1340 };
1341 let mut reopened = reopened;
1342 reopened.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1343 live_path.clone(),
1344 )?);
1345 *self = reopened;
1346 #[cfg(feature = "tracing")]
1347 tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1348 Ok(())
1349 }
1350
1351 pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1356 self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1357 }
1358
1359 pub(crate) fn export_snapshot_to_path_with_fsops(
1360 &mut self,
1361 fs: &dyn FsOps,
1362 dest_path: impl AsRef<Path>,
1363 ) -> Result<(), DbError> {
1364 self.checkpoint()?;
1365 let dest_path = dest_path.as_ref();
1366 fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1367 if let Ok(f) = fs.open_read(dest_path) {
1370 let _ = f.sync_all();
1371 }
1372 #[cfg(unix)]
1373 best_effort_fsync_parent_dir(fs, dest_path);
1374 Ok(())
1375 }
1376
1377 pub fn restore_snapshot_to_path(
1381 snapshot_path: impl AsRef<Path>,
1382 dest_path: impl AsRef<Path>,
1383 ) -> Result<(), DbError> {
1384 Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1385 }
1386
1387 pub(crate) fn restore_snapshot_to_path_with_fsops(
1388 fs: &dyn FsOps,
1389 snapshot_path: impl AsRef<Path>,
1390 dest_path: impl AsRef<Path>,
1391 ) -> Result<(), DbError> {
1392 let snapshot_path = snapshot_path.as_ref();
1393 let dest_path = dest_path.as_ref();
1394 let parent = dest_path
1395 .parent()
1396 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1397
1398 let pid = std::process::id();
1399 let nanos = std::time::SystemTime::now()
1400 .duration_since(std::time::UNIX_EPOCH)
1401 .map(|d| d.as_nanos())
1402 .unwrap_or(0);
1403 let base = dest_path
1404 .file_name()
1405 .and_then(|s| s.to_str())
1406 .unwrap_or("db.modelvault");
1407 let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1408 let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1409
1410 fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1412 if let Ok(f) = fs.open_read(&tmp_path) {
1413 let _ = f.sync_all();
1414 }
1415
1416 if dest_path.exists() {
1418 let _ = fs.remove_file(&bak_path);
1419 fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1420 }
1421 let replace_res = fs.rename(&tmp_path, dest_path);
1422 if let Err(e) = replace_res {
1423 if bak_path.exists() {
1425 let _ = fs.rename(&bak_path, dest_path);
1426 }
1427 let _ = fs.remove_file(&tmp_path);
1428 return Err(DbError::Io(e));
1429 }
1430
1431 #[cfg(unix)]
1432 {
1433 if let Ok(dir_f) = fs.open_dir(parent) {
1434 let _ = dir_f.sync_all();
1435 }
1436 }
1437 let _ = fs.remove_file(&bak_path);
1438 Ok(())
1439 }
1440}
1441
/// Typed, read-only handle to one collection of a borrowed [`Database`].
///
/// `T` is only carried as a marker. It decides which fields `all` keeps when it
/// projects rows through `project_row::<T>`.
pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
    // Database that queries are issued against.
    db: &'a Database<S>,
    // Collection targeted by every query built from this handle.
    collection_id: CollectionId,
    _marker: PhantomData<T>,
}
1447
1448impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1449 pub fn where_eq(
1450 &self,
1451 path: crate::schema::FieldPath,
1452 value: ScalarValue,
1453 ) -> QueryBuilder<'a, S, T> {
1454 QueryBuilder {
1455 db: self.db,
1456 collection_id: self.collection_id,
1457 predicate: Some(crate::query::Predicate::Eq { path, value }),
1458 limit: None,
1459 _marker: PhantomData,
1460 }
1461 }
1462
1463 pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1464 let q = crate::query::Query {
1465 collection: self.collection_id,
1466 predicate: None,
1467 limit: None,
1468 order_by: None,
1469 };
1470 let rows = self.db.query(&q)?;
1471 Ok(rows.into_iter().map(project_row::<T>).collect())
1472 }
1473}
1474
/// Builder for a query over one collection, created by `Collection::where_eq`.
///
/// Consumed by `all` (run the query) or `explain` (describe it).
pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
    // Database that the finished query is run against.
    db: &'a Database<S>,
    // Collection the query targets.
    collection_id: CollectionId,
    // Optional row filter; `None` means every row matches.
    predicate: Option<crate::query::Predicate>,
    // Optional cap on the number of rows, set via `limit`.
    limit: Option<usize>,
    // Model type used to project result rows.
    _marker: PhantomData<T>,
}
1482
1483impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1484 pub fn limit(mut self, n: usize) -> Self {
1485 self.limit = Some(n);
1486 self
1487 }
1488
1489 pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1490 let q = crate::query::Query {
1491 collection: self.collection_id,
1492 predicate: self.predicate,
1493 limit: self.limit,
1494 order_by: None,
1495 };
1496 let rows = self.db.query(&q)?;
1497 Ok(rows.into_iter().map(project_row::<T>).collect())
1498 }
1499
1500 pub fn explain(self) -> Result<String, DbError> {
1501 let q = crate::query::Query {
1502 collection: self.collection_id,
1503 predicate: self.predicate,
1504 limit: self.limit,
1505 order_by: None,
1506 };
1507 self.db.explain_query(&q)
1508 }
1509}
1510
1511fn validate_subset_model<T: crate::schema::DbModel>(
1512 col: &crate::catalog::CollectionInfo,
1513) -> Result<(), DbError> {
1514 crate::schema_compat::validate_model_fields_against_catalog(
1515 col,
1516 T::primary_field(),
1517 &T::fields(),
1518 &T::indexes(),
1519 )
1520}
1521
1522pub fn row_subset_by_field_defs(
1524 row: &BTreeMap<String, RowValue>,
1525 wanted: &[FieldDef],
1526) -> BTreeMap<String, RowValue> {
1527 let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1528 for f in wanted {
1529 let segs = &f.path.0;
1530 if segs.is_empty() {
1531 continue;
1532 }
1533 let Some(leaf) = row_value_at_path_segments(row, segs) else {
1534 continue;
1535 };
1536 let root = segs[0].to_string();
1537 if segs.len() == 1 {
1538 out.insert(root, leaf);
1539 } else {
1540 let nested = row_value_nested_object_path(&segs[1..], leaf);
1541 match out.get_mut(&root) {
1542 Some(existing) => merge_row_value_trees(existing, nested),
1543 None => {
1544 out.insert(root, nested);
1545 }
1546 }
1547 }
1548 }
1549 out
1550}
1551
1552fn row_value_at_path_segments(
1553 row: &BTreeMap<String, RowValue>,
1554 path: &[std::borrow::Cow<'static, str>],
1555) -> Option<RowValue> {
1556 if path.is_empty() {
1557 return None;
1558 }
1559 let mut cur = row.get(path[0].as_ref())?;
1560 for seg in path.iter().skip(1) {
1561 cur = match cur {
1562 RowValue::Object(m) => m.get(seg.as_ref())?,
1563 RowValue::None => return None,
1564 _ => return None,
1565 };
1566 }
1567 Some(cur.clone())
1568}
1569
1570fn row_value_nested_object_path(
1572 segments: &[std::borrow::Cow<'static, str>],
1573 leaf: RowValue,
1574) -> RowValue {
1575 debug_assert!(!segments.is_empty());
1576 if segments.len() == 1 {
1577 let mut m = BTreeMap::new();
1578 m.insert(segments[0].to_string(), leaf);
1579 RowValue::Object(m)
1580 } else {
1581 let mut m = BTreeMap::new();
1582 m.insert(
1583 segments[0].to_string(),
1584 row_value_nested_object_path(&segments[1..], leaf),
1585 );
1586 RowValue::Object(m)
1587 }
1588}
1589
1590fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1591 match (&mut *into, from) {
1592 (RowValue::Object(m1), RowValue::Object(m2)) => {
1593 for (k, v2) in m2 {
1594 match m1.entry(k) {
1595 std::collections::btree_map::Entry::Vacant(e) => {
1596 e.insert(v2);
1597 }
1598 std::collections::btree_map::Entry::Occupied(mut e) => {
1599 merge_row_value_trees(e.get_mut(), v2);
1600 }
1601 }
1602 }
1603 }
1604 (slot, from) => *slot = from,
1605 }
1606}
1607
1608fn project_row<T: crate::schema::DbModel>(
1609 row: BTreeMap<String, RowValue>,
1610) -> BTreeMap<String, RowValue> {
1611 row_subset_by_field_defs(&row, &T::fields())
1612}
1613
1614pub(crate) fn scalar_at_path(
1615 row: &BTreeMap<String, RowValue>,
1616 path: &crate::schema::FieldPath,
1617) -> Option<ScalarValue> {
1618 let mut cur: Option<&RowValue> = None;
1619 for (i, seg) in path.0.iter().enumerate() {
1620 let key = seg.as_ref();
1621 cur = match (i, cur) {
1622 (0, _) => row.get(key),
1623 (_, Some(RowValue::Object(map))) => map.get(key),
1624 (_, Some(RowValue::None)) => return None,
1625 _ => return None,
1626 };
1627 }
1628 cur.and_then(|v| v.as_scalar())
1629}
1630
1631impl Database<FileStore> {
1632 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1636 Self::open_with_options(path, crate::config::OpenOptions::default())
1637 }
1638
1639 pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1641 Self::open_with_options(
1642 path,
1643 crate::config::OpenOptions {
1644 recovery: crate::config::RecoveryMode::Strict,
1645 mode: OpenMode::ReadOnly,
1646 },
1647 )
1648 }
1649
1650 pub fn open_with_options(
1652 path: impl AsRef<Path>,
1653 opts: crate::config::OpenOptions,
1654 ) -> Result<Self, DbError> {
1655 let path = path.as_ref().to_path_buf();
1656 let store = FileStore::open_locked(&path, opts.mode)?;
1657 let mut db = Self::open_with_store(path.clone(), store, opts)?;
1658 if opts.mode == OpenMode::ReadWrite {
1659 db.writer_registry = Some(writer_registry::WriterRegistryGuard::new(path)?);
1660 }
1661 Ok(db)
1662 }
1663}
1664
1665impl Database<VecStore> {
1666 pub fn open_in_memory() -> Result<Self, DbError> {
1668 Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1669 }
1670
1671 pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1673 Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1674 }
1675
1676 pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1678 Self::open_with_store(
1679 PathBuf::from(":memory:"),
1680 VecStore::from_vec(bytes),
1681 crate::config::OpenOptions::default(),
1682 )
1683 }
1684
1685 pub fn into_snapshot_bytes(self) -> Vec<u8> {
1687 self.store.into_inner()
1688 }
1689
1690 pub fn snapshot_bytes(&self) -> Vec<u8> {
1692 self.store.as_slice().to_vec()
1693 }
1694
1695 pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1697 Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1698 }
1699
1700 pub(crate) fn export_snapshot_to_path_with_fsops(
1701 fs: &dyn FsOps,
1702 dest_path: impl AsRef<Path>,
1703 bytes: &[u8],
1704 ) -> Result<(), DbError> {
1705 fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1706 Ok(())
1707 }
1708
1709 pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1711 let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1712 Self::from_snapshot_bytes(bytes)
1713 }
1714}
1715
// Unit tests for `scalar_at_path`. The test source lives in
// `tests/unit/src_db_mod_scalar_at_path_tests.rs` and is spliced in with
// `include!`, so it sees this module's private items.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
1723
// General unit tests for this module. The source lives in
// `tests/unit/src_db_mod_tests.rs` and is spliced in with `include!`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}