1mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
13mod row_merge;
14pub(crate) mod row_paths;
15pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
16mod write;
17
18use std::collections::{BTreeMap, HashMap};
19use std::marker::PhantomData;
20use std::path::{Path, PathBuf};
21
22use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
23use crate::config::{OpenMode, OpenOptions};
24use crate::error::{DbError, FormatError, SchemaError, TransactionError};
25use crate::index::IndexState;
26use crate::index::{encode_index_payload, IndexEntry, IndexOp};
27use crate::record::{
28 encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
29 encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
30 OP_REPLACE,
31};
32use crate::schema::{classify_schema_update, SchemaChange};
33use crate::schema::{CollectionId, FieldDef, SchemaVersion};
34use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
35use crate::segments::writer::SegmentWriter;
36use crate::storage::{FileStore, Store, VecStore};
37use crate::validation;
38use crate::{checkpoint, publish};
39use crate::{MigrationPlan, MigrationStep};
40
41use self::fs_ops::{FsOps, StdFsOps};
42
43#[cfg(unix)]
45fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
46 let Some(parent) = dest_path.parent() else {
47 return;
48 };
49 let Ok(dir_f) = fs.open_dir(parent) else {
50 return;
51 };
52 let _ = dir_f.sync_all();
53}
54
/// Latest materialized row per primary key.
///
/// The key is `(collection id, primary-key bytes)`, where the bytes come from
/// `canonical_key_bytes()` on the row's primary-key scalar; the value maps field
/// names to their `RowValue`.
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
56
/// Everything `plan_insert_row` prepares for a single insert, in tuple order:
///
/// 1. the encoded record payload bytes,
/// 2. the `(primary-key bytes, full row map)` pair destined for the latest-row map,
/// 3. the index entries (all `IndexOp::Insert`) derived from the row,
/// 4. the primary key as a scalar.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
63
64fn plan_insert_row(
65 catalog: &Catalog,
66 collection_id: CollectionId,
67 mut row: BTreeMap<String, RowValue>,
68) -> Result<PlannedInsert, DbError> {
69 let col =
70 catalog
71 .get(collection_id)
72 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
73 id: collection_id.0,
74 }))?;
75 let pk_name =
76 col.primary_field
77 .as_deref()
78 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
79 collection_id: collection_id.0,
80 }))?;
81 let pk_def = col
82 .fields
83 .iter()
84 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
85 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
86 name: pk_name.to_string(),
87 }))?;
88 let pk_ty = &pk_def.ty;
89 validation::ensure_pk_type_primitive(pk_ty)?;
90 let mut pk_path = vec![pk_name.to_string()];
91 let pk_cell = row
92 .get(pk_name)
93 .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
94 name: pk_name.to_string(),
95 }))?;
96 validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
97 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
100 if !has_multi_segment_schema {
101 validation::validate_top_level_row(&col.fields, pk_name, &row)?;
102 } else {
103 validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
104 }
105
106 let pk_val = row.remove(pk_name).unwrap();
108 let pk_scalar = pk_val
110 .clone()
111 .into_scalar()
112 .expect("validated primary key must be scalar");
113
114 let non_pk_defs = if has_multi_segment_schema {
118 col.fields
119 .iter()
120 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
121 .collect::<Vec<_>>()
122 } else {
123 non_pk_defs_in_order(&col.fields, pk_name)
124 };
125 let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
126
127 let payload = if has_multi_segment_schema {
128 encode_record_payload_v3(
129 collection_id.0,
130 col.current_version.0,
131 &pk_scalar,
132 pk_ty,
133 &non_pk,
134 )
135 .expect("record payload encoding must succeed after validation")
136 } else {
137 encode_record_payload_v2(
138 collection_id.0,
139 col.current_version.0,
140 &pk_scalar,
141 pk_ty,
142 &non_pk,
143 )
144 .expect("record payload encoding must succeed after validation")
145 };
146
147 let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
148 full_map.insert(pk_name.to_string(), pk_val);
149 for (def, v) in &non_pk {
150 let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
151 if parts.len() == 1 {
152 full_map.insert(parts[0].clone(), v.clone());
153 } else {
154 debug_assert!(parts.len() >= 2);
155 row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
156 }
157 }
158 let mut index_entries: Vec<IndexEntry> = Vec::new();
159 for idx in &col.indexes {
160 let Some(v) = scalar_at_path(&full_map, &idx.path) else {
161 continue;
162 };
163 index_entries.push(IndexEntry {
164 collection_id: collection_id.0,
165 index_name: idx.name.clone(),
166 kind: idx.kind,
167 op: IndexOp::Insert,
168 index_key: v.canonical_key_bytes(),
169 pk_key: pk_scalar.canonical_key_bytes(),
170 });
171 }
172 let pk_key = pk_scalar.canonical_key_bytes();
173 Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
174}
175
176fn index_deletes_for_existing_row(
177 collection_id: CollectionId,
178 pk_scalar: &ScalarValue,
179 indexes: &[crate::schema::IndexDef],
180 existing_row: &BTreeMap<String, RowValue>,
181) -> Vec<IndexEntry> {
182 let mut out = Vec::new();
183 for idx in indexes {
184 let Some(v) = scalar_at_path(existing_row, &idx.path) else {
185 continue;
186 };
187 out.push(IndexEntry {
188 collection_id: collection_id.0,
189 index_name: idx.name.clone(),
190 kind: idx.kind,
191 op: IndexOp::Delete,
192 index_key: v.canonical_key_bytes(),
193 pk_key: pk_scalar.canonical_key_bytes(),
194 });
195 }
196 out
197}
198
/// In-memory staging area for one open transaction.
///
/// `begin_transaction` fills the shadow fields with clones of the database's
/// committed state. Writes made while the transaction is open are applied to
/// the shadows and queued in `pending`; commit writes the queue as one batch
/// and then installs the shadows as the committed state.
pub(crate) struct TxnStaging {
    /// Id allocated when the transaction began; passed to the write batch at commit.
    pub(crate) txn_id: u64,
    /// Working copy of the catalog, including schema changes staged so far.
    pub(crate) shadow_catalog: Catalog,
    /// Working copy of the latest-row map, including staged inserts and deletes.
    pub(crate) shadow_latest: LatestMap,
    /// Working copy of the index state, including staged index entries.
    pub(crate) shadow_indexes: IndexState,
    /// Encoded segment payloads in the order they were queued.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
207
/// A database handle over a backing store `S` (a `FileStore` by default).
///
/// Committed state lives in `catalog`, `latest` and `indexes`; while a
/// transaction is open, reads and writes go through `txn_staging` instead.
pub struct Database<S: Store = FileStore> {
    /// Path returned by `path()`; also handed to the query iterator as its spill path.
    path: PathBuf,
    /// Backing store that all segment writes go to.
    store: S,
    /// Committed catalog (collections, schema versions, indexes).
    catalog: Catalog,
    /// Offset passed to the write/publish helpers; also the lower bound for the
    /// checkpoint append position.
    segment_start: u64,
    /// Format minor version; the `write::ensure_header_*` helpers may update it.
    format_minor: u16,
    /// Committed latest row per `(collection id, primary-key bytes)`.
    latest: LatestMap,
    /// Committed index state.
    indexes: IndexState,
    /// Last transaction id handed out (advanced with a saturating add).
    txn_seq: u64,
    /// `Some` while a transaction is open.
    txn_staging: Option<TxnStaging>,
    /// Test hook: one-shot callback applied to the planned row on the replace path of `insert`.
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Test hook: one-shot callback applied to the primary key that `delete` encodes.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
238
239impl<S: Store> Database<S> {
240 fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
241 let mut out = Database::<VecStore>::open_in_memory()?;
242
243 let mut cols = self.catalog_for_read().collections();
245 cols.sort_by_key(|c| c.id.0);
246 for c in &cols {
247 let pk =
248 c.primary_field
249 .as_deref()
250 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
251 collection_id: c.id.0,
252 }))?;
253 let (new_id, _v1) = out.register_collection_with_indexes(
254 &c.name,
255 c.fields.clone(),
256 c.indexes.clone(),
257 pk,
258 )?;
259 for _ in 2..=c.current_version.0 {
261 let _ = out.register_schema_version_with_indexes_force(
262 new_id,
263 c.fields.clone(),
264 c.indexes.clone(),
265 )?;
266 }
267 }
268
269 for ((cid, _), row) in self.latest_for_read().iter() {
271 let collection_id = CollectionId(*cid);
272 out.insert(collection_id, row.clone())?;
273 }
274
275 Ok(out.into_snapshot_bytes())
276 }
277
/// Opens a database at `path` over an already constructed `store`.
///
/// Thin wrapper that forwards all arguments to `open::open_with_store`.
pub(crate) fn open_with_store(
    path: PathBuf,
    store: S,
    opts: OpenOptions,
) -> Result<Self, DbError> {
    open::open_with_store(path, store, opts)
}
285
/// Advances the transaction counter and returns the new value.
///
/// The increment saturates, so once `txn_seq` reaches `u64::MAX` the same id
/// is returned again.
fn next_txn_id(&mut self) -> u64 {
    self.txn_seq = self.txn_seq.saturating_add(1);
    self.txn_seq
}
290
/// Writes `body` (a list of `(segment type, payload)` pairs) as transaction
/// `txn_id` by forwarding to `write::commit_write_txn_v6` with this database's
/// store, segment start offset and format minor version.
#[inline]
fn commit_write_batch(
    &mut self,
    txn_id: u64,
    body: &[(crate::segments::header::SegmentType, &[u8])],
) -> Result<(), DbError> {
    write::commit_write_txn_v6(
        &mut self.store,
        self.segment_start,
        &mut self.format_minor,
        txn_id,
        body,
    )
}
305
/// Applies `wire` to the committed catalog (not to any transaction shadow).
#[inline]
fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
    self.catalog.apply_record(wire)
}
310
311 pub fn transaction<R>(
315 &mut self,
316 f: impl FnOnce(&mut Self) -> Result<R, DbError>,
317 ) -> Result<R, DbError> {
318 self.begin_transaction()?;
319 match f(self) {
320 Ok(v) => {
321 self.commit_transaction()?;
322 Ok(v)
323 }
324 Err(e) => {
325 self.rollback_transaction();
326 Err(e)
327 }
328 }
329 }
330
331 pub fn begin_transaction(&mut self) -> Result<(), DbError> {
334 if self.txn_staging.is_some() {
335 return Err(DbError::Transaction(TransactionError::NestedTransaction));
336 }
337 let tid = self.next_txn_id();
338 self.txn_staging = Some(TxnStaging {
339 txn_id: tid,
340 shadow_catalog: self.catalog.clone(),
341 shadow_latest: self.latest.clone(),
342 shadow_indexes: self.indexes.clone(),
343 pending: Vec::new(),
344 });
345 Ok(())
346 }
347
/// Commits the open transaction by delegating to `commit_txn_staging`.
///
/// When no transaction is open this is a no-op that returns `Ok(())`.
pub fn commit_transaction(&mut self) -> Result<(), DbError> {
    self.commit_txn_staging()
}
352
/// Discards the open transaction, if any, by dropping its staging area.
///
/// The committed catalog, rows and indexes are left untouched.
pub fn rollback_transaction(&mut self) {
    self.txn_staging = None;
}
357
358 fn commit_txn_staging(&mut self) -> Result<(), DbError> {
359 let Some(st) = self.txn_staging.take() else {
360 return Ok(());
361 };
362 if st.pending.is_empty() {
363 self.catalog = st.shadow_catalog;
364 self.latest = st.shadow_latest;
365 self.indexes = st.shadow_indexes;
366 return Ok(());
367 }
368 let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
369 st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
370 self.commit_write_batch(st.txn_id, &batch)?;
371 self.catalog = st.shadow_catalog;
372 self.latest = st.shadow_latest;
373 self.indexes = st.shadow_indexes;
374 Ok(())
375 }
376
377 fn catalog_for_read(&self) -> &Catalog {
378 if let Some(ref st) = self.txn_staging {
379 &st.shadow_catalog
380 } else {
381 &self.catalog
382 }
383 }
384
385 fn indexes_for_read(&self) -> &IndexState {
386 if let Some(ref st) = self.txn_staging {
387 &st.shadow_indexes
388 } else {
389 &self.indexes
390 }
391 }
392
393 fn latest_for_read(&self) -> &LatestMap {
394 if let Some(ref st) = self.txn_staging {
395 &st.shadow_latest
396 } else {
397 &self.latest
398 }
399 }
400
/// Returns the path stored in this database handle.
pub fn path(&self) -> &Path {
    &self.path
}
405
/// Returns the catalog visible to readers (the transaction's shadow catalog
/// while a transaction is open).
pub fn catalog(&self) -> &Catalog {
    self.catalog_for_read()
}
410
/// Returns the collection names reported by the catalog visible to readers.
pub fn collection_names(&self) -> Vec<String> {
    self.catalog_for_read().collection_names()
}
415
/// Returns the index state visible to readers (the transaction's shadow
/// indexes while a transaction is open).
pub fn index_state(&self) -> &IndexState {
    self.indexes_for_read()
}
420
/// Executes `q` against the state visible to readers and returns the
/// matching rows.
///
/// Forwards to `crate::query::execute_query` with the readable catalog,
/// indexes and latest-row map.
pub fn query(
    &self,
    q: &crate::query::Query,
) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
    crate::query::execute_query(
        self.catalog_for_read(),
        self.indexes_for_read(),
        self.latest_for_read(),
        q,
    )
}
433
/// Returns the textual explanation that `crate::query::explain_query`
/// produces for `q` against the catalog visible to readers.
pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
    crate::query::explain_query(self.catalog_for_read(), q)
}
438
/// Executes `q` against the state visible to readers and returns a row
/// iterator borrowing from this database.
///
/// This database's path is passed as the spill path to
/// `crate::query::execute_query_iter_with_spill_path`.
pub fn query_iter(
    &self,
    q: &crate::query::Query,
) -> Result<crate::query::QueryRowIter<'_>, DbError> {
    crate::query::execute_query_iter_with_spill_path(
        self.catalog_for_read(),
        self.indexes_for_read(),
        self.latest_for_read(),
        q,
        Some(self.path.as_path()),
    )
}
455
/// Registers the collection described by model type `T`.
///
/// Uses `T`'s collection name, fields, indexes and primary field and forwards
/// them to `register_collection_with_indexes`.
pub fn register_model<T: crate::schema::DbModel>(
    &mut self,
) -> Result<(CollectionId, SchemaVersion), DbError> {
    self.register_collection_with_indexes(
        T::collection_name(),
        T::fields(),
        T::indexes(),
        T::primary_field(),
    )
}
467
468 pub fn collection<'a, T: crate::schema::DbModel>(
470 &'a self,
471 ) -> Result<Collection<'a, S, T>, DbError> {
472 let cid = self.collection_id_named(T::collection_name())?;
473 let col = self
474 .catalog_for_read()
475 .get(cid)
476 .expect("collection id from name lookup must exist in catalog");
477 validate_subset_model::<T>(col)?;
478 Ok(Collection {
479 db: self,
480 collection_id: cid,
481 _marker: PhantomData,
482 })
483 }
484
485 pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
489 self.catalog_for_read()
490 .lookup_name(name)
491 .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
492 name: name.trim().to_string(),
493 }))
494 }
495
/// Registers a new collection without any indexes.
///
/// Equivalent to `register_collection_with_indexes` with an empty index list.
pub fn register_collection(
    &mut self,
    name: &str,
    fields: Vec<FieldDef>,
    primary_field: &str,
) -> Result<(CollectionId, SchemaVersion), DbError> {
    self.register_collection_with_indexes(name, fields, vec![], primary_field)
}
508
509 pub fn register_collection_with_indexes(
510 &mut self,
511 name: &str,
512 fields: Vec<FieldDef>,
513 indexes: Vec<crate::schema::IndexDef>,
514 primary_field: &str,
515 ) -> Result<(CollectionId, SchemaVersion), DbError> {
516 let name = helpers::normalize_collection_name(name)?;
517 let pk = primary_field.trim();
518 if pk.is_empty() {
519 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
520 }
521 if !Catalog::has_top_level_field(&fields, pk) {
522 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
523 name: pk.to_string(),
524 }));
525 }
526 if let Some(st) = &mut self.txn_staging {
527 let id = st.shadow_catalog.next_collection_id().0;
528 let wire = CatalogRecordWire::CreateCollection {
529 collection_id: id,
530 name: name.clone(),
531 schema_version: 1,
532 fields,
533 indexes,
534 primary_field: Some(pk.to_string()),
535 };
536 let payload = encode_catalog_payload(&wire);
537 st.shadow_catalog.apply_record(wire)?;
538 st.pending
539 .push((crate::segments::header::SegmentType::Schema, payload));
540 return Ok((CollectionId(id), SchemaVersion(1)));
541 }
542 let id = self.catalog.next_collection_id().0;
543 let wire = CatalogRecordWire::CreateCollection {
544 collection_id: id,
545 name: name.clone(),
546 schema_version: 1,
547 fields,
548 indexes,
549 primary_field: Some(pk.to_string()),
550 };
551 let payload = encode_catalog_payload(&wire);
552 let tid = self.next_txn_id();
553 self.commit_write_batch(
554 tid,
555 &[(
556 crate::segments::header::SegmentType::Schema,
557 payload.as_slice(),
558 )],
559 )?;
560 self.apply_catalog_record(wire)?;
561 Ok((CollectionId(id), SchemaVersion(1)))
562 }
563
/// Registers a new schema version for collection `id` with an empty index list.
///
/// Equivalent to `register_schema_version_with_indexes(id, fields, vec![])`.
pub fn register_schema_version(
    &mut self,
    id: CollectionId,
    fields: Vec<FieldDef>,
) -> Result<SchemaVersion, DbError> {
    self.register_schema_version_with_indexes(id, fields, vec![])
}
574
575 pub fn register_schema_version_with_indexes(
576 &mut self,
577 id: CollectionId,
578 fields: Vec<FieldDef>,
579 indexes: Vec<crate::schema::IndexDef>,
580 ) -> Result<SchemaVersion, DbError> {
581 let current = self
582 .catalog_for_read()
583 .get(id)
584 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
585 match classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)? {
587 SchemaChange::Safe => {}
588 SchemaChange::NeedsMigration { reason, .. } => {
589 return Err(DbError::Schema(SchemaError::MigrationRequired {
590 message: reason,
591 }));
592 }
593 SchemaChange::Breaking { reason } => {
594 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
595 message: reason,
596 }));
597 }
598 }
599 let next_v = current
600 .current_version
601 .0
602 .checked_add(1)
603 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
604 let wire = CatalogRecordWire::NewSchemaVersion {
605 collection_id: id.0,
606 schema_version: next_v,
607 fields,
608 indexes,
609 };
610 let payload = encode_catalog_payload(&wire);
611 if let Some(st) = &mut self.txn_staging {
612 st.shadow_catalog.apply_record(wire.clone())?;
613 st.pending
614 .push((crate::segments::header::SegmentType::Schema, payload));
615 return Ok(SchemaVersion(next_v));
616 }
617 let tid = self.next_txn_id();
618 self.commit_write_batch(
619 tid,
620 &[(
621 crate::segments::header::SegmentType::Schema,
622 payload.as_slice(),
623 )],
624 )?;
625 self.apply_catalog_record(wire)?;
626 Ok(SchemaVersion(next_v))
627 }
628
629 pub fn plan_schema_version_with_indexes(
631 &self,
632 id: CollectionId,
633 fields: Vec<FieldDef>,
634 indexes: Vec<crate::schema::IndexDef>,
635 ) -> Result<MigrationPlan, DbError> {
636 let current = self
637 .catalog_for_read()
638 .get(id)
639 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
640 let change = classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)?;
642 let mut steps = Vec::new();
643 match &change {
644 SchemaChange::Safe => {}
645 SchemaChange::Breaking { .. } => {}
646 SchemaChange::NeedsMigration {
647 reason,
648 backfill_top_level_field,
649 backfill_field_path,
650 } => {
651 if let Some(field) = backfill_top_level_field {
652 steps.push(MigrationStep::BackfillTopLevelField {
653 field: field.clone(),
654 });
655 } else if let Some(path) = backfill_field_path {
656 steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
657 } else if reason.contains("unique index") {
658 steps.push(MigrationStep::RebuildIndexes);
659 }
660 }
661 }
662 Ok(MigrationPlan { change, steps })
663 }
664
/// Backfills the top-level field `field` with `value` in collection
/// `collection_id`.
///
/// Builds a one-segment field path from `field` and delegates to
/// `backfill_field_at_path_with_value`.
pub fn backfill_top_level_field_with_value(
    &mut self,
    collection_id: CollectionId,
    field: &str,
    value: RowValue,
) -> Result<(), DbError> {
    let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
    self.backfill_field_at_path_with_value(collection_id, &path, value)
}
677
678 pub fn backfill_field_at_path_with_value(
680 &mut self,
681 collection_id: CollectionId,
682 path: &crate::schema::FieldPath,
683 value: RowValue,
684 ) -> Result<(), DbError> {
685 let col = self
686 .catalog_for_read()
687 .get(collection_id)
688 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
689 id: collection_id.0,
690 }))?;
691 let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
692 DbError::Schema(SchemaError::RowUnknownField {
693 name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
694 })
695 })?;
696
697 let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
698 for ((cid, _), row) in self.latest_for_read().iter() {
699 if *cid != collection_id.0 {
700 continue;
701 }
702 rows.push(row.clone());
703 }
704
705 self.transaction(|db| {
706 for mut row in rows {
707 if row_value_at_path_segments(&row, &path.0).is_some() {
708 continue;
709 }
710 crate::record::insert_value_at_path(&mut row, path, value.clone())?;
711 db.insert(collection_id, row)?;
712 }
713 Ok(())
714 })
715 }
716
717 pub fn rebuild_indexes_for_collection(
719 &mut self,
720 collection_id: CollectionId,
721 ) -> Result<(), DbError> {
722 let col = self
723 .catalog_for_read()
724 .get(collection_id)
725 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
726 id: collection_id.0,
727 }))?;
728 let pk_name =
729 col.primary_field
730 .as_deref()
731 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
732 collection_id: collection_id.0,
733 }))?;
734 let pk_def = col
735 .fields
736 .iter()
737 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
738 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
739 name: pk_name.to_string(),
740 }))?;
741
742 let mut entries: Vec<IndexEntry> = Vec::new();
743 for ((cid, _), row) in self.latest_for_read().iter() {
744 if *cid != collection_id.0 {
745 continue;
746 }
747 let Some(pk_cell) = row.get(pk_name) else {
748 continue;
749 };
750 let pk_scalar = pk_cell.clone().into_scalar()?;
751 if !pk_scalar.ty_matches(&pk_def.ty) {
752 continue;
753 }
754 for idx in &col.indexes {
755 let Some(v) = scalar_at_path(row, &idx.path) else {
756 continue;
757 };
758 entries.push(IndexEntry {
759 collection_id: collection_id.0,
760 index_name: idx.name.clone(),
761 kind: idx.kind,
762 op: IndexOp::Insert,
763 index_key: v.canonical_key_bytes(),
764 pk_key: pk_scalar.canonical_key_bytes(),
765 });
766 }
767 }
768
769 self.transaction(|db| {
770 if entries.is_empty() {
771 return Ok(());
772 }
773 let st = db
776 .txn_staging
777 .as_mut()
778 .expect("transaction staging must be active");
779 let b = encode_index_payload(&entries);
780 st.pending
781 .push((crate::segments::header::SegmentType::Index, b));
782 for e in entries {
783 st.shadow_indexes.apply(e)?;
784 }
785 Ok(())
786 })
787 }
788
789 pub fn register_schema_version_with_indexes_force(
794 &mut self,
795 id: CollectionId,
796 fields: Vec<FieldDef>,
797 indexes: Vec<crate::schema::IndexDef>,
798 ) -> Result<SchemaVersion, DbError> {
799 let current = self
800 .catalog_for_read()
801 .get(id)
802 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
803 let next_v = current
804 .current_version
805 .0
806 .checked_add(1)
807 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
808 let wire = CatalogRecordWire::NewSchemaVersion {
809 collection_id: id.0,
810 schema_version: next_v,
811 fields,
812 indexes,
813 };
814 let payload = encode_catalog_payload(&wire);
815 if let Some(st) = &mut self.txn_staging {
816 st.shadow_catalog.apply_record(wire.clone())?;
817 st.pending
818 .push((crate::segments::header::SegmentType::Schema, payload));
819 return Ok(SchemaVersion(next_v));
820 }
821 let tid = self.next_txn_id();
822 self.commit_write_batch(
823 tid,
824 &[(
825 crate::segments::header::SegmentType::Schema,
826 payload.as_slice(),
827 )],
828 )?;
829 self.apply_catalog_record(wire)?;
830 Ok(SchemaVersion(next_v))
831 }
832
/// Inserts `row` into `collection_id`, replacing any existing row that has
/// the same primary key.
///
/// The row is validated and planned by `plan_insert_row`. When a row with the
/// same key already exists, the payload is re-encoded as an `OP_REPLACE`
/// record and index deletes for the old row are placed before the new index
/// inserts. Inside an open transaction the segments are queued and the shadow
/// state updated; otherwise they are written immediately and the committed
/// state updated.
///
/// # Errors
/// * any error from the header check, from planning/validating the row, or
///   from encoding the replace payload;
/// * `SchemaError::UniqueIndexViolation` when a unique index key is already
///   held by a different primary key;
/// * any error from writing the batch or applying an index entry.
pub fn insert(
    &mut self,
    collection_id: CollectionId,
    row: BTreeMap<String, RowValue>,
) -> Result<(), DbError> {
    write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
    let (mut payload, full, mut index_entries, pk_scalar) =
        plan_insert_row(self.catalog_for_read(), collection_id, row)?;
    // Only test builds need `full` to be mutable (for the poison hook below).
    #[cfg(test)]
    let mut full = full;
    let existing = self
        .latest_for_read()
        .get(&(collection_id.0, full.0.clone()))
        .cloned();
    if existing.is_some() {
        // Replace path: a row with this primary key already exists.
        #[cfg(test)]
        if let Some(poison) = self.test_poison_planned_replace_row.take() {
            poison(collection_id, &mut full.1);
        }
        let col = self
            .catalog_for_read()
            .get(collection_id)
            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                id: collection_id.0,
            }))?;
        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
        let pk_name =
            col.primary_field
                .as_deref()
                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                    collection_id: collection_id.0,
                }))?;
        let pk_def = col
            .fields
            .iter()
            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
                name: pk_name.to_string(),
            }))?;

        let non_pk_defs = if has_multi_segment_schema {
            col.fields
                .iter()
                .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
                .collect::<Vec<_>>()
        } else {
            non_pk_defs_in_order(&col.fields, pk_name)
        };
        // Re-read every non-pk value from the planned row; absent values become `RowValue::None`.
        let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
        for def in &non_pk_defs {
            let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
            non_pk.push(((*def).clone(), v));
        }
        // The planned payload is discarded in favor of an OP_REPLACE record.
        payload = (if has_multi_segment_schema {
            encode_record_payload_v3_op(
                collection_id.0,
                col.current_version.0,
                OP_REPLACE,
                &pk_scalar,
                &pk_def.ty,
                &non_pk,
            )
        } else {
            encode_record_payload_v2_op(
                collection_id.0,
                col.current_version.0,
                OP_REPLACE,
                &pk_scalar,
                &pk_def.ty,
                &non_pk,
            )
        })?;
        if let Some(ref old_row) = existing {
            // Deletes for the old row go first, followed by the new inserts.
            let mut deletes = index_deletes_for_existing_row(
                collection_id,
                &pk_scalar,
                &col.indexes,
                old_row,
            );
            deletes.append(&mut index_entries);
            index_entries = deletes;
        }
    }
    // Unique-index pre-check against the readable index state: an Insert entry
    // on a Unique index is rejected when its key is held by a different primary key.
    for e in &index_entries {
        if e.kind != crate::schema::IndexKind::Unique {
            continue;
        }
        let Some(existing) =
            self.indexes_for_read()
                .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
        else {
            continue;
        };
        if e.op != IndexOp::Insert {
            continue;
        }
        if existing == e.pk_key.as_slice() {
            continue;
        }
        return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
    }
    if let Some(st) = &mut self.txn_staging {
        // Transaction path: queue the segments (index before record) and update the shadows.
        if !index_entries.is_empty() {
            let b = encode_index_payload(&index_entries);
            st.pending
                .push((crate::segments::header::SegmentType::Index, b));
        }
        st.pending.push((
            crate::segments::header::SegmentType::Record,
            payload.clone(),
        ));
        st.shadow_latest
            .insert((collection_id.0, full.0.clone()), full.1.clone());
        for e in index_entries {
            st.shadow_indexes.apply(e)?;
        }
        return Ok(());
    }
    // Auto-commit path: write the batch first, then update the committed state.
    let tid = self.next_txn_id();
    let index_bytes = if index_entries.is_empty() {
        None
    } else {
        Some(encode_index_payload(&index_entries))
    };
    let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
    if let Some(ref b) = index_bytes {
        batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
    }
    batch.push((
        crate::segments::header::SegmentType::Record,
        payload.as_slice(),
    ));
    self.commit_write_batch(tid, &batch)?;
    self.latest.insert((collection_id.0, full.0), full.1);
    for e in index_entries {
        self.indexes.apply(e)?;
    }
    Ok(())
}
978
/// Deletes the row with primary key `pk` from `collection_id`.
///
/// Deleting a key that has no row is a no-op returning `Ok(())`. Otherwise
/// an `OP_DELETE` record plus index deletes for the old row are either queued
/// in the open transaction or written immediately.
///
/// # Errors
/// * any error from the header check;
/// * `SchemaError::UnknownCollection`, `NoPrimaryKey` or `PrimaryFieldNotFound`
///   when the collection or its primary key cannot be resolved;
/// * `FormatError::RecordPayloadTypeMismatch` when `pk` does not match the
///   primary key type;
/// * any error from encoding, writing the batch or applying an index entry.
pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
    write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
    let col = self
        .catalog_for_read()
        .get(collection_id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
            id: collection_id.0,
        }))?;
    let pk_name =
        col.primary_field
            .as_deref()
            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                collection_id: collection_id.0,
            }))?;
    let pk_def = col
        .fields
        .iter()
        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk_name.to_string(),
        }))?;
    if !pk.ty_matches(&pk_def.ty) {
        return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
    }
    let pk_key = pk.canonical_key_bytes();
    let existing = self
        .latest_for_read()
        .get(&(collection_id.0, pk_key.clone()))
        .cloned();
    // Nothing stored under this key: deleting is a no-op.
    let Some(old_row) = existing else {
        return Ok(());
    };
    // Copy what is needed out of `col` so its borrow of `self` ends before the mutations below.
    let indexes = col.indexes.clone();
    let schema_ver = col.current_version.0;
    let pk_ty = pk_def.ty.clone();
    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);

    let mut index_entries =
        index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
    #[cfg(not(test))]
    let pk_for_record = pk.clone();
    // Test builds may replace the encoded key through the one-shot poison hook.
    #[cfg(test)]
    let pk_for_record = {
        let mut p = pk.clone();
        if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
            p = poison(p);
        }
        p
    };
    // A delete record carries no non-pk values.
    let record_payload = (if has_multi_segment_schema {
        encode_record_payload_v3_op(
            collection_id.0,
            schema_ver,
            OP_DELETE,
            &pk_for_record,
            &pk_ty,
            &[],
        )
    } else {
        encode_record_payload_v2_op(
            collection_id.0,
            schema_ver,
            OP_DELETE,
            &pk_for_record,
            &pk_ty,
            &[],
        )
    })?;

    if let Some(st) = &mut self.txn_staging {
        // Transaction path: queue the segments (index before record) and update the shadows.
        if !index_entries.is_empty() {
            let b = encode_index_payload(&index_entries);
            st.pending
                .push((crate::segments::header::SegmentType::Index, b));
        }
        st.pending.push((
            crate::segments::header::SegmentType::Record,
            record_payload.clone(),
        ));
        st.shadow_latest.remove(&(collection_id.0, pk_key));
        for e in index_entries.drain(..) {
            st.shadow_indexes.apply(e)?;
        }
        return Ok(());
    }

    // Auto-commit path: write the batch first, then update the committed state.
    let tid = self.next_txn_id();
    let index_bytes = if index_entries.is_empty() {
        None
    } else {
        Some(encode_index_payload(&index_entries))
    };
    let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
    if let Some(ref b) = index_bytes {
        batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
    }
    batch.push((
        crate::segments::header::SegmentType::Record,
        record_payload.as_slice(),
    ));
    self.commit_write_batch(tid, &batch)?;
    self.latest.remove(&(collection_id.0, pk_key));
    for e in index_entries {
        self.indexes.apply(e)?;
    }
    Ok(())
}
1087
1088 pub fn get(
1092 &self,
1093 collection_id: CollectionId,
1094 pk: &ScalarValue,
1095 ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1096 let col = self
1097 .catalog_for_read()
1098 .get(collection_id)
1099 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1100 id: collection_id.0,
1101 }))?;
1102 let pk_name =
1103 col.primary_field
1104 .as_deref()
1105 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1106 collection_id: collection_id.0,
1107 }))?;
1108 let pk_ty = col
1109 .fields
1110 .iter()
1111 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1112 .map(|f| &f.ty)
1113 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1114 name: pk_name.to_string(),
1115 }))?;
1116 if !pk.ty_matches(pk_ty) {
1117 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
1118 }
1119 let key = (collection_id.0, pk.canonical_key_bytes());
1120 Ok(self.latest_for_read().get(&key).cloned())
1121 }
1122
/// Appends a checkpoint segment built from the current state, publishes a
/// manifest that references it, and syncs the store.
///
/// # Errors
/// * `TransactionError::NestedTransaction` when a transaction is open;
/// * any error from the header check, building the checkpoint, reading the
///   store length, appending the segment, publishing the manifest or syncing.
pub fn checkpoint(&mut self) -> Result<(), DbError> {
    #[cfg(feature = "tracing")]
    let _span = tracing::info_span!("database_checkpoint").entered();
    // Checkpoints are refused while a transaction is open.
    if self.txn_staging.is_some() {
        return Err(DbError::Transaction(TransactionError::NestedTransaction));
    }

    write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;

    let mut cp = checkpoint::checkpoint_from_state(
        self.catalog_for_read(),
        self.latest_for_read(),
        self.indexes_for_read(),
    )?;

    // Append at the end of the store, but never before `segment_start`.
    let file_len = self.store.len()?;
    let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
    let checkpoint_offset = writer.offset();

    // Encode once to learn the payload length, set the replay offset to the first byte
    // after this segment, then encode again with that offset filled in.
    // NOTE(review): this assumes the encoded length does not depend on the value of
    // `replay_from_offset` — confirm against the checkpoint encoder.
    let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
    let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
    cp.replay_from_offset = replay_from;
    let payload = checkpoint::encode_checkpoint_payload_v0(&cp);

    // NOTE(review): length and CRC are passed as zero here; presumably `append`
    // fills them in from `payload` — confirm against `SegmentWriter::append`.
    let hdr = SegmentHeader {
        segment_type: SegmentType::Checkpoint,
        payload_len: 0,
        payload_crc32c: 0,
    };
    writer.append(hdr, &payload)?;

    publish::append_manifest_and_publish_with_checkpoint(
        &mut self.store,
        self.segment_start,
        Some((checkpoint_offset, payload.len() as u32)),
    )?;
    self.store.sync()?;
    #[cfg(feature = "tracing")]
    tracing::info!(
        checkpoint_offset,
        replay_from,
        payload_bytes = payload.len(),
        "database_checkpoint_ok"
    );
    Ok(())
}
1174
/// Test hook: arms a one-shot callback that `insert` applies to the planned
/// row on its replace path (the callback is taken when it fires).
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_arm_replace_encode_poison_once(
    &mut self,
    poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
) {
    self.test_poison_planned_replace_row = Some(poison);
}
1184
/// Test hook: arms a one-shot callback that `delete` applies to the primary
/// key it encodes into the delete record (the callback is taken when it fires).
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_arm_delete_encode_poison_once(
    &mut self,
    poison: fn(ScalarValue) -> ScalarValue,
) {
    self.test_poison_delete_encode_scalar = Some(poison);
}
1193
/// Test hook: overwrites one cell of a committed row directly in `latest`,
/// bypassing validation, the store and the indexes.
///
/// # Panics
/// Panics if no committed row exists for `pk` in `collection_id`.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_write_latest_cell_unchecked(
    &mut self,
    collection_id: CollectionId,
    pk: &ScalarValue,
    field: &str,
    value: RowValue,
) {
    let pk_key = pk.canonical_key_bytes();
    let row = self
        .latest
        .get_mut(&(collection_id.0, pk_key))
        .expect("test_write_latest_cell_unchecked: unknown row key");
    row.insert(field.to_string(), value);
}
1211}
1212
1213impl Database<FileStore> {
/// Writes a compacted snapshot of this database to `dest_path`.
///
/// Delegates to `compact_to_with_fsops` using the standard filesystem
/// operations (`StdFsOps`).
pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
    self.compact_to_with_fsops(&StdFsOps, dest_path)
}
1220
1221 pub(crate) fn compact_to_with_fsops(
1222 &self,
1223 fs: &dyn FsOps,
1224 dest_path: impl AsRef<Path>,
1225 ) -> Result<(), DbError> {
1226 #[cfg(feature = "tracing")]
1227 let _span = tracing::info_span!(
1228 "database_compact_to",
1229 dest = %dest_path.as_ref().display()
1230 )
1231 .entered();
1232 let bytes = self.compact_snapshot_bytes()?;
1233 let path = dest_path.as_ref();
1234 let file = fs
1235 .open_read_write_create_truncate(path)
1236 .map_err(DbError::Io)?;
1237 let mut store = FileStore::new(file);
1238 store.write_all_at(0, &bytes)?;
1239 store.truncate(bytes.len() as u64)?;
1240 store.sync()?;
1241 #[cfg(feature = "tracing")]
1242 tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1243 Ok(())
1244 }
1245 pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1246 self.compact_in_place_with_fsops(&StdFsOps)
1247 }
1248
1249 pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1250 #[cfg(feature = "tracing")]
1251 let _span = tracing::info_span!("database_compact_in_place").entered();
1252 let bytes = self.compact_snapshot_bytes()?;
1256 let live_path = self.path.clone();
1257 let parent = live_path
1258 .parent()
1259 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1260
1261 let pid = std::process::id();
1263 let nanos = std::time::SystemTime::now()
1264 .duration_since(std::time::UNIX_EPOCH)
1265 .map(|d| d.as_nanos())
1266 .unwrap_or(0);
1267 let base = live_path
1268 .file_name()
1269 .and_then(|s| s.to_str())
1270 .unwrap_or("db.modelvault");
1271 let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1272 let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1273
1274 {
1276 let file = fs
1277 .open_read_write_create_new(&tmp_path)
1278 .map_err(DbError::Io)?;
1279 let mut store = FileStore::new(file);
1280 store.write_all_at(0, &bytes)?;
1281 store.truncate(bytes.len() as u64)?;
1282 store.sync()?;
1283 }
1284
1285 let _ = fs.remove_file(&bak_path);
1295 fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1296 let replace_res = fs.rename(&tmp_path, &live_path);
1297 if let Err(e) = replace_res {
1298 let _ = fs.rename(&bak_path, &live_path);
1300 let _ = fs.remove_file(&tmp_path);
1302 return Err(DbError::Io(e));
1303 }
1304
1305 #[cfg(unix)]
1307 {
1308 if let Ok(dir_f) = fs.open_dir(parent) {
1312 let _ = dir_f.sync_all();
1313 }
1314 }
1315
1316 let _ = fs.remove_file(&bak_path);
1317
1318 let reopened = Database::open_with_options(live_path, OpenOptions::default())?;
1320 *self = reopened;
1321 #[cfg(feature = "tracing")]
1322 tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1323 Ok(())
1324 }
1325
1326 pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1331 self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1332 }
1333
1334 pub(crate) fn export_snapshot_to_path_with_fsops(
1335 &mut self,
1336 fs: &dyn FsOps,
1337 dest_path: impl AsRef<Path>,
1338 ) -> Result<(), DbError> {
1339 self.checkpoint()?;
1340 let dest_path = dest_path.as_ref();
1341 fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1342 if let Ok(f) = fs.open_read(dest_path) {
1345 let _ = f.sync_all();
1346 }
1347 #[cfg(unix)]
1348 best_effort_fsync_parent_dir(fs, dest_path);
1349 Ok(())
1350 }
1351
1352 pub fn restore_snapshot_to_path(
1356 snapshot_path: impl AsRef<Path>,
1357 dest_path: impl AsRef<Path>,
1358 ) -> Result<(), DbError> {
1359 Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1360 }
1361
1362 pub(crate) fn restore_snapshot_to_path_with_fsops(
1363 fs: &dyn FsOps,
1364 snapshot_path: impl AsRef<Path>,
1365 dest_path: impl AsRef<Path>,
1366 ) -> Result<(), DbError> {
1367 let snapshot_path = snapshot_path.as_ref();
1368 let dest_path = dest_path.as_ref();
1369 let parent = dest_path
1370 .parent()
1371 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1372
1373 let pid = std::process::id();
1374 let nanos = std::time::SystemTime::now()
1375 .duration_since(std::time::UNIX_EPOCH)
1376 .map(|d| d.as_nanos())
1377 .unwrap_or(0);
1378 let base = dest_path
1379 .file_name()
1380 .and_then(|s| s.to_str())
1381 .unwrap_or("db.modelvault");
1382 let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1383 let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1384
1385 fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1387 if let Ok(f) = fs.open_read(&tmp_path) {
1388 let _ = f.sync_all();
1389 }
1390
1391 if dest_path.exists() {
1393 let _ = fs.remove_file(&bak_path);
1394 fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1395 }
1396 let replace_res = fs.rename(&tmp_path, dest_path);
1397 if let Err(e) = replace_res {
1398 if bak_path.exists() {
1400 let _ = fs.rename(&bak_path, dest_path);
1401 }
1402 let _ = fs.remove_file(&tmp_path);
1403 return Err(DbError::Io(e));
1404 }
1405
1406 #[cfg(unix)]
1407 {
1408 if let Ok(dir_f) = fs.open_dir(parent) {
1409 let _ = dir_f.sync_all();
1410 }
1411 }
1412 let _ = fs.remove_file(&bak_path);
1413 Ok(())
1414 }
1415}
1416
1417pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
1418 db: &'a Database<S>,
1419 collection_id: CollectionId,
1420 _marker: PhantomData<T>,
1421}
1422
1423impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1424 pub fn where_eq(
1425 &self,
1426 path: crate::schema::FieldPath,
1427 value: ScalarValue,
1428 ) -> QueryBuilder<'a, S, T> {
1429 QueryBuilder {
1430 db: self.db,
1431 collection_id: self.collection_id,
1432 predicate: Some(crate::query::Predicate::Eq { path, value }),
1433 limit: None,
1434 _marker: PhantomData,
1435 }
1436 }
1437
1438 pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1439 let q = crate::query::Query {
1440 collection: self.collection_id,
1441 predicate: None,
1442 limit: None,
1443 order_by: None,
1444 };
1445 let rows = self.db.query(&q)?;
1446 Ok(rows.into_iter().map(project_row::<T>).collect())
1447 }
1448}
1449
1450pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
1451 db: &'a Database<S>,
1452 collection_id: CollectionId,
1453 predicate: Option<crate::query::Predicate>,
1454 limit: Option<usize>,
1455 _marker: PhantomData<T>,
1456}
1457
1458impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1459 pub fn limit(mut self, n: usize) -> Self {
1460 self.limit = Some(n);
1461 self
1462 }
1463
1464 pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1465 let q = crate::query::Query {
1466 collection: self.collection_id,
1467 predicate: self.predicate,
1468 limit: self.limit,
1469 order_by: None,
1470 };
1471 let rows = self.db.query(&q)?;
1472 Ok(rows.into_iter().map(project_row::<T>).collect())
1473 }
1474
1475 pub fn explain(self) -> Result<String, DbError> {
1476 let q = crate::query::Query {
1477 collection: self.collection_id,
1478 predicate: self.predicate,
1479 limit: self.limit,
1480 order_by: None,
1481 };
1482 self.db.explain_query(&q)
1483 }
1484}
1485
1486fn validate_subset_model<T: crate::schema::DbModel>(
1487 col: &crate::catalog::CollectionInfo,
1488) -> Result<(), DbError> {
1489 crate::schema_compat::validate_model_fields_against_catalog(
1490 col,
1491 T::primary_field(),
1492 &T::fields(),
1493 &T::indexes(),
1494 )
1495}
1496
1497pub fn row_subset_by_field_defs(
1499 row: &BTreeMap<String, RowValue>,
1500 wanted: &[FieldDef],
1501) -> BTreeMap<String, RowValue> {
1502 let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1503 for f in wanted {
1504 let segs = &f.path.0;
1505 if segs.is_empty() {
1506 continue;
1507 }
1508 let Some(leaf) = row_value_at_path_segments(row, segs) else {
1509 continue;
1510 };
1511 let root = segs[0].to_string();
1512 if segs.len() == 1 {
1513 out.insert(root, leaf);
1514 } else {
1515 let nested = row_value_nested_object_path(&segs[1..], leaf);
1516 match out.get_mut(&root) {
1517 Some(existing) => merge_row_value_trees(existing, nested),
1518 None => {
1519 out.insert(root, nested);
1520 }
1521 }
1522 }
1523 }
1524 out
1525}
1526
1527fn row_value_at_path_segments(
1528 row: &BTreeMap<String, RowValue>,
1529 path: &[std::borrow::Cow<'static, str>],
1530) -> Option<RowValue> {
1531 if path.is_empty() {
1532 return None;
1533 }
1534 let mut cur = row.get(path[0].as_ref())?;
1535 for seg in path.iter().skip(1) {
1536 cur = match cur {
1537 RowValue::Object(m) => m.get(seg.as_ref())?,
1538 RowValue::None => return None,
1539 _ => return None,
1540 };
1541 }
1542 Some(cur.clone())
1543}
1544
1545fn row_value_nested_object_path(
1547 segments: &[std::borrow::Cow<'static, str>],
1548 leaf: RowValue,
1549) -> RowValue {
1550 debug_assert!(!segments.is_empty());
1551 if segments.len() == 1 {
1552 let mut m = BTreeMap::new();
1553 m.insert(segments[0].to_string(), leaf);
1554 RowValue::Object(m)
1555 } else {
1556 let mut m = BTreeMap::new();
1557 m.insert(
1558 segments[0].to_string(),
1559 row_value_nested_object_path(&segments[1..], leaf),
1560 );
1561 RowValue::Object(m)
1562 }
1563}
1564
1565fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1566 match (&mut *into, from) {
1567 (RowValue::Object(m1), RowValue::Object(m2)) => {
1568 for (k, v2) in m2 {
1569 match m1.entry(k) {
1570 std::collections::btree_map::Entry::Vacant(e) => {
1571 e.insert(v2);
1572 }
1573 std::collections::btree_map::Entry::Occupied(mut e) => {
1574 merge_row_value_trees(e.get_mut(), v2);
1575 }
1576 }
1577 }
1578 }
1579 (slot, from) => *slot = from,
1580 }
1581}
1582
1583fn project_row<T: crate::schema::DbModel>(
1584 row: BTreeMap<String, RowValue>,
1585) -> BTreeMap<String, RowValue> {
1586 row_subset_by_field_defs(&row, &T::fields())
1587}
1588
1589pub(crate) fn scalar_at_path(
1590 row: &BTreeMap<String, RowValue>,
1591 path: &crate::schema::FieldPath,
1592) -> Option<ScalarValue> {
1593 let mut cur: Option<&RowValue> = None;
1594 for (i, seg) in path.0.iter().enumerate() {
1595 let key = seg.as_ref();
1596 cur = match (i, cur) {
1597 (0, _) => row.get(key),
1598 (_, Some(RowValue::Object(map))) => map.get(key),
1599 (_, Some(RowValue::None)) => return None,
1600 _ => return None,
1601 };
1602 }
1603 cur.and_then(|v| v.as_scalar())
1604}
1605
1606impl Database<FileStore> {
1607 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1611 Self::open_with_options(path, crate::config::OpenOptions::default())
1612 }
1613
1614 pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1616 Self::open_with_options(
1617 path,
1618 crate::config::OpenOptions {
1619 recovery: crate::config::RecoveryMode::Strict,
1620 mode: OpenMode::ReadOnly,
1621 },
1622 )
1623 }
1624
1625 pub fn open_with_options(
1627 path: impl AsRef<Path>,
1628 opts: crate::config::OpenOptions,
1629 ) -> Result<Self, DbError> {
1630 let path = path.as_ref().to_path_buf();
1631 let store = FileStore::open_locked(&path, opts.mode)?;
1632 Self::open_with_store(path, store, opts)
1633 }
1634}
1635
1636impl Database<VecStore> {
1637 pub fn open_in_memory() -> Result<Self, DbError> {
1639 Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1640 }
1641
1642 pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1644 Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1645 }
1646
1647 pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1649 Self::open_with_store(
1650 PathBuf::from(":memory:"),
1651 VecStore::from_vec(bytes),
1652 crate::config::OpenOptions::default(),
1653 )
1654 }
1655
1656 pub fn into_snapshot_bytes(self) -> Vec<u8> {
1658 self.store.into_inner()
1659 }
1660
1661 pub fn snapshot_bytes(&self) -> Vec<u8> {
1663 self.store.as_slice().to_vec()
1664 }
1665
1666 pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1668 Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1669 }
1670
1671 pub(crate) fn export_snapshot_to_path_with_fsops(
1672 fs: &dyn FsOps,
1673 dest_path: impl AsRef<Path>,
1674 bytes: &[u8],
1675 ) -> Result<(), DbError> {
1676 fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1677 Ok(())
1678 }
1679
1680 pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1682 let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1683 Self::from_snapshot_bytes(bytes)
1684 }
1685}
1686
// Test-only module whose body is textually included from
// `tests/unit/src_db_mod_scalar_at_path_tests.rs` under the crate root; being a child of this
// module, the included code can reach this file's private items.
// Presumably covers `scalar_at_path`, going by the file name.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
1694
// Test-only module whose body is textually included from `tests/unit/src_db_mod_tests.rs`
// under the crate root; being a child of this module, the included code can reach this
// file's private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}