1mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12mod writer_registry;
13pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
14mod row_merge;
15pub(crate) mod row_paths;
16pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
17mod write;
18
19use std::collections::{BTreeMap, HashMap};
20use std::marker::PhantomData;
21use std::path::{Path, PathBuf};
22
23use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
24use crate::config::{OpenMode, OpenOptions};
25use crate::error::{DbError, FormatError, SchemaError, TransactionError};
26use crate::index::IndexState;
27use crate::index::{encode_index_payload, IndexEntry, IndexOp};
28use crate::record::{
29 encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
30 encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
31 OP_REPLACE,
32};
33use crate::schema::{classify_schema_update, SchemaChange};
34use crate::schema::{CollectionId, FieldDef, SchemaVersion};
35use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
36use crate::segments::writer::SegmentWriter;
37use crate::storage::{FileStore, Store, VecStore};
38use crate::validation;
39use crate::{checkpoint, publish};
40use crate::{MigrationPlan, MigrationStep};
41
42use self::fs_ops::{FsOps, StdFsOps};
43
44#[cfg(unix)]
46fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
47 let Some(parent) = dest_path.parent() else {
48 return;
49 };
50 let Ok(dir_f) = fs.open_dir(parent) else {
51 return;
52 };
53 let _ = dir_f.sync_all();
54}
55
56pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
57
58type PlannedInsert = (
59 Vec<u8>,
60 (Vec<u8>, BTreeMap<String, RowValue>),
61 Vec<IndexEntry>,
62 ScalarValue,
63);
64
65fn plan_insert_row(
66 catalog: &Catalog,
67 collection_id: CollectionId,
68 mut row: BTreeMap<String, RowValue>,
69) -> Result<PlannedInsert, DbError> {
70 let col =
71 catalog
72 .get(collection_id)
73 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
74 id: collection_id.0,
75 }))?;
76 let pk_name =
77 col.primary_field
78 .as_deref()
79 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
80 collection_id: collection_id.0,
81 }))?;
82 let pk_def = col
83 .fields
84 .iter()
85 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
86 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
87 name: pk_name.to_string(),
88 }))?;
89 let pk_ty = &pk_def.ty;
90 validation::ensure_pk_type_primitive(pk_ty)?;
91 let mut pk_path = vec![pk_name.to_string()];
92 let pk_cell = row
93 .get(pk_name)
94 .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
95 name: pk_name.to_string(),
96 }))?;
97 validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
98 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
101 if !has_multi_segment_schema {
102 validation::validate_top_level_row(&col.fields, pk_name, &row)?;
103 } else {
104 validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
105 }
106
107 let pk_val = row.remove(pk_name).unwrap();
109 let pk_scalar = pk_val.clone().into_scalar()?;
110
111 let non_pk_defs = if has_multi_segment_schema {
115 col.fields
116 .iter()
117 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
118 .collect::<Vec<_>>()
119 } else {
120 non_pk_defs_in_order(&col.fields, pk_name)
121 };
122 let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
123
124 let payload = if has_multi_segment_schema {
125 encode_record_payload_v3(
126 collection_id.0,
127 col.current_version.0,
128 &pk_scalar,
129 pk_ty,
130 &non_pk,
131 )?
132 } else {
133 encode_record_payload_v2(
134 collection_id.0,
135 col.current_version.0,
136 &pk_scalar,
137 pk_ty,
138 &non_pk,
139 )?
140 };
141
142 let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
143 full_map.insert(pk_name.to_string(), pk_val);
144 for (def, v) in &non_pk {
145 let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
146 if parts.len() == 1 {
147 full_map.insert(parts[0].clone(), v.clone());
148 } else {
149 debug_assert!(parts.len() >= 2);
150 row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
151 }
152 }
153 let mut index_entries: Vec<IndexEntry> = Vec::new();
154 for idx in &col.indexes {
155 let Some(v) = scalar_at_path(&full_map, &idx.path) else {
156 continue;
157 };
158 index_entries.push(IndexEntry {
159 collection_id: collection_id.0,
160 index_name: idx.name.clone(),
161 kind: idx.kind,
162 op: IndexOp::Insert,
163 index_key: v.canonical_key_bytes(),
164 pk_key: pk_scalar.canonical_key_bytes(),
165 });
166 }
167 let pk_key = pk_scalar.canonical_key_bytes();
168 Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
169}
170
171fn index_deletes_for_existing_row(
172 collection_id: CollectionId,
173 pk_scalar: &ScalarValue,
174 indexes: &[crate::schema::IndexDef],
175 existing_row: &BTreeMap<String, RowValue>,
176) -> Vec<IndexEntry> {
177 let mut out = Vec::new();
178 for idx in indexes {
179 let Some(v) = scalar_at_path(existing_row, &idx.path) else {
180 continue;
181 };
182 out.push(IndexEntry {
183 collection_id: collection_id.0,
184 index_name: idx.name.clone(),
185 kind: idx.kind,
186 op: IndexOp::Delete,
187 index_key: v.canonical_key_bytes(),
188 pk_key: pk_scalar.canonical_key_bytes(),
189 });
190 }
191 out
192}
193
194pub(crate) struct TxnStaging {
196 pub(crate) txn_id: u64,
197 pub(crate) shadow_catalog: Catalog,
198 pub(crate) shadow_latest: LatestMap,
199 pub(crate) shadow_indexes: IndexState,
200 pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
201}
202
/// An open database: a segment store plus in-memory catalog, latest rows and index state.
///
/// `S` is the backing store type and defaults to `FileStore`.
pub struct Database<S: Store = FileStore> {
    /// Path the database was opened from.
    ///
    /// Also handed to `execute_query_iter_with_spill_path` by `query_iter`, and used by the
    /// compaction helpers.
    path: PathBuf,
    /// Backing store that write batches, checkpoints and manifests are written to.
    store: S,
    /// Committed catalog; replaced by the shadow catalog when a transaction commits.
    catalog: Catalog,
    /// Segment start offset passed to commits, checkpoints and publishing.
    segment_start: u64,
    /// On-disk format minor version.
    ///
    /// Passed mutably to the `write::ensure_header_*` helpers and to the commit routine.
    format_minor: u16,
    /// Committed latest row for every `(collection id, primary-key bytes)`.
    latest: LatestMap,
    /// Committed index state.
    indexes: IndexState,
    /// Last transaction id handed out by `next_txn_id`.
    txn_seq: u64,
    /// Shadow state of the active transaction, if any.
    txn_staging: Option<TxnStaging>,
    // NOTE(review): never read in this file.
    // Presumably it is kept so the guard lives as long as the database; confirm in
    // `writer_registry`.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    // Test-only, one-shot hook: consumed by `insert` on the replace path to mutate the
    // planned row before the `OP_REPLACE` payload is encoded.
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    // Test-only, one-shot hook: consumed by `delete` to replace the primary key scalar
    // before the `OP_DELETE` payload is encoded.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
236
237impl<S: Store> Database<S> {
238 fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
239 let mut out = Database::<VecStore>::open_in_memory()?;
240
241 let mut cols = self.catalog_for_read().collections();
243 cols.sort_by_key(|c| c.id.0);
244 for c in &cols {
245 let pk =
246 c.primary_field
247 .as_deref()
248 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
249 collection_id: c.id.0,
250 }))?;
251 let (new_id, _v1) = out.register_collection_with_indexes(
252 &c.name,
253 c.fields.clone(),
254 c.indexes.clone(),
255 pk,
256 )?;
257 for _ in 2..=c.current_version.0 {
259 let _ = out.register_schema_version_with_indexes_force(
260 new_id,
261 c.fields.clone(),
262 c.indexes.clone(),
263 )?;
264 }
265 }
266
267 for ((cid, _), row) in self.latest_for_read().iter() {
269 let collection_id = CollectionId(*cid);
270 out.insert(collection_id, row.clone())?;
271 }
272
273 Ok(out.into_snapshot_bytes())
274 }
275
276 pub(crate) fn open_with_store(
277 path: PathBuf,
278 store: S,
279 opts: OpenOptions,
280 ) -> Result<Self, DbError> {
281 open::open_with_store(path, store, opts)
282 }
283
284 fn next_txn_id(&mut self) -> u64 {
285 self.txn_seq = self.txn_seq.saturating_add(1);
286 self.txn_seq
287 }
288
289 #[inline]
290 fn commit_write_batch(
291 &mut self,
292 txn_id: u64,
293 body: &[(crate::segments::header::SegmentType, &[u8])],
294 ) -> Result<(), DbError> {
295 write::commit_write_txn_v6(
296 &mut self.store,
297 self.segment_start,
298 &mut self.format_minor,
299 txn_id,
300 body,
301 )
302 }
303
304 #[inline]
305 fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
306 self.catalog.apply_record(wire)
307 }
308
309 pub fn transaction<R>(
313 &mut self,
314 f: impl FnOnce(&mut Self) -> Result<R, DbError>,
315 ) -> Result<R, DbError> {
316 self.begin_transaction()?;
317 match f(self) {
318 Ok(v) => match self.commit_transaction() {
319 Ok(()) => Ok(v),
320 Err(e) => {
321 self.rollback_transaction();
322 Err(e)
323 }
324 },
325 Err(e) => {
326 self.rollback_transaction();
327 Err(e)
328 }
329 }
330 }
331
332 pub fn begin_transaction(&mut self) -> Result<(), DbError> {
335 if self.txn_staging.is_some() {
336 return Err(DbError::Transaction(TransactionError::NestedTransaction));
337 }
338 let tid = self.next_txn_id();
339 self.txn_staging = Some(TxnStaging {
340 txn_id: tid,
341 shadow_catalog: self.catalog.clone(),
342 shadow_latest: self.latest.clone(),
343 shadow_indexes: self.indexes.clone(),
344 pending: Vec::new(),
345 });
346 Ok(())
347 }
348
349 pub fn commit_transaction(&mut self) -> Result<(), DbError> {
351 self.commit_txn_staging()
352 }
353
354 pub fn rollback_transaction(&mut self) {
356 self.txn_staging = None;
357 }
358
359 fn commit_txn_staging(&mut self) -> Result<(), DbError> {
360 let Some(st) = self.txn_staging.take() else {
361 return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
362 };
363 if st.pending.is_empty() {
364 self.catalog = st.shadow_catalog;
365 self.latest = st.shadow_latest;
366 self.indexes = st.shadow_indexes;
367 return Ok(());
368 }
369 let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
370 st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
371 self.commit_write_batch(st.txn_id, &batch)?;
372 self.catalog = st.shadow_catalog;
373 self.latest = st.shadow_latest;
374 self.indexes = st.shadow_indexes;
375 Ok(())
376 }
377
378 fn catalog_for_read(&self) -> &Catalog {
379 if let Some(ref st) = self.txn_staging {
380 &st.shadow_catalog
381 } else {
382 &self.catalog
383 }
384 }
385
386 fn indexes_for_read(&self) -> &IndexState {
387 if let Some(ref st) = self.txn_staging {
388 &st.shadow_indexes
389 } else {
390 &self.indexes
391 }
392 }
393
394 fn latest_for_read(&self) -> &LatestMap {
395 if let Some(ref st) = self.txn_staging {
396 &st.shadow_latest
397 } else {
398 &self.latest
399 }
400 }
401
402 pub fn path(&self) -> &Path {
404 &self.path
405 }
406
407 pub fn catalog(&self) -> &Catalog {
409 self.catalog_for_read()
410 }
411
412 pub fn collection_names(&self) -> Vec<String> {
414 self.catalog_for_read().collection_names()
415 }
416
417 pub fn index_state(&self) -> &IndexState {
419 self.indexes_for_read()
420 }
421
422 pub fn query(
424 &self,
425 q: &crate::query::Query,
426 ) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
427 crate::query::execute_query(
428 self.catalog_for_read(),
429 self.indexes_for_read(),
430 self.latest_for_read(),
431 q,
432 )
433 }
434
435 pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
437 crate::query::explain_query(self.catalog_for_read(), q)
438 }
439
440 pub fn query_iter(
445 &self,
446 q: &crate::query::Query,
447 ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
448 crate::query::execute_query_iter_with_spill_path(
449 self.catalog_for_read(),
450 self.indexes_for_read(),
451 self.latest_for_read(),
452 q,
453 Some(self.path.as_path()),
454 )
455 }
456
457 pub fn register_model<T: crate::schema::DbModel>(
459 &mut self,
460 ) -> Result<(CollectionId, SchemaVersion), DbError> {
461 self.register_collection_with_indexes(
462 T::collection_name(),
463 T::fields(),
464 T::indexes(),
465 T::primary_field(),
466 )
467 }
468
469 pub fn collection<'a, T: crate::schema::DbModel>(
471 &'a self,
472 ) -> Result<Collection<'a, S, T>, DbError> {
473 let cid = self.collection_id_named(T::collection_name())?;
474 let col = self
475 .catalog_for_read()
476 .get(cid)
477 .expect("collection id from name lookup must exist in catalog");
478 validate_subset_model::<T>(col)?;
479 Ok(Collection {
480 db: self,
481 collection_id: cid,
482 _marker: PhantomData,
483 })
484 }
485
486 pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
490 self.catalog_for_read()
491 .lookup_name(name)
492 .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
493 name: name.trim().to_string(),
494 }))
495 }
496
497 pub fn register_collection(
502 &mut self,
503 name: &str,
504 fields: Vec<FieldDef>,
505 primary_field: &str,
506 ) -> Result<(CollectionId, SchemaVersion), DbError> {
507 self.register_collection_with_indexes(name, fields, vec![], primary_field)
508 }
509
510 pub fn register_collection_with_indexes(
511 &mut self,
512 name: &str,
513 fields: Vec<FieldDef>,
514 indexes: Vec<crate::schema::IndexDef>,
515 primary_field: &str,
516 ) -> Result<(CollectionId, SchemaVersion), DbError> {
517 let name = helpers::normalize_collection_name(name)?;
518 let pk = primary_field.trim();
519 if pk.is_empty() {
520 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
521 }
522 if !Catalog::has_top_level_field(&fields, pk) {
523 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
524 name: pk.to_string(),
525 }));
526 }
527 if let Some(st) = &mut self.txn_staging {
528 let id = st.shadow_catalog.next_collection_id().0;
529 let wire = CatalogRecordWire::CreateCollection {
530 collection_id: id,
531 name: name.clone(),
532 schema_version: 1,
533 fields,
534 indexes,
535 primary_field: Some(pk.to_string()),
536 };
537 let payload = encode_catalog_payload(&wire);
538 st.shadow_catalog.apply_record(wire)?;
539 st.pending
540 .push((crate::segments::header::SegmentType::Schema, payload));
541 return Ok((CollectionId(id), SchemaVersion(1)));
542 }
543 let id = self.catalog.next_collection_id().0;
544 let wire = CatalogRecordWire::CreateCollection {
545 collection_id: id,
546 name: name.clone(),
547 schema_version: 1,
548 fields,
549 indexes,
550 primary_field: Some(pk.to_string()),
551 };
552 let payload = encode_catalog_payload(&wire);
553 let tid = self.next_txn_id();
554 self.commit_write_batch(
555 tid,
556 &[(
557 crate::segments::header::SegmentType::Schema,
558 payload.as_slice(),
559 )],
560 )?;
561 self.apply_catalog_record(wire)?;
562 Ok((CollectionId(id), SchemaVersion(1)))
563 }
564
565 pub fn register_schema_version(
569 &mut self,
570 id: CollectionId,
571 fields: Vec<FieldDef>,
572 ) -> Result<SchemaVersion, DbError> {
573 self.register_schema_version_with_indexes(id, fields, vec![])
574 }
575
576 pub fn register_schema_version_with_indexes(
577 &mut self,
578 id: CollectionId,
579 fields: Vec<FieldDef>,
580 indexes: Vec<crate::schema::IndexDef>,
581 ) -> Result<SchemaVersion, DbError> {
582 let current = self
583 .catalog_for_read()
584 .get(id)
585 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
586 match classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)? {
588 SchemaChange::Safe => {}
589 SchemaChange::NeedsMigration { reason, .. } => {
590 return Err(DbError::Schema(SchemaError::MigrationRequired {
591 message: reason,
592 }));
593 }
594 SchemaChange::Breaking { reason } => {
595 return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
596 message: reason,
597 }));
598 }
599 }
600 let next_v = current
601 .current_version
602 .0
603 .checked_add(1)
604 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
605 let wire = CatalogRecordWire::NewSchemaVersion {
606 collection_id: id.0,
607 schema_version: next_v,
608 fields,
609 indexes,
610 };
611 let payload = encode_catalog_payload(&wire);
612 if let Some(st) = &mut self.txn_staging {
613 st.shadow_catalog.apply_record(wire.clone())?;
614 st.pending
615 .push((crate::segments::header::SegmentType::Schema, payload));
616 return Ok(SchemaVersion(next_v));
617 }
618 let tid = self.next_txn_id();
619 self.commit_write_batch(
620 tid,
621 &[(
622 crate::segments::header::SegmentType::Schema,
623 payload.as_slice(),
624 )],
625 )?;
626 self.apply_catalog_record(wire)?;
627 Ok(SchemaVersion(next_v))
628 }
629
630 pub fn plan_schema_version_with_indexes(
632 &self,
633 id: CollectionId,
634 fields: Vec<FieldDef>,
635 indexes: Vec<crate::schema::IndexDef>,
636 ) -> Result<MigrationPlan, DbError> {
637 let current = self
638 .catalog_for_read()
639 .get(id)
640 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
641 let change = classify_schema_update(¤t.fields, ¤t.indexes, &fields, &indexes)?;
643 let mut steps = Vec::new();
644 match &change {
645 SchemaChange::Safe => {}
646 SchemaChange::Breaking { .. } => {}
647 SchemaChange::NeedsMigration {
648 reason,
649 backfill_top_level_field,
650 backfill_field_path,
651 } => {
652 if let Some(field) = backfill_top_level_field {
653 steps.push(MigrationStep::BackfillTopLevelField {
654 field: field.clone(),
655 });
656 } else if let Some(path) = backfill_field_path {
657 steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
658 } else if reason.contains("unique index") {
659 steps.push(MigrationStep::RebuildIndexes);
660 }
661 }
662 }
663 Ok(MigrationPlan { change, steps })
664 }
665
666 pub fn backfill_top_level_field_with_value(
670 &mut self,
671 collection_id: CollectionId,
672 field: &str,
673 value: RowValue,
674 ) -> Result<(), DbError> {
675 let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
676 self.backfill_field_at_path_with_value(collection_id, &path, value)
677 }
678
679 pub fn backfill_field_at_path_with_value(
681 &mut self,
682 collection_id: CollectionId,
683 path: &crate::schema::FieldPath,
684 value: RowValue,
685 ) -> Result<(), DbError> {
686 let col = self
687 .catalog_for_read()
688 .get(collection_id)
689 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
690 id: collection_id.0,
691 }))?;
692 let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
693 DbError::Schema(SchemaError::RowUnknownField {
694 name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
695 })
696 })?;
697
698 let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
699 for ((cid, _), row) in self.latest_for_read().iter() {
700 if *cid != collection_id.0 {
701 continue;
702 }
703 rows.push(row.clone());
704 }
705
706 self.transaction(|db| {
707 for mut row in rows {
708 if row_value_at_path_segments(&row, &path.0).is_some() {
709 continue;
710 }
711 crate::record::insert_value_at_path(&mut row, path, value.clone())?;
712 db.insert(collection_id, row)?;
713 }
714 Ok(())
715 })
716 }
717
718 pub fn rebuild_indexes_for_collection(
720 &mut self,
721 collection_id: CollectionId,
722 ) -> Result<(), DbError> {
723 let col = self
724 .catalog_for_read()
725 .get(collection_id)
726 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
727 id: collection_id.0,
728 }))?;
729 let pk_name =
730 col.primary_field
731 .as_deref()
732 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
733 collection_id: collection_id.0,
734 }))?;
735 let pk_def = col
736 .fields
737 .iter()
738 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
739 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
740 name: pk_name.to_string(),
741 }))?;
742
743 let mut entries: Vec<IndexEntry> = Vec::new();
744 for ((cid, _), row) in self.latest_for_read().iter() {
745 if *cid != collection_id.0 {
746 continue;
747 }
748 let Some(pk_cell) = row.get(pk_name) else {
749 continue;
750 };
751 let pk_scalar = pk_cell.clone().into_scalar()?;
752 if !pk_scalar.ty_matches(&pk_def.ty) {
753 continue;
754 }
755 for idx in &col.indexes {
756 let Some(v) = scalar_at_path(row, &idx.path) else {
757 continue;
758 };
759 entries.push(IndexEntry {
760 collection_id: collection_id.0,
761 index_name: idx.name.clone(),
762 kind: idx.kind,
763 op: IndexOp::Insert,
764 index_key: v.canonical_key_bytes(),
765 pk_key: pk_scalar.canonical_key_bytes(),
766 });
767 }
768 }
769
770 self.transaction(|db| {
771 if entries.is_empty() {
772 return Ok(());
773 }
774 let st = db
777 .txn_staging
778 .as_mut()
779 .expect("transaction staging must be active");
780 let b = encode_index_payload(&entries);
781 st.pending
782 .push((crate::segments::header::SegmentType::Index, b));
783 for e in entries {
784 st.shadow_indexes.apply(e)?;
785 }
786 Ok(())
787 })
788 }
789
790 pub fn register_schema_version_with_indexes_force(
795 &mut self,
796 id: CollectionId,
797 fields: Vec<FieldDef>,
798 indexes: Vec<crate::schema::IndexDef>,
799 ) -> Result<SchemaVersion, DbError> {
800 let current = self
801 .catalog_for_read()
802 .get(id)
803 .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
804 let next_v = current
805 .current_version
806 .0
807 .checked_add(1)
808 .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
809 let wire = CatalogRecordWire::NewSchemaVersion {
810 collection_id: id.0,
811 schema_version: next_v,
812 fields,
813 indexes,
814 };
815 let payload = encode_catalog_payload(&wire);
816 if let Some(st) = &mut self.txn_staging {
817 st.shadow_catalog.apply_record(wire.clone())?;
818 st.pending
819 .push((crate::segments::header::SegmentType::Schema, payload));
820 return Ok(SchemaVersion(next_v));
821 }
822 let tid = self.next_txn_id();
823 self.commit_write_batch(
824 tid,
825 &[(
826 crate::segments::header::SegmentType::Schema,
827 payload.as_slice(),
828 )],
829 )?;
830 self.apply_catalog_record(wire)?;
831 Ok(SchemaVersion(next_v))
832 }
833
 /// Inserts `row` into collection `collection_id`, replacing any existing row with the
 /// same primary key.
 ///
 /// How the write is planned:
 /// - The row is validated and encoded by `plan_insert_row`.
 /// - If a row with the same primary-key bytes is already visible, the record is
 ///   re-encoded with `OP_REPLACE` from the planned row.
 /// - In that replace case, index `Delete` entries for the old row are placed ahead of
 ///   the new row's `Insert` entries.
 /// - Every unique-index `Insert` is then checked against the visible index state.
 ///
 /// How the write is applied:
 /// - Inside a transaction, the index segment (if any) and the record segment are staged,
 ///   and the shadow latest map and indexes are updated.
 /// - Otherwise both are committed as one write batch (index segment first) and then
 ///   applied to the committed latest map and indexes.
 ///
 /// `write::ensure_header_v0_5` runs on the store before anything else, including before
 /// validation and while a transaction is open.
 ///
 /// # Errors
 /// - Schema, validation and encoding errors from planning.
 /// - `SchemaError::UniqueIndexViolation` when a unique key already maps to a different
 ///   primary key.
 /// - Any header, commit or index-apply error.
 pub fn insert(
     &mut self,
     collection_id: CollectionId,
     row: BTreeMap<String, RowValue>,
 ) -> Result<(), DbError> {
     write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
     let (mut payload, full, mut index_entries, pk_scalar) =
         plan_insert_row(self.catalog_for_read(), collection_id, row)?;
     // Only the test hook below mutates `full`, so it is rebound as mutable in test builds only.
     #[cfg(test)]
     let mut full = full;
     // Row currently visible under this primary key (the shadow copy inside a transaction).
     let existing = self
         .latest_for_read()
         .get(&(collection_id.0, full.0.clone()))
         .cloned();
     if existing.is_some() {
         // Test hook (one-shot): mutate the planned row before the replace payload is encoded.
         #[cfg(test)]
         if let Some(poison) = self.test_poison_planned_replace_row.take() {
             poison(collection_id, &mut full.1);
         }
         let col = self
             .catalog_for_read()
             .get(collection_id)
             .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                 id: collection_id.0,
             }))?;
         let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
         let pk_name =
             col.primary_field
                 .as_deref()
                 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                     collection_id: collection_id.0,
                 }))?;
         let pk_def = col
             .fields
             .iter()
             .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
             .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
                 name: pk_name.to_string(),
             }))?;

         let non_pk_defs = if has_multi_segment_schema {
             col.fields
                 .iter()
                 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
                 .collect::<Vec<_>>()
         } else {
             non_pk_defs_in_order(&col.fields, pk_name)
         };
         // Re-derive the non-PK values from the (possibly test-mutated) planned row.
         // A field with no value at its path is encoded as `RowValue::None`.
         let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
         for def in &non_pk_defs {
             let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
             non_pk.push(((*def).clone(), v));
         }
         // Replace the insert payload from `plan_insert_row` with an `OP_REPLACE` record.
         payload = (if has_multi_segment_schema {
             encode_record_payload_v3_op(
                 collection_id.0,
                 col.current_version.0,
                 OP_REPLACE,
                 &pk_scalar,
                 &pk_def.ty,
                 &non_pk,
             )
         } else {
             encode_record_payload_v2_op(
                 collection_id.0,
                 col.current_version.0,
                 OP_REPLACE,
                 &pk_scalar,
                 &pk_def.ty,
                 &non_pk,
             )
         })?;
         // Always `Some` here (checked above).
         // The old row's index keys are removed first, so the new keys are applied afterwards.
         if let Some(ref old_row) = existing {
             let mut deletes = index_deletes_for_existing_row(
                 collection_id,
                 &pk_scalar,
                 &col.indexes,
                 old_row,
             );
             deletes.append(&mut index_entries);
             index_entries = deletes;
         }
     }
     // Unique check: an `Insert` whose key is already owned by another primary key is
     // rejected. Re-inserting a row's own key is allowed.
     // NOTE(review): the lookup also runs for `Delete` entries, which are skipped just after it.
     for e in &index_entries {
         if e.kind != crate::schema::IndexKind::Unique {
             continue;
         }
         let Some(existing) =
             self.indexes_for_read()
                 .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
         else {
             continue;
         };
         if e.op != IndexOp::Insert {
             continue;
         }
         if existing == e.pk_key.as_slice() {
             continue;
         }
         return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
     }
     // Inside a transaction: stage the segments (index first, then record) and update the
     // shadow state only.
     if let Some(st) = &mut self.txn_staging {
         if !index_entries.is_empty() {
             let b = encode_index_payload(&index_entries);
             st.pending
                 .push((crate::segments::header::SegmentType::Index, b));
         }
         st.pending.push((
             crate::segments::header::SegmentType::Record,
             payload.clone(),
         ));
         st.shadow_latest
             .insert((collection_id.0, full.0.clone()), full.1.clone());
         for e in index_entries {
             st.shadow_indexes.apply(e)?;
         }
         return Ok(());
     }
     // No transaction: commit one batch (index segment first), then apply it in memory.
     let tid = self.next_txn_id();
     let index_bytes = if index_entries.is_empty() {
         None
     } else {
         Some(encode_index_payload(&index_entries))
     };
     let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
     if let Some(ref b) = index_bytes {
         batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
     }
     batch.push((
         crate::segments::header::SegmentType::Record,
         payload.as_slice(),
     ));
     self.commit_write_batch(tid, &batch)?;
     self.latest.insert((collection_id.0, full.0), full.1);
     for e in index_entries {
         self.indexes.apply(e)?;
     }
     Ok(())
 }
