Skip to main content

modelvault_core/db/
mod.rs

1//! Database handle and orchestration.
2//!
3//! [`Database`] is implemented using internal modules `open` (bootstrap), `replay` (catalog and
4//! rows from segments), `write` (append segments and publish), and `helpers` (name rules).
5
6mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
13mod row_merge;
14pub(crate) mod row_paths;
15pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
16mod write;
17
18use std::collections::{BTreeMap, HashMap};
19use std::marker::PhantomData;
20use std::path::{Path, PathBuf};
21
22use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
23use crate::config::{OpenMode, OpenOptions};
24use crate::error::{DbError, FormatError, SchemaError, TransactionError};
25use crate::index::IndexState;
26use crate::index::{encode_index_payload, IndexEntry, IndexOp};
27use crate::record::{
28    encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
29    encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
30    OP_REPLACE,
31};
32use crate::schema::{classify_schema_update, SchemaChange};
33use crate::schema::{CollectionId, FieldDef, SchemaVersion};
34use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
35use crate::segments::writer::SegmentWriter;
36use crate::storage::{FileStore, Store, VecStore};
37use crate::validation;
38use crate::{checkpoint, publish};
39use crate::{MigrationPlan, MigrationStep};
40
41use self::fs_ops::{FsOps, StdFsOps};
42
43/// Best-effort `fsync` on `dest_path`'s parent directory (Unix only).
44#[cfg(unix)]
45fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
46    let Some(parent) = dest_path.parent() else {
47        return;
48    };
49    let Ok(dir_f) = fs.open_dir(parent) else {
50        return;
51    };
52    let _ = dir_f.sync_all();
53}
54
/// Latest row per `(collection id, canonical primary-key bytes)`.
///
/// Each value is the full row map keyed by top-level field name.
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
56
/// Result of `plan_insert_row`, in tuple order:
///
/// 1. the encoded record payload,
/// 2. the canonical primary-key bytes paired with the full row map (primary key included),
/// 3. the index entries to write for the row,
/// 4. the primary key as a scalar.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
63
64fn plan_insert_row(
65    catalog: &Catalog,
66    collection_id: CollectionId,
67    mut row: BTreeMap<String, RowValue>,
68) -> Result<PlannedInsert, DbError> {
69    let col =
70        catalog
71            .get(collection_id)
72            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
73                id: collection_id.0,
74            }))?;
75    let pk_name =
76        col.primary_field
77            .as_deref()
78            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
79                collection_id: collection_id.0,
80            }))?;
81    let pk_def = col
82        .fields
83        .iter()
84        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
85        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
86            name: pk_name.to_string(),
87        }))?;
88    let pk_ty = &pk_def.ty;
89    validation::ensure_pk_type_primitive(pk_ty)?;
90    let mut pk_path = vec![pk_name.to_string()];
91    let pk_cell = row
92        .get(pk_name)
93        .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
94            name: pk_name.to_string(),
95        }))?;
96    validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
97    // Validate unknown fields: for nested schema paths we validate by traversing row objects.
98    // For legacy single-segment schemas, keep the existing top-level validation.
99    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
100    if !has_multi_segment_schema {
101        validation::validate_top_level_row(&col.fields, pk_name, &row)?;
102    } else {
103        validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
104    }
105
106    // `pk_cell` is already present (validated above), so remove must succeed.
107    let pk_val = row.remove(pk_name).unwrap();
108    // PK type and value were validated as a primitive scalar.
109    let pk_scalar = pk_val
110        .clone()
111        .into_scalar()
112        .expect("validated primary key must be scalar");
113
114    // Build non-PK values in schema order.
115    // - legacy v2: single-segment top-level field defs
116    // - v3: full FieldPath for each non-PK def (multi-segment allowed)
117    let non_pk_defs = if has_multi_segment_schema {
118        col.fields
119            .iter()
120            .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
121            .collect::<Vec<_>>()
122    } else {
123        non_pk_defs_in_order(&col.fields, pk_name)
124    };
125    let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
126
127    let payload = if has_multi_segment_schema {
128        encode_record_payload_v3(
129            collection_id.0,
130            col.current_version.0,
131            &pk_scalar,
132            pk_ty,
133            &non_pk,
134        )
135        .expect("record payload encoding must succeed after validation")
136    } else {
137        encode_record_payload_v2(
138            collection_id.0,
139            col.current_version.0,
140            &pk_scalar,
141            pk_ty,
142            &non_pk,
143        )
144        .expect("record payload encoding must succeed after validation")
145    };
146
147    let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
148    full_map.insert(pk_name.to_string(), pk_val);
149    for (def, v) in &non_pk {
150        let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
151        if parts.len() == 1 {
152            full_map.insert(parts[0].clone(), v.clone());
153        } else {
154            debug_assert!(parts.len() >= 2);
155            row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
156        }
157    }
158    let mut index_entries: Vec<IndexEntry> = Vec::new();
159    for idx in &col.indexes {
160        let Some(v) = scalar_at_path(&full_map, &idx.path) else {
161            continue;
162        };
163        index_entries.push(IndexEntry {
164            collection_id: collection_id.0,
165            index_name: idx.name.clone(),
166            kind: idx.kind,
167            op: IndexOp::Insert,
168            index_key: v.canonical_key_bytes(),
169            pk_key: pk_scalar.canonical_key_bytes(),
170        });
171    }
172    let pk_key = pk_scalar.canonical_key_bytes();
173    Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
174}
175
176fn index_deletes_for_existing_row(
177    collection_id: CollectionId,
178    pk_scalar: &ScalarValue,
179    indexes: &[crate::schema::IndexDef],
180    existing_row: &BTreeMap<String, RowValue>,
181) -> Vec<IndexEntry> {
182    let mut out = Vec::new();
183    for idx in indexes {
184        let Some(v) = scalar_at_path(existing_row, &idx.path) else {
185            continue;
186        };
187        out.push(IndexEntry {
188            collection_id: collection_id.0,
189            index_name: idx.name.clone(),
190            kind: idx.kind,
191            op: IndexOp::Delete,
192            index_key: v.canonical_key_bytes(),
193            pk_key: pk_scalar.canonical_key_bytes(),
194        });
195    }
196    out
197}
198
/// Staged writes while [`Database::transaction`] is executing.
///
/// The shadow fields start as clones of the database's committed state and replace it when
/// the transaction commits; dropping the staging discards them.
pub(crate) struct TxnStaging {
    /// Transaction id allocated when the transaction began.
    pub(crate) txn_id: u64,
    /// Working copy of the catalog; serves catalog reads while the transaction is active.
    pub(crate) shadow_catalog: Catalog,
    /// Working copy of the latest-row map; serves row reads while the transaction is active.
    pub(crate) shadow_latest: LatestMap,
    /// Working copy of the secondary-index state.
    pub(crate) shadow_indexes: IndexState,
    /// `(segment type, payload)` pairs queued for the commit batch, in staging order.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
207
/// Opened ModelVault database: generic over a [`Store`] ([`FileStore`] on disk, [`VecStore`] in memory).
///
/// While a transaction is active, reads are served from the staged shadow state held in
/// `txn_staging` instead of `catalog` / `latest` / `indexes`.
pub struct Database<S: Store = FileStore> {
    /// Path shown by [`Database::path`] (`":memory:"` for [`VecStore`]).
    path: PathBuf,
    /// Backing store that committed segment batches are written to.
    store: S,
    /// In-memory view of schema segments replayed from disk.
    catalog: Catalog,
    /// Byte offset where the append-only segment log begins (after header and superblocks).
    segment_start: u64,
    /// Format minor from the file header; may be lazily upgraded (`3` → `4` → `5`) on write.
    format_minor: u16,
    /// Latest row per `(collection_id, canonical primary-key bytes)`; last replayed insert wins.
    latest: LatestMap,
    /// Secondary indexes rebuilt from replayed `Index` segments.
    indexes: IndexState,
    /// Monotonic id for transaction marker segments (format minor 6+).
    txn_seq: u64,
    /// When set, [`insert`] / [`register_collection`] append to this batch instead of autocommit.
    txn_staging: Option<TxnStaging>,
    /// Covers replace-path record encoding error branches in tests (misaligned validated row maps).
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Covers delete Opcode payload encoding `?` by supplying a bogus scalar unrelated to validated `pk`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
238
239impl<S: Store> Database<S> {
240    fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
241        let mut out = Database::<VecStore>::open_in_memory()?;
242
243        // Recreate catalog (stable ids if created in id order).
244        let mut cols = self.catalog_for_read().collections();
245        cols.sort_by_key(|c| c.id.0);
246        for c in &cols {
247            let pk =
248                c.primary_field
249                    .as_deref()
250                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
251                        collection_id: c.id.0,
252                    }))?;
253            let (new_id, _v1) = out.register_collection_with_indexes(
254                &c.name,
255                c.fields.clone(),
256                c.indexes.clone(),
257                pk,
258            )?;
259            // Bump schema version counter to match current_version (repeat identical schema).
260            for _ in 2..=c.current_version.0 {
261                let _ = out.register_schema_version_with_indexes_force(
262                    new_id,
263                    c.fields.clone(),
264                    c.indexes.clone(),
265                )?;
266            }
267        }
268
269        // Copy latest rows (in-memory snapshot semantics).
270        for ((cid, _), row) in self.latest_for_read().iter() {
271            let collection_id = CollectionId(*cid);
272            out.insert(collection_id, row.clone())?;
273        }
274
275        Ok(out.into_snapshot_bytes())
276    }
277
    /// Open a database over an already constructed `store`.
    ///
    /// Thin entry point that forwards `path`, `store` and `opts` unchanged to
    /// `open::open_with_store` and returns its result.
    pub(crate) fn open_with_store(
        path: PathBuf,
        store: S,
        opts: OpenOptions,
    ) -> Result<Self, DbError> {
        open::open_with_store(path, store, opts)
    }