979
 /// Deletes the row with primary key `pk` from collection `collection_id`.
 ///
 /// Behaviour:
 /// - `write::ensure_header_v0_5` runs on the store first.
 /// - `pk` must match the type of the collection's primary field.
 /// - If no row with that key is visible, nothing is written and `Ok(())` is returned.
 /// - Otherwise an `OP_DELETE` record is encoded (v3 if any schema field path has more
 ///   than one segment, v2 otherwise), together with index `Delete` entries for the old
 ///   row's indexed values.
 /// - Inside a transaction these are staged and the shadow state is updated. Otherwise
 ///   they are committed as one write batch (index segment first) and then applied to the
 ///   committed latest map and indexes.
 ///
 /// # Errors
 /// - Schema errors for an unknown collection or a missing primary key definition.
 /// - `FormatError::RecordPayloadTypeMismatch` when `pk` has the wrong type.
 /// - Any header, encoding, commit or index-apply error.
 // NOTE(review): `get` reports a wrong key type as `SchemaError::PrimaryKeyTypeMismatch`,
 // while this method uses `FormatError::RecordPayloadTypeMismatch`. Consider unifying.
 pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
     write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
     let col = self
         .catalog_for_read()
         .get(collection_id)
         .ok_or(DbError::Schema(SchemaError::UnknownCollection {
             id: collection_id.0,
         }))?;
     let pk_name =
         col.primary_field
             .as_deref()
             .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                 collection_id: collection_id.0,
             }))?;
     let pk_def = col
         .fields
         .iter()
         .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
         .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
             name: pk_name.to_string(),
         }))?;
     if !pk.ty_matches(&pk_def.ty) {
         return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
     }
     let pk_key = pk.canonical_key_bytes();
     let existing = self
         .latest_for_read()
         .get(&(collection_id.0, pk_key.clone()))
         .cloned();
     // Deleting a key that is not present is a silent no-op.
     let Some(old_row) = existing else {
         return Ok(());
     };
     // Copy out what is needed from the catalog entry before `self` is borrowed mutably below.
     let indexes = col.indexes.clone();
     let schema_ver = col.current_version.0;
     let pk_ty = pk_def.ty.clone();
     let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);

     let mut index_entries =
         index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
     // In test builds a one-shot hook may substitute the scalar written into the record.
     // This only affects the encoded payload; the in-memory removal still uses `pk_key`.
     #[cfg(not(test))]
     let pk_for_record = pk.clone();
     #[cfg(test)]
     let pk_for_record = {
         let mut p = pk.clone();
         if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
             p = poison(p);
         }
         p
     };
     // Delete records carry no non-PK values.
     let record_payload = (if has_multi_segment_schema {
         encode_record_payload_v3_op(
             collection_id.0,
             schema_ver,
             OP_DELETE,
             &pk_for_record,
             &pk_ty,
             &[],
         )
     } else {
         encode_record_payload_v2_op(
             collection_id.0,
             schema_ver,
             OP_DELETE,
             &pk_for_record,
             &pk_ty,
             &[],
         )
     })?;

     // Inside a transaction: stage the segments (index first, then record) and update the
     // shadow state only.
     if let Some(st) = &mut self.txn_staging {
         if !index_entries.is_empty() {
             let b = encode_index_payload(&index_entries);
             st.pending
                 .push((crate::segments::header::SegmentType::Index, b));
         }
         st.pending.push((
             crate::segments::header::SegmentType::Record,
             record_payload.clone(),
         ));
         st.shadow_latest.remove(&(collection_id.0, pk_key));
         for e in index_entries.drain(..) {
             st.shadow_indexes.apply(e)?;
         }
         return Ok(());
     }

     // No transaction: commit one batch (index segment first), then apply it in memory.
     let tid = self.next_txn_id();
     let index_bytes = if index_entries.is_empty() {
         None
     } else {
         Some(encode_index_payload(&index_entries))
     };
     let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
     if let Some(ref b) = index_bytes {
         batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
     }
     batch.push((
         crate::segments::header::SegmentType::Record,
         record_payload.as_slice(),
     ));
     self.commit_write_batch(tid, &batch)?;
     self.latest.remove(&(collection_id.0, pk_key));
     for e in index_entries {
         self.indexes.apply(e)?;
     }
     Ok(())
 }