285
286    fn next_txn_id(&mut self) -> u64 {
287        self.txn_seq = self.txn_seq.saturating_add(1);
288        self.txn_seq
289    }
290
291    #[inline]
292    fn commit_write_batch(
293        &mut self,
294        txn_id: u64,
295        body: &[(crate::segments::header::SegmentType, &[u8])],
296    ) -> Result<(), DbError> {
297        write::commit_write_txn_v6(
298            &mut self.store,
299            self.segment_start,
300            &mut self.format_minor,
301            txn_id,
302            body,
303        )
304    }
305
    /// Apply `wire` to the committed in-memory catalog (not the transaction shadow copy).
    #[inline]
    fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
        self.catalog.apply_record(wire)
    }
310
311    /// Run `f` inside a multi-write transaction: durable segments are written on success.
312    ///
313    /// On error, staged work is discarded and nothing new is appended to the log.
314    pub fn transaction<R>(
315        &mut self,
316        f: impl FnOnce(&mut Self) -> Result<R, DbError>,
317    ) -> Result<R, DbError> {
318        self.begin_transaction()?;
319        match f(self) {
320            Ok(v) => {
321                self.commit_transaction()?;
322                Ok(v)
323            }
324            Err(e) => {
325                self.rollback_transaction();
326                Err(e)
327            }
328        }
329    }
330
331    /// Start a transaction (for bindings that cannot use the closure API). Pairs with
332    /// [`Self::commit_transaction`] or [`Self::rollback_transaction`].
333    pub fn begin_transaction(&mut self) -> Result<(), DbError> {
334        if self.txn_staging.is_some() {
335            return Err(DbError::Transaction(TransactionError::NestedTransaction));
336        }
337        let tid = self.next_txn_id();
338        self.txn_staging = Some(TxnStaging {
339            txn_id: tid,
340            shadow_catalog: self.catalog.clone(),
341            shadow_latest: self.latest.clone(),
342            shadow_indexes: self.indexes.clone(),
343            pending: Vec::new(),
344        });
345        Ok(())
346    }
347
    /// Commit the active transaction started with [`Self::begin_transaction`].
    ///
    /// Delegates to `commit_txn_staging`, which returns `Ok(())` without doing anything when
    /// no transaction is active.
    pub fn commit_transaction(&mut self) -> Result<(), DbError> {
        self.commit_txn_staging()
    }
352
353    /// Discard the active transaction without writing to the log.
354    pub fn rollback_transaction(&mut self) {
355        self.txn_staging = None;
356    }
357
358    fn commit_txn_staging(&mut self) -> Result<(), DbError> {
359        let Some(st) = self.txn_staging.take() else {
360            return Ok(());
361        };
362        if st.pending.is_empty() {
363            self.catalog = st.shadow_catalog;
364            self.latest = st.shadow_latest;
365            self.indexes = st.shadow_indexes;
366            return Ok(());
367        }
368        let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
369            st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
370        self.commit_write_batch(st.txn_id, &batch)?;
371        self.catalog = st.shadow_catalog;
372        self.latest = st.shadow_latest;
373        self.indexes = st.shadow_indexes;
374        Ok(())
375    }
376
377    fn catalog_for_read(&self) -> &Catalog {
378        if let Some(ref st) = self.txn_staging {
379            &st.shadow_catalog
380        } else {
381            &self.catalog
382        }
383    }
384
385    fn indexes_for_read(&self) -> &IndexState {
386        if let Some(ref st) = self.txn_staging {
387            &st.shadow_indexes
388        } else {
389            &self.indexes
390        }
391    }
392
393    fn latest_for_read(&self) -> &LatestMap {
394        if let Some(ref st) = self.txn_staging {
395            &st.shadow_latest
396        } else {
397            &self.latest
398        }
399    }
400
401    /// Path passed to [`Database::open`](Database::<FileStore>::open), or `":memory:"` for [`VecStore`].
402    pub fn path(&self) -> &Path {
403        &self.path
404    }
405
    /// Read-only view of the schema catalog built from `Schema` segments.
    ///
    /// While a transaction is active this is the staged (shadow) catalog, so schema changes
    /// made inside the transaction are visible here before commit.
    pub fn catalog(&self) -> &Catalog {
        self.catalog_for_read()
    }