1088
1089 pub fn get(
1093 &self,
1094 collection_id: CollectionId,
1095 pk: &ScalarValue,
1096 ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1097 let col = self
1098 .catalog_for_read()
1099 .get(collection_id)
1100 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1101 id: collection_id.0,
1102 }))?;
1103 let pk_name =
1104 col.primary_field
1105 .as_deref()
1106 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1107 collection_id: collection_id.0,
1108 }))?;
1109 let pk_ty = col
1110 .fields
1111 .iter()
1112 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1113 .map(|f| &f.ty)
1114 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1115 name: pk_name.to_string(),
1116 }))?;
1117 if !pk.ty_matches(pk_ty) {
1118 return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
1119 collection_id: collection_id.0,
1120 }));
1121 }
1122 let key = (collection_id.0, pk.canonical_key_bytes());
1123 Ok(self.latest_for_read().get(&key).cloned())
1124 }
1125
1126 pub fn checkpoint(&mut self) -> Result<(), DbError> {
1132 #[cfg(feature = "tracing")]
1133 let _span = tracing::info_span!("database_checkpoint").entered();
1134 if self.txn_staging.is_some() {
1135 return Err(DbError::Transaction(TransactionError::NestedTransaction));
1136 }
1137
1138 write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
1139
1140 let mut cp = checkpoint::checkpoint_from_state(
1141 self.catalog_for_read(),
1142 self.latest_for_read(),
1143 self.indexes_for_read(),
1144 )?;
1145
1146 let file_len = self.store.len()?;
1147 let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
1148 let checkpoint_offset = writer.offset();
1149
1150 let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
1151 let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
1152 cp.replay_from_offset = replay_from;
1153 let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
1154
1155 let hdr = SegmentHeader {
1156 segment_type: SegmentType::Checkpoint,
1157 payload_len: 0,
1158 payload_crc32c: 0,
1159 };
1160 writer.append(hdr, &payload)?;
1161
1162 publish::append_manifest_and_publish_with_checkpoint(
1163 &mut self.store,
1164 self.segment_start,
1165 Some((checkpoint_offset, payload.len() as u32)),
1166 )?;
1167 self.store.sync()?;
1168 #[cfg(feature = "tracing")]
1169 tracing::info!(
1170 checkpoint_offset,
1171 replay_from,
1172 payload_bytes = payload.len(),
1173 "database_checkpoint_ok"
1174 );
1175 Ok(())
1176 }
1177
1178 #[cfg(test)]
1180 #[doc(hidden)]
1181 pub(crate) fn test_arm_replace_encode_poison_once(
1182 &mut self,
1183 poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
1184 ) {
1185 self.test_poison_planned_replace_row = Some(poison);
1186 }
1187
1188 #[cfg(test)]
1189 #[doc(hidden)]
1190 pub(crate) fn test_arm_delete_encode_poison_once(
1191 &mut self,
1192 poison: fn(ScalarValue) -> ScalarValue,
1193 ) {
1194 self.test_poison_delete_encode_scalar = Some(poison);
1195 }
1196
1197 #[cfg(test)]
1199 #[doc(hidden)]
1200 pub(crate) fn test_write_latest_cell_unchecked(
1201 &mut self,
1202 collection_id: CollectionId,
1203 pk: &ScalarValue,
1204 field: &str,
1205 value: RowValue,
1206 ) {
1207 let pk_key = pk.canonical_key_bytes();
1208 let row = self
1209 .latest
1210 .get_mut(&(collection_id.0, pk_key))
1211 .expect("test_write_latest_cell_unchecked: unknown row key");
1212 row.insert(field.to_string(), value);
1213 }
1214}
1215
1216impl Database<FileStore> {
1217 pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1221 self.compact_to_with_fsops(&StdFsOps, dest_path)
1222 }
1223
1224 pub(crate) fn compact_to_with_fsops(
1225 &self,
1226 fs: &dyn FsOps,
1227 dest_path: impl AsRef<Path>,
1228 ) -> Result<(), DbError> {
1229 #[cfg(feature = "tracing")]
1230 let _span = tracing::info_span!(
1231 "database_compact_to",
1232 dest = %dest_path.as_ref().display()
1233 )
1234 .entered();
1235 let bytes = self.compact_snapshot_bytes()?;
1236 let path = dest_path.as_ref();
1237 let file = fs
1238 .open_read_write_create_truncate(path)
1239 .map_err(DbError::Io)?;
1240 let mut store = FileStore::new(file);
1241 store.write_all_at(0, &bytes)?;
1242 store.truncate(bytes.len() as u64)?;
1243 store.sync()?;
1244 #[cfg(feature = "tracing")]
1245 tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1246 Ok(())
1247 }
1248 pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1249 self.compact_in_place_with_fsops(&StdFsOps)
1250 }
1251
1252 pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1253 #[cfg(feature = "tracing")]
1254 let _span = tracing::info_span!("database_compact_in_place").entered();
1255 let bytes = self.compact_snapshot_bytes()?;
1259 let live_path = self.path.clone();
1260 let parent = live_path
1261 .parent()
1262 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1263
1264 let pid = std::process::id();
1266 let nanos = std::time::SystemTime::now()
1267 .duration_since(std::time::UNIX_EPOCH)
1268 .map(|d| d.as_nanos())
1269 .unwrap_or(0);
1270 let base = live_path
1271 .file_name()
1272 .and_then(|s| s.to_str())
1273 .unwrap_or("db.modelvault");
1274 let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1275 let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1276
1277 {
1279 let file = fs
1280 .open_read_write_create_new(&tmp_path)
1281 .map_err(DbError::Io)?;
1282 let mut store = FileStore::new(file);
1283 store.write_all_at(0, &bytes)?;
1284 store.truncate(bytes.len() as u64)?;
1285 store.sync()?;
1286 }
1287
1288 let _ = fs.remove_file(&bak_path);
1298 fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1299 let replace_res = fs.rename(&tmp_path, &live_path);
1300 if let Err(e) = replace_res {
1301 let _ = fs.rename(&bak_path, &live_path);
1303 let _ = fs.remove_file(&tmp_path);
1305 return Err(DbError::Io(e));
1306 }
1307
1308 #[cfg(unix)]
1310 {
1311 if let Ok(dir_f) = fs.open_dir(parent) {
1315 let _ = dir_f.sync_all();
1316 }
1317 }
1318
1319 let _ = fs.remove_file(&bak_path);
1320
1321 self.writer_registry = None;
1323 let reopened = match (|| {
1324 let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
1325 Self::open_with_store(live_path.clone(), store, OpenOptions::default())
1326 })() {
1327 Ok(db) => db,
1328 Err(e) => {
1329 let _ = fs.rename(&bak_path, &live_path);
1330 self.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1331 live_path.clone(),
1332 )?);
1333 return Err(e);
1334 }
1335 };
1336 let mut reopened = reopened;
1337 reopened.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1338 live_path.clone(),
1339 )?);
1340 *self = reopened;
1341 #[cfg(feature = "tracing")]
1342 tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1343 Ok(())
1344 }
1345
1346 pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1351 self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1352 }
1353
1354 pub(crate) fn export_snapshot_to_path_with_fsops(
1355 &mut self,
1356 fs: &dyn FsOps,
1357 dest_path: impl AsRef<Path>,
1358 ) -> Result<(), DbError> {
1359 self.checkpoint()?;
1360 let dest_path = dest_path.as_ref();
1361 fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1362 if let Ok(f) = fs.open_read(dest_path) {
1365 let _ = f.sync_all();
1366 }
1367 #[cfg(unix)]
1368 best_effort_fsync_parent_dir(fs, dest_path);
1369 Ok(())
1370 }
1371
1372 pub fn restore_snapshot_to_path(
1376 snapshot_path: impl AsRef<Path>,
1377 dest_path: impl AsRef<Path>,
1378 ) -> Result<(), DbError> {
1379 Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1380 }
1381
1382 pub(crate) fn restore_snapshot_to_path_with_fsops(
1383 fs: &dyn FsOps,
1384 snapshot_path: impl AsRef<Path>,
1385 dest_path: impl AsRef<Path>,
1386 ) -> Result<(), DbError> {
1387 let snapshot_path = snapshot_path.as_ref();
1388 let dest_path = dest_path.as_ref();
1389 let parent = dest_path
1390 .parent()
1391 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1392
1393 let pid = std::process::id();
1394 let nanos = std::time::SystemTime::now()
1395 .duration_since(std::time::UNIX_EPOCH)
1396 .map(|d| d.as_nanos())
1397 .unwrap_or(0);
1398 let base = dest_path
1399 .file_name()
1400 .and_then(|s| s.to_str())
1401 .unwrap_or("db.modelvault");
1402 let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1403 let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1404
1405 fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1407 if let Ok(f) = fs.open_read(&tmp_path) {
1408 let _ = f.sync_all();
1409 }
1410
1411 if dest_path.exists() {
1413 let _ = fs.remove_file(&bak_path);
1414 fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1415 }
1416 let replace_res = fs.rename(&tmp_path, dest_path);
1417 if let Err(e) = replace_res {
1418 if bak_path.exists() {
1420 let _ = fs.rename(&bak_path, dest_path);
1421 }
1422 let _ = fs.remove_file(&tmp_path);
1423 return Err(DbError::Io(e));
1424 }
1425
1426 #[cfg(unix)]
1427 {
1428 if let Ok(dir_f) = fs.open_dir(parent) {
1429 let _ = dir_f.sync_all();
1430 }
1431 }
1432 let _ = fs.remove_file(&bak_path);
1433 Ok(())
1434 }
1435}
1436
1437pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
1438 db: &'a Database<S>,
1439 collection_id: CollectionId,
1440 _marker: PhantomData<T>,
1441}
1442
1443impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1444 pub fn where_eq(
1445 &self,
1446 path: crate::schema::FieldPath,
1447 value: ScalarValue,
1448 ) -> QueryBuilder<'a, S, T> {
1449 QueryBuilder {
1450 db: self.db,
1451 collection_id: self.collection_id,
1452 predicate: Some(crate::query::Predicate::Eq { path, value }),
1453 limit: None,
1454 _marker: PhantomData,
1455 }
1456 }
1457
1458 pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1459 let q = crate::query::Query {
1460 collection: self.collection_id,
1461 predicate: None,
1462 limit: None,
1463 order_by: None,
1464 };
1465 let rows = self.db.query(&q)?;
1466 Ok(rows.into_iter().map(project_row::<T>).collect())
1467 }
1468}
1469
1470pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
1471 db: &'a Database<S>,
1472 collection_id: CollectionId,
1473 predicate: Option<crate::query::Predicate>,
1474 limit: Option<usize>,
1475 _marker: PhantomData<T>,
1476}
1477
1478impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1479 pub fn limit(mut self, n: usize) -> Self {
1480 self.limit = Some(n);
1481 self
1482 }
1483
1484 pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1485 let q = crate::query::Query {
1486 collection: self.collection_id,
1487 predicate: self.predicate,
1488 limit: self.limit,
1489 order_by: None,
1490 };
1491 let rows = self.db.query(&q)?;
1492 Ok(rows.into_iter().map(project_row::<T>).collect())
1493 }
1494
1495 pub fn explain(self) -> Result<String, DbError> {
1496 let q = crate::query::Query {
1497 collection: self.collection_id,
1498 predicate: self.predicate,
1499 limit: self.limit,
1500 order_by: None,
1501 };
1502 self.db.explain_query(&q)
1503 }
1504}
1505
1506fn validate_subset_model<T: crate::schema::DbModel>(
1507 col: &crate::catalog::CollectionInfo,
1508) -> Result<(), DbError> {
1509 crate::schema_compat::validate_model_fields_against_catalog(
1510 col,
1511 T::primary_field(),
1512 &T::fields(),
1513 &T::indexes(),
1514 )
1515}
1516
1517pub fn row_subset_by_field_defs(
1519 row: &BTreeMap<String, RowValue>,
1520 wanted: &[FieldDef],
1521) -> BTreeMap<String, RowValue> {
1522 let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1523 for f in wanted {
1524 let segs = &f.path.0;
1525 if segs.is_empty() {
1526 continue;
1527 }
1528 let Some(leaf) = row_value_at_path_segments(row, segs) else {
1529 continue;
1530 };
1531 let root = segs[0].to_string();
1532 if segs.len() == 1 {
1533 out.insert(root, leaf);
1534 } else {
1535 let nested = row_value_nested_object_path(&segs[1..], leaf);
1536 match out.get_mut(&root) {
1537 Some(existing) => merge_row_value_trees(existing, nested),
1538 None => {
1539 out.insert(root, nested);
1540 }
1541 }
1542 }
1543 }
1544 out
1545}
1546
1547fn row_value_at_path_segments(
1548 row: &BTreeMap<String, RowValue>,
1549 path: &[std::borrow::Cow<'static, str>],
1550) -> Option<RowValue> {
1551 if path.is_empty() {
1552 return None;
1553 }
1554 let mut cur = row.get(path[0].as_ref())?;
1555 for seg in path.iter().skip(1) {
1556 cur = match cur {
1557 RowValue::Object(m) => m.get(seg.as_ref())?,
1558 RowValue::None => return None,
1559 _ => return None,
1560 };
1561 }
1562 Some(cur.clone())
1563}
1564
1565fn row_value_nested_object_path(
1567 segments: &[std::borrow::Cow<'static, str>],
1568 leaf: RowValue,
1569) -> RowValue {
1570 debug_assert!(!segments.is_empty());
1571 if segments.len() == 1 {
1572 let mut m = BTreeMap::new();
1573 m.insert(segments[0].to_string(), leaf);
1574 RowValue::Object(m)
1575 } else {
1576 let mut m = BTreeMap::new();
1577 m.insert(
1578 segments[0].to_string(),
1579 row_value_nested_object_path(&segments[1..], leaf),
1580 );
1581 RowValue::Object(m)
1582 }
1583}
1584
1585fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1586 match (&mut *into, from) {
1587 (RowValue::Object(m1), RowValue::Object(m2)) => {
1588 for (k, v2) in m2 {
1589 match m1.entry(k) {
1590 std::collections::btree_map::Entry::Vacant(e) => {
1591 e.insert(v2);
1592 }
1593 std::collections::btree_map::Entry::Occupied(mut e) => {
1594 merge_row_value_trees(e.get_mut(), v2);
1595 }
1596 }
1597 }
1598 }
1599 (slot, from) => *slot = from,
1600 }
1601}
1602
1603fn project_row<T: crate::schema::DbModel>(
1604 row: BTreeMap<String, RowValue>,
1605) -> BTreeMap<String, RowValue> {
1606 row_subset_by_field_defs(&row, &T::fields())
1607}
1608
1609pub(crate) fn scalar_at_path(
1610 row: &BTreeMap<String, RowValue>,
1611 path: &crate::schema::FieldPath,
1612) -> Option<ScalarValue> {
1613 let mut cur: Option<&RowValue> = None;
1614 for (i, seg) in path.0.iter().enumerate() {
1615 let key = seg.as_ref();
1616 cur = match (i, cur) {
1617 (0, _) => row.get(key),
1618 (_, Some(RowValue::Object(map))) => map.get(key),
1619 (_, Some(RowValue::None)) => return None,
1620 _ => return None,
1621 };
1622 }
1623 cur.and_then(|v| v.as_scalar())
1624}
1625
1626impl Database<FileStore> {
1627 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1631 Self::open_with_options(path, crate::config::OpenOptions::default())
1632 }
1633
1634 pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1636 Self::open_with_options(
1637 path,
1638 crate::config::OpenOptions {
1639 recovery: crate::config::RecoveryMode::Strict,
1640 mode: OpenMode::ReadOnly,
1641 },
1642 )
1643 }
1644
1645 pub fn open_with_options(
1647 path: impl AsRef<Path>,
1648 opts: crate::config::OpenOptions,
1649 ) -> Result<Self, DbError> {
1650 let path = path.as_ref().to_path_buf();
1651 let store = FileStore::open_locked(&path, opts.mode)?;
1652 let mut db = Self::open_with_store(path.clone(), store, opts)?;
1653 if opts.mode == OpenMode::ReadWrite {
1654 db.writer_registry = Some(writer_registry::WriterRegistryGuard::new(path)?);
1655 }
1656 Ok(db)
1657 }
1658}
1659
1660impl Database<VecStore> {
1661 pub fn open_in_memory() -> Result<Self, DbError> {
1663 Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1664 }
1665
1666 pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1668 Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1669 }
1670
1671 pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1673 Self::open_with_store(
1674 PathBuf::from(":memory:"),
1675 VecStore::from_vec(bytes),
1676 crate::config::OpenOptions::default(),
1677 )
1678 }
1679
1680 pub fn into_snapshot_bytes(self) -> Vec<u8> {
1682 self.store.into_inner()
1683 }
1684
1685 pub fn snapshot_bytes(&self) -> Vec<u8> {
1687 self.store.as_slice().to_vec()
1688 }
1689
1690 pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1692 Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1693 }
1694
1695 pub(crate) fn export_snapshot_to_path_with_fsops(
1696 fs: &dyn FsOps,
1697 dest_path: impl AsRef<Path>,
1698 bytes: &[u8],
1699 ) -> Result<(), DbError> {
1700 fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1701 Ok(())
1702 }
1703
1704 pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1706 let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1707 Self::from_snapshot_bytes(bytes)
1708 }
1709}
1710
// Test module whose body is spliced in at compile time from
// `tests/unit/src_db_mod_scalar_at_path_tests.rs`. Judging by its name it covers
// `scalar_at_path`; confirm against the included file.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}

// Main unit-test module for this file. Its body is included from
// `tests/unit/src_db_mod_tests.rs`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}