410
    /// All registered collection names in lexicographic order.
    ///
    /// Names come from the catalog that reads observe (the staged catalog during a
    /// transaction).
    pub fn collection_names(&self) -> Vec<String> {
        self.catalog_for_read().collection_names()
    }
415
    /// Read-only access to the in-memory secondary index state (rebuilt from `Index` segments).
    ///
    /// While a transaction is active this is the staged (shadow) index state.
    pub fn index_state(&self) -> &IndexState {
        self.indexes_for_read()
    }
420
421    /// Execute a query against the current in-memory snapshot of the database.
422    pub fn query(
423        &self,
424        q: &crate::query::Query,
425    ) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
426        crate::query::execute_query(
427            self.catalog_for_read(),
428            self.indexes_for_read(),
429            self.latest_for_read(),
430            q,
431        )
432    }
433
    /// Return a human-readable explanation of the chosen plan for `q`.
    ///
    /// Only the catalog that reads observe is consulted; no rows or index state are passed
    /// to the explainer.
    pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
        crate::query::explain_query(self.catalog_for_read(), q)
    }
438
439    /// Lazy iterator over query rows (same semantics as [`Self::query`]).
440    ///
441    /// See [`crate::query::QueryRowIter`] — this is the v0.7 pull-based execution boundary, not a
442    /// full operator graph.
443    pub fn query_iter(
444        &self,
445        q: &crate::query::Query,
446    ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
447        crate::query::execute_query_iter_with_spill_path(
448            self.catalog_for_read(),
449            self.indexes_for_read(),
450            self.latest_for_read(),
451            q,
452            Some(self.path.as_path()),
453        )
454    }
455
456    /// Register the collection schema defined by `T` (schema version 1).
457    pub fn register_model<T: crate::schema::DbModel>(
458        &mut self,
459    ) -> Result<(CollectionId, SchemaVersion), DbError> {
460        self.register_collection_with_indexes(
461            T::collection_name(),
462            T::fields(),
463            T::indexes(),
464            T::primary_field(),
465        )
466    }
467
468    /// Typed handle over a registered collection; `T` may be a *subset model*.
469    pub fn collection<'a, T: crate::schema::DbModel>(
470        &'a self,
471    ) -> Result<Collection<'a, S, T>, DbError> {
472        let cid = self.collection_id_named(T::collection_name())?;
473        let col = self
474            .catalog_for_read()
475            .get(cid)
476            .expect("collection id from name lookup must exist in catalog");
477        validate_subset_model::<T>(col)?;
478        Ok(Collection {
479            db: self,
480            collection_id: cid,
481            _marker: PhantomData,
482        })
483    }
484
485    /// Look up [`CollectionId`] by collection name (leading/trailing whitespace trimmed).
486    ///
487    /// Returns [`SchemaError::UnknownCollectionName`] when the name is not registered.
488    pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
489        self.catalog_for_read()
490            .lookup_name(name)
491            .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
492                name: name.trim().to_string(),
493            }))
494    }
495
496    /// Create a new collection at schema version `1`.
497    ///
498    /// `primary_field` must name a **single-segment** (top-level) field present in `fields`.
499    /// Appends a catalog segment and updates the in-memory catalog.
500    pub fn register_collection(
501        &mut self,
502        name: &str,
503        fields: Vec<FieldDef>,
504        primary_field: &str,
505    ) -> Result<(CollectionId, SchemaVersion), DbError> {
506        self.register_collection_with_indexes(name, fields, vec![], primary_field)
507    }
508
509    pub fn register_collection_with_indexes(
510        &mut self,
511        name: &str,
512        fields: Vec<FieldDef>,
513        indexes: Vec<crate::schema::IndexDef>,
514        primary_field: &str,
515    ) -> Result<(CollectionId, SchemaVersion), DbError> {
516        let name = helpers::normalize_collection_name(name)?;
517        let pk = primary_field.trim();
518        if pk.is_empty() {
519            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
520        }
521        if !Catalog::has_top_level_field(&fields, pk) {
522            return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
523                name: pk.to_string(),
524            }));
525        }
526        if let Some(st) = &mut self.txn_staging {
527            let id = st.shadow_catalog.next_collection_id().0;
528            let wire = CatalogRecordWire::CreateCollection {
529                collection_id: id,
530                name: name.clone(),
531                schema_version: 1,
532                fields,
533                indexes,
534                primary_field: Some(pk.to_string()),
535            };
536            let payload = encode_catalog_payload(&wire);
537            st.shadow_catalog.apply_record(wire)?;
538            st.pending
539                .push((crate::segments::header::SegmentType::Schema, payload));
540            return Ok((CollectionId(id), SchemaVersion(1)));
541        }
542        let id = self.catalog.next_collection_id().0;
543        let wire = CatalogRecordWire::CreateCollection {
544            collection_id: id,
545            name: name.clone(),
546            schema_version: 1,
547            fields,
548            indexes,
549            primary_field: Some(pk.to_string()),
550        };
551        let payload = encode_catalog_payload(&wire);
552        let tid = self.next_txn_id();
553        self.commit_write_batch(
554            tid,
555            &[(
556                crate::segments::header::SegmentType::Schema,
557                payload.as_slice(),
558            )],
559        )?;
560        self.apply_catalog_record(wire)?;
561        Ok((CollectionId(id), SchemaVersion(1)))
562    }
563
564    /// Bump the schema version for `id` to `current + 1` with a new field set.
565    ///
566    /// The primary-key field must remain present as a top-level field (see catalog rules).
567    pub fn register_schema_version(
568        &mut self,
569        id: CollectionId,
570        fields: Vec<FieldDef>,
571    ) -> Result<SchemaVersion, DbError> {
572        self.register_schema_version_with_indexes(id, fields, vec![])
573    }
574
575    pub fn register_schema_version_with_indexes(
576        &mut self,
577        id: CollectionId,
578        fields: Vec<FieldDef>,
579        indexes: Vec<crate::schema::IndexDef>,
580    ) -> Result<SchemaVersion, DbError> {
581        let current = self
582            .catalog_for_read()
583            .get(id)
584            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
585        // `classify_schema_update` only returns `Ok(...)` variants today; keep it infallible here.
586        match classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)? {
587            SchemaChange::Safe => {}
588            SchemaChange::NeedsMigration { reason, .. } => {
589                return Err(DbError::Schema(SchemaError::MigrationRequired {
590                    message: reason,
591                }));
592            }
593            SchemaChange::Breaking { reason } => {
594                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
595                    message: reason,
596                }));
597            }
598        }
599        let next_v = current
600            .current_version
601            .0
602            .checked_add(1)
603            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
604        let wire = CatalogRecordWire::NewSchemaVersion {
605            collection_id: id.0,
606            schema_version: next_v,
607            fields,
608            indexes,
609        };
610        let payload = encode_catalog_payload(&wire);
611        if let Some(st) = &mut self.txn_staging {
612            st.shadow_catalog.apply_record(wire.clone())?;
613            st.pending
614                .push((crate::segments::header::SegmentType::Schema, payload));
615            return Ok(SchemaVersion(next_v));
616        }
617        let tid = self.next_txn_id();
618        self.commit_write_batch(
619            tid,
620            &[(
621                crate::segments::header::SegmentType::Schema,
622                payload.as_slice(),
623            )],
624        )?;
625        self.apply_catalog_record(wire)?;
626        Ok(SchemaVersion(next_v))
627    }
628
629    /// Plan a schema version bump and return the required migration steps, if any.
630    pub fn plan_schema_version_with_indexes(
631        &self,
632        id: CollectionId,
633        fields: Vec<FieldDef>,
634        indexes: Vec<crate::schema::IndexDef>,
635    ) -> Result<MigrationPlan, DbError> {
636        let current = self
637            .catalog_for_read()
638            .get(id)
639            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
640        // Same infallibility contract as `register_schema_version_with_indexes` above.
641        let change = classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)?;
642        let mut steps = Vec::new();
643        match &change {
644            SchemaChange::Safe => {}
645            SchemaChange::Breaking { .. } => {}
646            SchemaChange::NeedsMigration {
647                reason,
648                backfill_top_level_field,
649                backfill_field_path,
650            } => {
651                if let Some(field) = backfill_top_level_field {
652                    steps.push(MigrationStep::BackfillTopLevelField {
653                        field: field.clone(),
654                    });
655                } else if let Some(path) = backfill_field_path {
656                    steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
657                } else if reason.contains("unique index") {
658                    steps.push(MigrationStep::RebuildIndexes);
659                }
660            }
661        }
662        Ok(MigrationPlan { change, steps })
663    }
664
665    /// Backfill a missing top-level field with a fixed value for all rows in a collection.
666    ///
667    /// This helper is intentionally simple so it can be bound to other languages.
668    pub fn backfill_top_level_field_with_value(
669        &mut self,
670        collection_id: CollectionId,
671        field: &str,
672        value: RowValue,
673    ) -> Result<(), DbError> {
674        let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
675        self.backfill_field_at_path_with_value(collection_id, &path, value)
676    }
677
678    /// Backfill a missing field (any segment path) with a fixed value for all rows.
679    pub fn backfill_field_at_path_with_value(
680        &mut self,
681        collection_id: CollectionId,
682        path: &crate::schema::FieldPath,
683        value: RowValue,
684    ) -> Result<(), DbError> {
685        let col = self
686            .catalog_for_read()
687            .get(collection_id)
688            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
689                id: collection_id.0,
690            }))?;
691        let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
692            DbError::Schema(SchemaError::RowUnknownField {
693                name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
694            })
695        })?;
696
697        let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
698        for ((cid, _), row) in self.latest_for_read().iter() {
699            if *cid != collection_id.0 {
700                continue;
701            }
702            rows.push(row.clone());
703        }
704
705        self.transaction(|db| {
706            for mut row in rows {
707                if row_value_at_path_segments(&row, &path.0).is_some() {
708                    continue;
709                }
710                crate::record::insert_value_at_path(&mut row, path, value.clone())?;
711                db.insert(collection_id, row)?;
712            }
713            Ok(())
714        })
715    }
716
717    /// Rebuild index entries for all rows in `collection_id` using the current schema’s index defs.
718    pub fn rebuild_indexes_for_collection(
719        &mut self,
720        collection_id: CollectionId,
721    ) -> Result<(), DbError> {
722        let col = self
723            .catalog_for_read()
724            .get(collection_id)
725            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
726                id: collection_id.0,
727            }))?;
728        let pk_name =
729            col.primary_field
730                .as_deref()
731                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
732                    collection_id: collection_id.0,
733                }))?;
734        let pk_def = col
735            .fields
736            .iter()
737            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
738            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
739                name: pk_name.to_string(),
740            }))?;
741
742        let mut entries: Vec<IndexEntry> = Vec::new();
743        for ((cid, _), row) in self.latest_for_read().iter() {
744            if *cid != collection_id.0 {
745                continue;
746            }
747            let Some(pk_cell) = row.get(pk_name) else {
748                continue;
749            };
750            let pk_scalar = pk_cell.clone().into_scalar()?;
751            if !pk_scalar.ty_matches(&pk_def.ty) {
752                continue;
753            }
754            for idx in &col.indexes {
755                let Some(v) = scalar_at_path(row, &idx.path) else {
756                    continue;
757                };
758                entries.push(IndexEntry {
759                    collection_id: collection_id.0,
760                    index_name: idx.name.clone(),
761                    kind: idx.kind,
762                    op: IndexOp::Insert,
763                    index_key: v.canonical_key_bytes(),
764                    pk_key: pk_scalar.canonical_key_bytes(),
765                });
766            }
767        }
768
769        self.transaction(|db| {
770            if entries.is_empty() {
771                return Ok(());
772            }
773            // Apply in-memory + persist as one index segment batch.
774            // `begin_transaction` always installs `txn_staging` before this closure runs.
775            let st = db
776                .txn_staging
777                .as_mut()
778                .expect("transaction staging must be active");
779            let b = encode_index_payload(&entries);
780            st.pending
781                .push((crate::segments::header::SegmentType::Index, b));
782            for e in entries {
783                st.shadow_indexes.apply(e)?;
784            }
785            Ok(())
786        })
787    }
788
789    /// Force-register a new schema version, bypassing compatibility checks.
790    ///
791    /// This is an escape hatch for advanced workflows where the caller performs an out-of-band
792    /// data rewrite (or accepts inconsistent index/query behavior until a rebuild).
793    pub fn register_schema_version_with_indexes_force(
794        &mut self,
795        id: CollectionId,
796        fields: Vec<FieldDef>,
797        indexes: Vec<crate::schema::IndexDef>,
798    ) -> Result<SchemaVersion, DbError> {
799        let current = self
800            .catalog_for_read()
801            .get(id)
802            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
803        let next_v = current
804            .current_version
805            .0
806            .checked_add(1)
807            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
808        let wire = CatalogRecordWire::NewSchemaVersion {
809            collection_id: id.0,
810            schema_version: next_v,
811            fields,
812            indexes,
813        };
814        let payload = encode_catalog_payload(&wire);
815        if let Some(st) = &mut self.txn_staging {
816            st.shadow_catalog.apply_record(wire.clone())?;
817            st.pending
818                .push((crate::segments::header::SegmentType::Schema, payload));
819            return Ok(SchemaVersion(next_v));
820        }
821        let tid = self.next_txn_id();
822        self.commit_write_batch(
823            tid,
824            &[(
825                crate::segments::header::SegmentType::Schema,
826                payload.as_slice(),
827            )],
828        )?;
829        self.apply_catalog_record(wire)?;
830        Ok(SchemaVersion(next_v))
831    }
832
833    /// Insert or replace the row for `collection_id` identified by its primary-key field.
834    ///
835    /// `row` maps **top-level** field names to [`RowValue`]. The primary key field must be present.
836    /// Only single-segment field paths are supported in 0.6.x.
837    pub fn insert(
838        &mut self,
839        collection_id: CollectionId,
840        row: BTreeMap<String, RowValue>,
841    ) -> Result<(), DbError> {
842        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
843        let (mut payload, full, mut index_entries, pk_scalar) =
844            plan_insert_row(self.catalog_for_read(), collection_id, row)?;
845        #[cfg(test)]
846        let mut full = full;
847        let existing = self
848            .latest_for_read()
849            .get(&(collection_id.0, full.0.clone()))
850            .cloned();
851        if existing.is_some() {
852            #[cfg(test)]
853            if let Some(poison) = self.test_poison_planned_replace_row.take() {
854                poison(collection_id, &mut full.1);
855            }
856            // Re-encode with explicit replace opcode.
857            let col = self
858                .catalog_for_read()
859                .get(collection_id)
860                .ok_or(DbError::Schema(SchemaError::UnknownCollection {
861                    id: collection_id.0,
862                }))?;
863            let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
864            let pk_name =
865                col.primary_field
866                    .as_deref()
867                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
868                        collection_id: collection_id.0,
869                    }))?;
870            let pk_def = col
871                .fields
872                .iter()
873                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
874                .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
875                    name: pk_name.to_string(),
876                }))?;
877
878            let non_pk_defs = if has_multi_segment_schema {
879                col.fields
880                    .iter()
881                    .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
882                    .collect::<Vec<_>>()
883            } else {
884                non_pk_defs_in_order(&col.fields, pk_name)
885            };
886            let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
887            for def in &non_pk_defs {
888                let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
889                non_pk.push(((*def).clone(), v));
890            }
891            payload = (if has_multi_segment_schema {
892                encode_record_payload_v3_op(
893                    collection_id.0,
894                    col.current_version.0,
895                    OP_REPLACE,
896                    &pk_scalar,
897                    &pk_def.ty,
898                    &non_pk,
899                )
900            } else {
901                encode_record_payload_v2_op(
902                    collection_id.0,
903                    col.current_version.0,
904                    OP_REPLACE,
905                    &pk_scalar,
906                    &pk_def.ty,
907                    &non_pk,
908                )
909            })?;
910            // Prepend index deletes for any existing row.
911            if let Some(ref old_row) = existing {
912                let mut deletes = index_deletes_for_existing_row(
913                    collection_id,
914                    &pk_scalar,
915                    &col.indexes,
916                    old_row,
917                );
918                deletes.append(&mut index_entries);
919                index_entries = deletes;
920            }
921        }
922        for e in &index_entries {
923            if e.kind != crate::schema::IndexKind::Unique {
924                continue;
925            }
926            let Some(existing) =
927                self.indexes_for_read()
928                    .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
929            else {
930                continue;
931            };
932            if e.op != IndexOp::Insert {
933                continue;
934            }
935            if existing == e.pk_key.as_slice() {
936                continue;
937            }
938            return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
939        }
940        if let Some(st) = &mut self.txn_staging {
941            if !index_entries.is_empty() {
942                let b = encode_index_payload(&index_entries);
943                st.pending
944                    .push((crate::segments::header::SegmentType::Index, b));
945            }
946            st.pending.push((
947                crate::segments::header::SegmentType::Record,
948                payload.clone(),
949            ));
950            st.shadow_latest
951                .insert((collection_id.0, full.0.clone()), full.1.clone());
952            for e in index_entries {
953                st.shadow_indexes.apply(e)?;
954            }
955            return Ok(());
956        }
957        let tid = self.next_txn_id();
958        let index_bytes = if index_entries.is_empty() {
959            None
960        } else {
961            Some(encode_index_payload(&index_entries))
962        };
963        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
964        if let Some(ref b) = index_bytes {
965            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
966        }
967        batch.push((
968            crate::segments::header::SegmentType::Record,
969            payload.as_slice(),
970        ));
971        self.commit_write_batch(tid, &batch)?;
972        self.latest.insert((collection_id.0, full.0), full.1);
973        for e in index_entries {
974            self.indexes.apply(e)?;
975        }
976        Ok(())
977    }
978
979    /// Delete the row for `collection_id` identified by its primary key.
980    pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
981        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
982        let col = self
983            .catalog_for_read()
984            .get(collection_id)
985            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
986                id: collection_id.0,
987            }))?;
988        let pk_name =
989            col.primary_field
990                .as_deref()
991                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
992                    collection_id: collection_id.0,
993                }))?;
994        let pk_def = col
995            .fields
996            .iter()
997            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
998            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
999                name: pk_name.to_string(),
1000            }))?;
1001        if !pk.ty_matches(&pk_def.ty) {
1002            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
1003        }
1004        let pk_key = pk.canonical_key_bytes();
1005        let existing = self
1006            .latest_for_read()
1007            .get(&(collection_id.0, pk_key.clone()))
1008            .cloned();
1009        let Some(old_row) = existing else {
1010            return Ok(());
1011        };
1012        let indexes = col.indexes.clone();
1013        let schema_ver = col.current_version.0;
1014        let pk_ty = pk_def.ty.clone();
1015        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
1016
1017        let mut index_entries =
1018            index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
1019        #[cfg(not(test))]
1020        let pk_for_record = pk.clone();
1021        #[cfg(test)]
1022        let pk_for_record = {
1023            let mut p = pk.clone();
1024            if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
1025                p = poison(p);
1026            }
1027            p
1028        };
1029        let record_payload = (if has_multi_segment_schema {
1030            encode_record_payload_v3_op(
1031                collection_id.0,
1032                schema_ver,
1033                OP_DELETE,
1034                &pk_for_record,
1035                &pk_ty,
1036                &[],
1037            )
1038        } else {
1039            encode_record_payload_v2_op(
1040                collection_id.0,
1041                schema_ver,
1042                OP_DELETE,
1043                &pk_for_record,
1044                &pk_ty,
1045                &[],
1046            )
1047        })?;
1048
1049        if let Some(st) = &mut self.txn_staging {
1050            if !index_entries.is_empty() {
1051                let b = encode_index_payload(&index_entries);
1052                st.pending
1053                    .push((crate::segments::header::SegmentType::Index, b));
1054            }
1055            st.pending.push((
1056                crate::segments::header::SegmentType::Record,
1057                record_payload.clone(),
1058            ));
1059            st.shadow_latest.remove(&(collection_id.0, pk_key));
1060            for e in index_entries.drain(..) {
1061                st.shadow_indexes.apply(e)?;
1062            }
1063            return Ok(());
1064        }
1065
1066        let tid = self.next_txn_id();
1067        let index_bytes = if index_entries.is_empty() {
1068            None
1069        } else {
1070            Some(encode_index_payload(&index_entries))
1071        };
1072        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
1073        if let Some(ref b) = index_bytes {
1074            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
1075        }
1076        batch.push((
1077            crate::segments::header::SegmentType::Record,
1078            record_payload.as_slice(),
1079        ));
1080        self.commit_write_batch(tid, &batch)?;
1081        self.latest.remove(&(collection_id.0, pk_key));
1082        for e in index_entries {
1083            self.indexes.apply(e)?;
1084        }
1085        Ok(())
1086    }
1087
1088    /// Return the latest stored row for `pk`, or `None` if no insert has been replayed for that key.
1089    ///
1090    /// `pk` must match the declared primary field’s [`crate::schema::Type`].
1091    pub fn get(
1092        &self,
1093        collection_id: CollectionId,
1094        pk: &ScalarValue,
1095    ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1096        let col = self
1097            .catalog_for_read()
1098            .get(collection_id)
1099            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1100                id: collection_id.0,
1101            }))?;
1102        let pk_name =
1103            col.primary_field
1104                .as_deref()
1105                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1106                    collection_id: collection_id.0,
1107                }))?;
1108        let pk_ty = col
1109            .fields
1110            .iter()
1111            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1112            .map(|f| &f.ty)
1113            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1114                name: pk_name.to_string(),
1115            }))?;
1116        if !pk.ty_matches(pk_ty) {
1117            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
1118        }
1119        let key = (collection_id.0, pk.canonical_key_bytes());
1120        Ok(self.latest_for_read().get(&key).cloned())
1121    }
1122
1123    /// Write a durable checkpoint segment and publish it via the superblock.
1124    ///
1125    /// The checkpoint stores the logical state (catalog + latest rows + index state) so open can
1126    /// avoid scanning/replaying the full log. Works with any [`Store`] (file-backed [`FileStore`] or
1127    /// [`VecStore`] snapshots).
1128    pub fn checkpoint(&mut self) -> Result<(), DbError> {
1129        #[cfg(feature = "tracing")]
1130        let _span = tracing::info_span!("database_checkpoint").entered();
1131        if self.txn_staging.is_some() {
1132            return Err(DbError::Transaction(TransactionError::NestedTransaction));
1133        }
1134
1135        write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
1136
1137        let mut cp = checkpoint::checkpoint_from_state(
1138            self.catalog_for_read(),
1139            self.latest_for_read(),
1140            self.indexes_for_read(),
1141        )?;
1142
1143        let file_len = self.store.len()?;
1144        let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
1145        let checkpoint_offset = writer.offset();
1146
1147        let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
1148        let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
1149        cp.replay_from_offset = replay_from;
1150        let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
1151
1152        let hdr = SegmentHeader {
1153            segment_type: SegmentType::Checkpoint,
1154            payload_len: 0,
1155            payload_crc32c: 0,
1156        };
1157        writer.append(hdr, &payload)?;
1158
1159        publish::append_manifest_and_publish_with_checkpoint(
1160            &mut self.store,
1161            self.segment_start,
1162            Some((checkpoint_offset, payload.len() as u32)),
1163        )?;
1164        self.store.sync()?;
1165        #[cfg(feature = "tracing")]
1166        tracing::info!(
1167            checkpoint_offset,
1168            replay_from,
1169            payload_bytes = payload.len(),
1170            "database_checkpoint_ok"
1171        );
1172        Ok(())
1173    }
1174
1175    /// Test hook: mutate the planned row once on the replace path immediately before Opcode re-encoding.
1176    #[cfg(test)]
1177    #[doc(hidden)]
1178    pub(crate) fn test_arm_replace_encode_poison_once(
1179        &mut self,
1180        poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
1181    ) {
1182        self.test_poison_planned_replace_row = Some(poison);
1183    }
1184
1185    #[cfg(test)]
1186    #[doc(hidden)]
1187    pub(crate) fn test_arm_delete_encode_poison_once(
1188        &mut self,
1189        poison: fn(ScalarValue) -> ScalarValue,
1190    ) {
1191        self.test_poison_delete_encode_scalar = Some(poison);
1192    }
1193
1194    /// Test helper: overwrite one cell in [`Self::latest`] without validation.
1195    #[cfg(test)]
1196    #[doc(hidden)]
1197    pub(crate) fn test_write_latest_cell_unchecked(
1198        &mut self,
1199        collection_id: CollectionId,
1200        pk: &ScalarValue,
1201        field: &str,
1202        value: RowValue,
1203    ) {
1204        let pk_key = pk.canonical_key_bytes();
1205        let row = self
1206            .latest
1207            .get_mut(&(collection_id.0, pk_key))
1208            .expect("test_write_latest_cell_unchecked: unknown row key");
1209        row.insert(field.to_string(), value);
1210    }
1211}
1212
1213impl Database<FileStore> {
1214    /// Rewrite the database into a compacted single-file image at `dest_path`.
1215    ///
1216    /// The destination file is truncated/overwritten if it exists.
1217    pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1218        self.compact_to_with_fsops(&StdFsOps, dest_path)
1219    }
1220
1221    pub(crate) fn compact_to_with_fsops(
1222        &self,
1223        fs: &dyn FsOps,
1224        dest_path: impl AsRef<Path>,
1225    ) -> Result<(), DbError> {
1226        #[cfg(feature = "tracing")]
1227        let _span = tracing::info_span!(
1228            "database_compact_to",
1229            dest = %dest_path.as_ref().display()
1230        )
1231        .entered();
1232        let bytes = self.compact_snapshot_bytes()?;
1233        let path = dest_path.as_ref();
1234        let file = fs
1235            .open_read_write_create_truncate(path)
1236            .map_err(DbError::Io)?;
1237        let mut store = FileStore::new(file);
1238        store.write_all_at(0, &bytes)?;
1239        store.truncate(bytes.len() as u64)?;
1240        store.sync()?;
1241        #[cfg(feature = "tracing")]
1242        tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1243        Ok(())
1244    }
1245    pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1246        self.compact_in_place_with_fsops(&StdFsOps)
1247    }
1248
1249    pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1250        #[cfg(feature = "tracing")]
1251        let _span = tracing::info_span!("database_compact_in_place").entered();
1252        // Crash-safety: write a full new image to a sidecar file, fsync it, then atomically
1253        // replace the live path via rename (using a backup on platforms where rename does not
1254        // overwrite an existing destination).
1255        let bytes = self.compact_snapshot_bytes()?;
1256        let live_path = self.path.clone();
1257        let parent = live_path
1258            .parent()
1259            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1260
1261        // Pick unique temp + backup names in the same directory (so rename stays atomic on POSIX).
1262        let pid = std::process::id();
1263        let nanos = std::time::SystemTime::now()
1264            .duration_since(std::time::UNIX_EPOCH)
1265            .map(|d| d.as_nanos())
1266            .unwrap_or(0);
1267        let base = live_path
1268            .file_name()
1269            .and_then(|s| s.to_str())
1270            .unwrap_or("db.modelvault");
1271        let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1272        let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1273
1274        // 1) Write the compacted image to tmp and fsync it.
1275        {
1276            let file = fs
1277                .open_read_write_create_new(&tmp_path)
1278                .map_err(DbError::Io)?;
1279            let mut store = FileStore::new(file);
1280            store.write_all_at(0, &bytes)?;
1281            store.truncate(bytes.len() as u64)?;
1282            store.sync()?;
1283        }
1284
1285        // 2) Replace the live file path with the tmp image, preserving a backup until success.
1286        //
1287        // We do not rely on "rename over existing" being supported across platforms. Instead:
1288        // - move live → bak
1289        // - move tmp → live
1290        // - fsync directory (best-effort)
1291        // - remove bak
1292        //
1293        // If tmp → live fails, attempt to restore bak → live.
1294        let _ = fs.remove_file(&bak_path);
1295        fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1296        let replace_res = fs.rename(&tmp_path, &live_path);
1297        if let Err(e) = replace_res {
1298            // Best-effort restore: move backup back into place.
1299            let _ = fs.rename(&bak_path, &live_path);
1300            // Clean up tmp if it still exists.
1301            let _ = fs.remove_file(&tmp_path);
1302            return Err(DbError::Io(e));
1303        }
1304
1305        // Best-effort directory sync: helps make the rename durable on POSIX.
1306        #[cfg(unix)]
1307        {
1308            // Best-effort: on many Unix platforms, opening a directory and syncing it will persist
1309            // the rename in the directory entry. If this fails, the data file itself is still
1310            // fsync'd and the operation remains logically correct; only rename durability is weaker.
1311            if let Ok(dir_f) = fs.open_dir(parent) {
1312                let _ = dir_f.sync_all();
1313            }
1314        }
1315
1316        let _ = fs.remove_file(&bak_path);
1317
1318        // 3) Refresh in-memory state by reopening.
1319        let reopened = Database::open_with_options(live_path, OpenOptions::default())?;
1320        *self = reopened;
1321        #[cfg(feature = "tracing")]
1322        tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1323        Ok(())
1324    }
1325
1326    /// Create a consistent backup copy of this on-disk database.
1327    ///
1328    /// This writes a checkpoint (for fast reopen and a stable state marker) and then copies the
1329    /// underlying file bytes to `dest_path`.
1330    pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1331        self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1332    }
1333
1334    pub(crate) fn export_snapshot_to_path_with_fsops(
1335        &mut self,
1336        fs: &dyn FsOps,
1337        dest_path: impl AsRef<Path>,
1338    ) -> Result<(), DbError> {
1339        self.checkpoint()?;
1340        let dest_path = dest_path.as_ref();
1341        fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1342        // Strengthen durability of the copied snapshot: fsync the destination and best-effort
1343        // fsync its parent directory so the directory entry is persisted.
1344        if let Ok(f) = fs.open_read(dest_path) {
1345            let _ = f.sync_all();
1346        }
1347        #[cfg(unix)]
1348        best_effort_fsync_parent_dir(fs, dest_path);
1349        Ok(())
1350    }
1351
1352    /// Restore a snapshot file into `dest_path` by atomically replacing the destination.
1353    ///
1354    /// This is a file operation helper intended for operational tooling.
1355    pub fn restore_snapshot_to_path(
1356        snapshot_path: impl AsRef<Path>,
1357        dest_path: impl AsRef<Path>,
1358    ) -> Result<(), DbError> {
1359        Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1360    }
1361
1362    pub(crate) fn restore_snapshot_to_path_with_fsops(
1363        fs: &dyn FsOps,
1364        snapshot_path: impl AsRef<Path>,
1365        dest_path: impl AsRef<Path>,
1366    ) -> Result<(), DbError> {
1367        let snapshot_path = snapshot_path.as_ref();
1368        let dest_path = dest_path.as_ref();
1369        let parent = dest_path
1370            .parent()
1371            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1372
1373        let pid = std::process::id();
1374        let nanos = std::time::SystemTime::now()
1375            .duration_since(std::time::UNIX_EPOCH)
1376            .map(|d| d.as_nanos())
1377            .unwrap_or(0);
1378        let base = dest_path
1379            .file_name()
1380            .and_then(|s| s.to_str())
1381            .unwrap_or("db.modelvault");
1382        let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1383        let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1384
1385        // Copy snapshot bytes into a temp file and fsync it.
1386        fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1387        if let Ok(f) = fs.open_read(&tmp_path) {
1388            let _ = f.sync_all();
1389        }
1390
1391        // Replace destination with backup/restore semantics.
1392        if dest_path.exists() {
1393            let _ = fs.remove_file(&bak_path);
1394            fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1395        }
1396        let replace_res = fs.rename(&tmp_path, dest_path);
1397        if let Err(e) = replace_res {
1398            // Best-effort restore original.
1399            if bak_path.exists() {
1400                let _ = fs.rename(&bak_path, dest_path);
1401            }
1402            let _ = fs.remove_file(&tmp_path);
1403            return Err(DbError::Io(e));
1404        }
1405
1406        #[cfg(unix)]
1407        {
1408            if let Ok(dir_f) = fs.open_dir(parent) {
1409                let _ = dir_f.sync_all();
1410            }
1411        }
1412        let _ = fs.remove_file(&bak_path);
1413        Ok(())
1414    }
1415}
1416
1417pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
1418    db: &'a Database<S>,
1419    collection_id: CollectionId,
1420    _marker: PhantomData<T>,
1421}
1422
1423impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1424    pub fn where_eq(
1425        &self,
1426        path: crate::schema::FieldPath,
1427        value: ScalarValue,
1428    ) -> QueryBuilder<'a, S, T> {
1429        QueryBuilder {
1430            db: self.db,
1431            collection_id: self.collection_id,
1432            predicate: Some(crate::query::Predicate::Eq { path, value }),
1433            limit: None,
1434            _marker: PhantomData,
1435        }
1436    }
1437
1438    pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1439        let q = crate::query::Query {
1440            collection: self.collection_id,
1441            predicate: None,
1442            limit: None,
1443            order_by: None,
1444        };
1445        let rows = self.db.query(&q)?;
1446        Ok(rows.into_iter().map(project_row::<T>).collect())
1447    }
1448}
1449
1450pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
1451    db: &'a Database<S>,
1452    collection_id: CollectionId,
1453    predicate: Option<crate::query::Predicate>,
1454    limit: Option<usize>,
1455    _marker: PhantomData<T>,
1456}
1457
1458impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1459    pub fn limit(mut self, n: usize) -> Self {
1460        self.limit = Some(n);
1461        self
1462    }
1463
1464    pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1465        let q = crate::query::Query {
1466            collection: self.collection_id,
1467            predicate: self.predicate,
1468            limit: self.limit,
1469            order_by: None,
1470        };
1471        let rows = self.db.query(&q)?;
1472        Ok(rows.into_iter().map(project_row::<T>).collect())
1473    }
1474
1475    pub fn explain(self) -> Result<String, DbError> {
1476        let q = crate::query::Query {
1477            collection: self.collection_id,
1478            predicate: self.predicate,
1479            limit: self.limit,
1480            order_by: None,
1481        };
1482        self.db.explain_query(&q)
1483    }
1484}
1485
1486fn validate_subset_model<T: crate::schema::DbModel>(
1487    col: &crate::catalog::CollectionInfo,
1488) -> Result<(), DbError> {
1489    crate::schema_compat::validate_model_fields_against_catalog(
1490        col,
1491        T::primary_field(),
1492        &T::fields(),
1493        &T::indexes(),
1494    )
1495}
1496
1497/// Build a row map containing only the listed fields (same rules as subset-model projection).
1498pub fn row_subset_by_field_defs(
1499    row: &BTreeMap<String, RowValue>,
1500    wanted: &[FieldDef],
1501) -> BTreeMap<String, RowValue> {
1502    let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1503    for f in wanted {
1504        let segs = &f.path.0;
1505        if segs.is_empty() {
1506            continue;
1507        }
1508        let Some(leaf) = row_value_at_path_segments(row, segs) else {
1509            continue;
1510        };
1511        let root = segs[0].to_string();
1512        if segs.len() == 1 {
1513            out.insert(root, leaf);
1514        } else {
1515            let nested = row_value_nested_object_path(&segs[1..], leaf);
1516            match out.get_mut(&root) {
1517                Some(existing) => merge_row_value_trees(existing, nested),
1518                None => {
1519                    out.insert(root, nested);
1520                }
1521            }
1522        }
1523    }
1524    out
1525}
1526
1527fn row_value_at_path_segments(
1528    row: &BTreeMap<String, RowValue>,
1529    path: &[std::borrow::Cow<'static, str>],
1530) -> Option<RowValue> {
1531    if path.is_empty() {
1532        return None;
1533    }
1534    let mut cur = row.get(path[0].as_ref())?;
1535    for seg in path.iter().skip(1) {
1536        cur = match cur {
1537            RowValue::Object(m) => m.get(seg.as_ref())?,
1538            RowValue::None => return None,
1539            _ => return None,
1540        };
1541    }
1542    Some(cur.clone())
1543}
1544
1545/// Build `Object({ seg[0]: Object({ seg[1]: ... leaf }) })` for non-empty `seg`.
1546fn row_value_nested_object_path(
1547    segments: &[std::borrow::Cow<'static, str>],
1548    leaf: RowValue,
1549) -> RowValue {
1550    debug_assert!(!segments.is_empty());
1551    if segments.len() == 1 {
1552        let mut m = BTreeMap::new();
1553        m.insert(segments[0].to_string(), leaf);
1554        RowValue::Object(m)
1555    } else {
1556        let mut m = BTreeMap::new();
1557        m.insert(
1558            segments[0].to_string(),
1559            row_value_nested_object_path(&segments[1..], leaf),
1560        );
1561        RowValue::Object(m)
1562    }
1563}
1564
1565fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1566    match (&mut *into, from) {
1567        (RowValue::Object(m1), RowValue::Object(m2)) => {
1568            for (k, v2) in m2 {
1569                match m1.entry(k) {
1570                    std::collections::btree_map::Entry::Vacant(e) => {
1571                        e.insert(v2);
1572                    }
1573                    std::collections::btree_map::Entry::Occupied(mut e) => {
1574                        merge_row_value_trees(e.get_mut(), v2);
1575                    }
1576                }
1577            }
1578        }
1579        (slot, from) => *slot = from,
1580    }
1581}
1582
1583fn project_row<T: crate::schema::DbModel>(
1584    row: BTreeMap<String, RowValue>,
1585) -> BTreeMap<String, RowValue> {
1586    row_subset_by_field_defs(&row, &T::fields())
1587}
1588
1589pub(crate) fn scalar_at_path(
1590    row: &BTreeMap<String, RowValue>,
1591    path: &crate::schema::FieldPath,
1592) -> Option<ScalarValue> {
1593    let mut cur: Option<&RowValue> = None;
1594    for (i, seg) in path.0.iter().enumerate() {
1595        let key = seg.as_ref();
1596        cur = match (i, cur) {
1597            (0, _) => row.get(key),
1598            (_, Some(RowValue::Object(map))) => map.get(key),
1599            (_, Some(RowValue::None)) => return None,
1600            _ => return None,
1601        };
1602    }
1603    cur.and_then(|v| v.as_scalar())
1604}
1605
1606impl Database<FileStore> {
1607    /// Open an existing file or create a new database at `path`.
1608    ///
1609    /// Creates parent directories as needed via the OS; the file is opened read/write.
1610    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1611        Self::open_with_options(path, crate::config::OpenOptions::default())
1612    }
1613
1614    /// Open an existing file read-only (does not create it).
1615    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1616        Self::open_with_options(
1617            path,
1618            crate::config::OpenOptions {
1619                recovery: crate::config::RecoveryMode::Strict,
1620                mode: OpenMode::ReadOnly,
1621            },
1622        )
1623    }
1624
1625    /// Open with recovery and other options (see [`crate::config::OpenOptions`]).
1626    pub fn open_with_options(
1627        path: impl AsRef<Path>,
1628        opts: crate::config::OpenOptions,
1629    ) -> Result<Self, DbError> {
1630        let path = path.as_ref().to_path_buf();
1631        let store = FileStore::open_locked(&path, opts.mode)?;
1632        Self::open_with_store(path, store, opts)
1633    }
1634}
1635
1636impl Database<VecStore> {
1637    /// New empty in-memory database (same on-disk layout as a new file image in a [`VecStore`]).
1638    pub fn open_in_memory() -> Result<Self, DbError> {
1639        Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1640    }
1641
1642    /// In-memory open with [`crate::config::OpenOptions`].
1643    pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1644        Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1645    }
1646
1647    /// Deserialize a full database image from bytes (e.g. from [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1648    pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1649        Self::open_with_store(
1650            PathBuf::from(":memory:"),
1651            VecStore::from_vec(bytes),
1652            crate::config::OpenOptions::default(),
1653        )
1654    }
1655
1656    /// Consume `self` and return the owned byte buffer backing the store.
1657    pub fn into_snapshot_bytes(self) -> Vec<u8> {
1658        self.store.into_inner()
1659    }
1660
1661    /// Clone of the full serialized database image (alias of the buffer returned by [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1662    pub fn snapshot_bytes(&self) -> Vec<u8> {
1663        self.store.as_slice().to_vec()
1664    }
1665
1666    /// Write the full in-memory database image to `dest_path`.
1667    pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1668        Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1669    }
1670
1671    pub(crate) fn export_snapshot_to_path_with_fsops(
1672        fs: &dyn FsOps,
1673        dest_path: impl AsRef<Path>,
1674        bytes: &[u8],
1675    ) -> Result<(), DbError> {
1676        fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1677        Ok(())
1678    }
1679
1680    /// Open an in-memory database from a snapshot file.
1681    pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1682        let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1683        Self::from_snapshot_bytes(bytes)
1684    }
1685}
1686
#[cfg(test)]
mod scalar_at_path_tests {
    // The module body is kept in a separate file under the crate root and is textually
    // included here, so it is compiled as part of this module.
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
1694
#[cfg(test)]
mod tests {
    // The module body is kept in a separate file under the crate root and is textually
    // included here, so it is compiled as part of this module.
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_db_mod_tests.rs"));
}