Skip to main content

modelvault_core/db/
mod.rs

//! Database handle and orchestration.
//!
//! [`Database`] is implemented across several internal modules; the main ones are `open`
//! (bootstrap), `replay` (catalog and rows from segments), `write` (append segments and
//! publish), and `helpers` (name rules).
5
6mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12mod writer_registry;
13pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
14mod row_merge;
15pub(crate) mod row_paths;
16pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
17mod write;
18
19use std::collections::{BTreeMap, HashMap};
20use std::marker::PhantomData;
21use std::path::{Path, PathBuf};
22
23use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
24use crate::config::{OpenMode, OpenOptions};
25use crate::error::{DbError, FormatError, SchemaError, TransactionError};
26use crate::index::IndexState;
27use crate::index::{encode_index_payload, IndexEntry, IndexOp};
28use crate::record::{
29    encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
30    encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
31    OP_REPLACE,
32};
33use crate::schema::{classify_schema_update, SchemaChange};
34use crate::schema::{CollectionId, FieldDef, SchemaVersion};
35use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
36use crate::segments::writer::SegmentWriter;
37use crate::storage::{FileStore, Store, VecStore};
38use crate::validation;
39use crate::{checkpoint, publish};
40use crate::{MigrationPlan, MigrationStep};
41
42use self::fs_ops::{FsOps, StdFsOps};
43
44/// Best-effort `fsync` on `dest_path`'s parent directory (Unix only).
45#[cfg(unix)]
46fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
47    let Some(parent) = dest_path.parent() else {
48        return;
49    };
50    let Ok(dir_f) = fs.open_dir(parent) else {
51        return;
52    };
53    let _ = dir_f.sync_all();
54}
55
/// Latest row per `(collection_id, canonical primary-key bytes)`; the value is the full row map.
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;

/// Output of `plan_insert_row`: `(encoded record payload, (canonical primary-key bytes,
/// full row map), index insert entries, primary key as a scalar)`.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
64
65fn plan_insert_row(
66    catalog: &Catalog,
67    collection_id: CollectionId,
68    mut row: BTreeMap<String, RowValue>,
69) -> Result<PlannedInsert, DbError> {
70    let col =
71        catalog
72            .get(collection_id)
73            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
74                id: collection_id.0,
75            }))?;
76    let pk_name =
77        col.primary_field
78            .as_deref()
79            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
80                collection_id: collection_id.0,
81            }))?;
82    let pk_def = col
83        .fields
84        .iter()
85        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
86        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
87            name: pk_name.to_string(),
88        }))?;
89    let pk_ty = &pk_def.ty;
90    validation::ensure_pk_type_primitive(pk_ty)?;
91    let mut pk_path = vec![pk_name.to_string()];
92    let pk_cell = row
93        .get(pk_name)
94        .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
95            name: pk_name.to_string(),
96        }))?;
97    validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
98    // Validate unknown fields: for nested schema paths we validate by traversing row objects.
99    // For legacy single-segment schemas, keep the existing top-level validation.
100    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
101    if !has_multi_segment_schema {
102        validation::validate_top_level_row(&col.fields, pk_name, &row)?;
103    } else {
104        validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
105    }
106
107    // `pk_cell` is already present (validated above), so remove must succeed.
108    let pk_val = row.remove(pk_name).unwrap();
109    let pk_scalar = pk_val.clone().into_scalar()?;
110
111    // Build non-PK values in schema order.
112    // - legacy v2: single-segment top-level field defs
113    // - v3: full FieldPath for each non-PK def (multi-segment allowed)
114    let non_pk_defs = if has_multi_segment_schema {
115        col.fields
116            .iter()
117            .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
118            .collect::<Vec<_>>()
119    } else {
120        non_pk_defs_in_order(&col.fields, pk_name)
121    };
122    let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
123
124    let payload = if has_multi_segment_schema {
125        encode_record_payload_v3(
126            collection_id.0,
127            col.current_version.0,
128            &pk_scalar,
129            pk_ty,
130            &non_pk,
131        )?
132    } else {
133        encode_record_payload_v2(
134            collection_id.0,
135            col.current_version.0,
136            &pk_scalar,
137            pk_ty,
138            &non_pk,
139        )?
140    };
141
142    let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
143    full_map.insert(pk_name.to_string(), pk_val);
144    for (def, v) in &non_pk {
145        let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
146        if parts.len() == 1 {
147            full_map.insert(parts[0].clone(), v.clone());
148        } else {
149            debug_assert!(parts.len() >= 2);
150            row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
151        }
152    }
153    let mut index_entries: Vec<IndexEntry> = Vec::new();
154    for idx in &col.indexes {
155        let Some(v) = scalar_at_path(&full_map, &idx.path) else {
156            continue;
157        };
158        index_entries.push(IndexEntry {
159            collection_id: collection_id.0,
160            index_name: idx.name.clone(),
161            kind: idx.kind,
162            op: IndexOp::Insert,
163            index_key: v.canonical_key_bytes(),
164            pk_key: pk_scalar.canonical_key_bytes(),
165        });
166    }
167    let pk_key = pk_scalar.canonical_key_bytes();
168    Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
169}
170
171fn index_deletes_for_existing_row(
172    collection_id: CollectionId,
173    pk_scalar: &ScalarValue,
174    indexes: &[crate::schema::IndexDef],
175    existing_row: &BTreeMap<String, RowValue>,
176) -> Vec<IndexEntry> {
177    let mut out = Vec::new();
178    for idx in indexes {
179        let Some(v) = scalar_at_path(existing_row, &idx.path) else {
180            continue;
181        };
182        out.push(IndexEntry {
183            collection_id: collection_id.0,
184            index_name: idx.name.clone(),
185            kind: idx.kind,
186            op: IndexOp::Delete,
187            index_key: v.canonical_key_bytes(),
188            pk_key: pk_scalar.canonical_key_bytes(),
189        });
190    }
191    out
192}
193
/// Staged writes while [`Database::transaction`] is executing.
///
/// Reads made during the transaction see the shadow copies. On commit, `pending` is appended
/// as one batch tagged with `txn_id`, and then the shadow copies replace the committed state.
pub(crate) struct TxnStaging {
    /// Id taken from the database's transaction sequence when the transaction began.
    pub(crate) txn_id: u64,
    /// Copy of the committed catalog with staged schema records applied.
    pub(crate) shadow_catalog: Catalog,
    /// Copy of the committed latest-row map with staged row changes applied.
    pub(crate) shadow_latest: LatestMap,
    /// Copy of the committed index state with staged index entries applied.
    pub(crate) shadow_indexes: IndexState,
    /// `(segment type, encoded payload)` pairs to append on commit, in staging order.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
202
/// Opened ModelVault database: generic over a [`Store`] ([`FileStore`] on disk, [`VecStore`] in memory).
///
/// Committed state lives in `catalog`, `latest` and `indexes`. While a transaction is open,
/// reads (through the `*_for_read` helpers) and staged writes use the shadow copies held in
/// `txn_staging` instead.
pub struct Database<S: Store = FileStore> {
    /// Path shown by [`Database::path`] (`":memory:"` for [`VecStore`]).
    path: PathBuf,
    /// Backing store; the write path appends segments to it.
    store: S,
    /// In-memory view of schema segments replayed from disk.
    catalog: Catalog,
    /// Byte offset where the append-only segment log begins (after header and superblocks).
    segment_start: u64,
    /// Format minor from the file header; may be lazily upgraded (`3` → `4` → `5`) on write.
    // NOTE(review): transaction batches are written by `commit_write_txn_v6` and `txn_seq` refers
    // to format minor 6+, so the upgrade chain likely continues to `6` — confirm in `write`.
    format_minor: u16,
    /// Latest row per `(collection_id, canonical primary-key bytes)`; last replayed insert wins.
    latest: LatestMap,
    /// Secondary indexes rebuilt from replayed `Index` segments.
    indexes: IndexState,
    /// Monotonic id for transaction marker segments (format minor 6+).
    txn_seq: u64,
    /// When set, writes such as [`Self::insert`] and [`Self::register_collection`] are staged
    /// into this batch instead of being committed immediately.
    txn_staging: Option<TxnStaging>,
    /// Present for writable on-disk databases (process-wide single-writer registry).
    // Never read directly; presumably held only for its `Drop` side effect (releasing the
    // writer registration), hence `dead_code` is allowed — NOTE(review): confirm in
    // `writer_registry`.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    /// Covers replace-path record encoding error branches in tests (misaligned validated row maps).
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Covers delete opcode payload encoding `?` by supplying a bogus scalar unrelated to validated `pk`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
236
237impl<S: Store> Database<S> {
238    fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
239        let mut out = Database::<VecStore>::open_in_memory()?;
240
241        // Recreate catalog (stable ids if created in id order).
242        let mut cols = self.catalog_for_read().collections();
243        cols.sort_by_key(|c| c.id.0);
244        for c in &cols {
245            let pk =
246                c.primary_field
247                    .as_deref()
248                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
249                        collection_id: c.id.0,
250                    }))?;
251            let (new_id, _v1) = out.register_collection_with_indexes(
252                &c.name,
253                c.fields.clone(),
254                c.indexes.clone(),
255                pk,
256            )?;
257            // Bump schema version counter to match current_version (repeat identical schema).
258            for _ in 2..=c.current_version.0 {
259                let _ = out.register_schema_version_with_indexes_force(
260                    new_id,
261                    c.fields.clone(),
262                    c.indexes.clone(),
263                )?;
264            }
265        }
266
267        // Copy latest rows (in-memory snapshot semantics).
268        for ((cid, _), row) in self.latest_for_read().iter() {
269            let collection_id = CollectionId(*cid);
270            out.insert(collection_id, row.clone())?;
271        }
272
273        Ok(out.into_snapshot_bytes())
274    }
275
276    pub(crate) fn open_with_store(
277        path: PathBuf,
278        store: S,
279        opts: OpenOptions,
280    ) -> Result<Self, DbError> {
281        open::open_with_store(path, store, opts)
282    }
283
284    fn next_txn_id(&mut self) -> u64 {
285        self.txn_seq = self.txn_seq.saturating_add(1);
286        self.txn_seq
287    }
288
289    #[inline]
290    fn commit_write_batch(
291        &mut self,
292        txn_id: u64,
293        body: &[(crate::segments::header::SegmentType, &[u8])],
294    ) -> Result<(), DbError> {
295        write::commit_write_txn_v6(
296            &mut self.store,
297            self.segment_start,
298            &mut self.format_minor,
299            txn_id,
300            body,
301        )
302    }
303
304    #[inline]
305    fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
306        self.catalog.apply_record(wire)
307    }
308
309    /// Run `f` inside a multi-write transaction: durable segments are written on success.
310    ///
311    /// On error, staged work is discarded and nothing new is appended to the log.
312    pub fn transaction<R>(
313        &mut self,
314        f: impl FnOnce(&mut Self) -> Result<R, DbError>,
315    ) -> Result<R, DbError> {
316        self.begin_transaction()?;
317        match f(self) {
318            Ok(v) => match self.commit_transaction() {
319                Ok(()) => Ok(v),
320                Err(e) => {
321                    self.rollback_transaction();
322                    Err(e)
323                }
324            },
325            Err(e) => {
326                self.rollback_transaction();
327                Err(e)
328            }
329        }
330    }
331
332    /// Start a transaction (for bindings that cannot use the closure API). Pairs with
333    /// [`Self::commit_transaction`] or [`Self::rollback_transaction`].
334    pub fn begin_transaction(&mut self) -> Result<(), DbError> {
335        if self.txn_staging.is_some() {
336            return Err(DbError::Transaction(TransactionError::NestedTransaction));
337        }
338        let tid = self.next_txn_id();
339        self.txn_staging = Some(TxnStaging {
340            txn_id: tid,
341            shadow_catalog: self.catalog.clone(),
342            shadow_latest: self.latest.clone(),
343            shadow_indexes: self.indexes.clone(),
344            pending: Vec::new(),
345        });
346        Ok(())
347    }
348
349    /// Commit the active transaction started with [`Self::begin_transaction`].
350    pub fn commit_transaction(&mut self) -> Result<(), DbError> {
351        self.commit_txn_staging()
352    }
353
354    /// Discard the active transaction without writing to the log.
355    pub fn rollback_transaction(&mut self) {
356        self.txn_staging = None;
357    }
358
359    fn commit_txn_staging(&mut self) -> Result<(), DbError> {
360        let Some(st) = self.txn_staging.take() else {
361            return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
362        };
363        if st.pending.is_empty() {
364            self.catalog = st.shadow_catalog;
365            self.latest = st.shadow_latest;
366            self.indexes = st.shadow_indexes;
367            return Ok(());
368        }
369        let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
370            st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
371        self.commit_write_batch(st.txn_id, &batch)?;
372        self.catalog = st.shadow_catalog;
373        self.latest = st.shadow_latest;
374        self.indexes = st.shadow_indexes;
375        Ok(())
376    }
377
378    fn catalog_for_read(&self) -> &Catalog {
379        if let Some(ref st) = self.txn_staging {
380            &st.shadow_catalog
381        } else {
382            &self.catalog
383        }
384    }
385
386    fn indexes_for_read(&self) -> &IndexState {
387        if let Some(ref st) = self.txn_staging {
388            &st.shadow_indexes
389        } else {
390            &self.indexes
391        }
392    }
393
394    fn latest_for_read(&self) -> &LatestMap {
395        if let Some(ref st) = self.txn_staging {
396            &st.shadow_latest
397        } else {
398            &self.latest
399        }
400    }
401
402    /// Path passed to [`Database::open`](Database::<FileStore>::open), or `":memory:"` for [`VecStore`].
403    pub fn path(&self) -> &Path {
404        &self.path
405    }
406
407    /// Read-only view of the schema catalog built from `Schema` segments.
408    pub fn catalog(&self) -> &Catalog {
409        self.catalog_for_read()
410    }
411
412    /// All registered collection names in lexicographic order.
413    pub fn collection_names(&self) -> Vec<String> {
414        self.catalog_for_read().collection_names()
415    }
416
417    /// Read-only access to the in-memory secondary index state (rebuilt from `Index` segments).
418    pub fn index_state(&self) -> &IndexState {
419        self.indexes_for_read()
420    }
421
422    /// Execute a query against the current in-memory snapshot of the database.
423    pub fn query(
424        &self,
425        q: &crate::query::Query,
426    ) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
427        crate::query::execute_query(
428            self.catalog_for_read(),
429            self.indexes_for_read(),
430            self.latest_for_read(),
431            q,
432        )
433    }
434
435    /// Return a human-readable explanation of the chosen plan for `q`.
436    pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
437        crate::query::explain_query(self.catalog_for_read(), q)
438    }
439
440    /// Lazy iterator over query rows (same semantics as [`Self::query`]).
441    ///
442    /// See [`crate::query::QueryRowIter`] — this is the v0.7 pull-based execution boundary, not a
443    /// full operator graph.
444    pub fn query_iter(
445        &self,
446        q: &crate::query::Query,
447    ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
448        crate::query::execute_query_iter_with_spill_path(
449            self.catalog_for_read(),
450            self.indexes_for_read(),
451            self.latest_for_read(),
452            q,
453            Some(self.path.as_path()),
454        )
455    }
456
457    /// Register the collection schema defined by `T` (schema version 1).
458    pub fn register_model<T: crate::schema::DbModel>(
459        &mut self,
460    ) -> Result<(CollectionId, SchemaVersion), DbError> {
461        self.register_collection_with_indexes(
462            T::collection_name(),
463            T::fields(),
464            T::indexes(),
465            T::primary_field(),
466        )
467    }
468
469    /// Typed handle over a registered collection; `T` may be a *subset model*.
470    pub fn collection<'a, T: crate::schema::DbModel>(
471        &'a self,
472    ) -> Result<Collection<'a, S, T>, DbError> {
473        let cid = self.collection_id_named(T::collection_name())?;
474        let col = self.catalog_for_read().get(cid).ok_or(DbError::Schema(
475            SchemaError::UnknownCollection { id: cid.0 },
476        ))?;
477        validate_subset_model::<T>(col)?;
478        Ok(Collection {
479            db: self,
480            collection_id: cid,
481            _marker: PhantomData,
482        })
483    }
484
485    /// Look up [`CollectionId`] by collection name (leading/trailing whitespace trimmed).
486    ///
487    /// Returns [`SchemaError::UnknownCollectionName`] when the name is not registered.
488    pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
489        self.catalog_for_read()
490            .lookup_name(name)
491            .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
492                name: name.trim().to_string(),
493            }))
494    }
495
496    /// Create a new collection at schema version `1`.
497    ///
498    /// `primary_field` must name a **single-segment** (top-level) field present in `fields`.
499    /// Appends a catalog segment and updates the in-memory catalog.
500    pub fn register_collection(
501        &mut self,
502        name: &str,
503        fields: Vec<FieldDef>,
504        primary_field: &str,
505    ) -> Result<(CollectionId, SchemaVersion), DbError> {
506        self.register_collection_with_indexes(name, fields, vec![], primary_field)
507    }
508
509    pub fn register_collection_with_indexes(
510        &mut self,
511        name: &str,
512        fields: Vec<FieldDef>,
513        indexes: Vec<crate::schema::IndexDef>,
514        primary_field: &str,
515    ) -> Result<(CollectionId, SchemaVersion), DbError> {
516        let name = helpers::normalize_collection_name(name)?;
517        let pk = primary_field.trim();
518        if pk.is_empty() {
519            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
520        }
521        if !Catalog::has_top_level_field(&fields, pk) {
522            return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
523                name: pk.to_string(),
524            }));
525        }
526        if let Some(st) = &mut self.txn_staging {
527            let id = st.shadow_catalog.next_collection_id().0;
528            let wire = CatalogRecordWire::CreateCollection {
529                collection_id: id,
530                name: name.clone(),
531                schema_version: 1,
532                fields,
533                indexes,
534                primary_field: Some(pk.to_string()),
535            };
536            let payload = encode_catalog_payload(&wire);
537            st.shadow_catalog.apply_record(wire)?;
538            st.pending
539                .push((crate::segments::header::SegmentType::Schema, payload));
540            return Ok((CollectionId(id), SchemaVersion(1)));
541        }
542        let id = self.catalog.next_collection_id().0;
543        let wire = CatalogRecordWire::CreateCollection {
544            collection_id: id,
545            name: name.clone(),
546            schema_version: 1,
547            fields,
548            indexes,
549            primary_field: Some(pk.to_string()),
550        };
551        let payload = encode_catalog_payload(&wire);
552        let tid = self.next_txn_id();
553        self.commit_write_batch(
554            tid,
555            &[(
556                crate::segments::header::SegmentType::Schema,
557                payload.as_slice(),
558            )],
559        )?;
560        self.apply_catalog_record(wire)?;
561        Ok((CollectionId(id), SchemaVersion(1)))
562    }
563
564    /// Bump the schema version for `id` to `current + 1` with a new field set.
565    ///
566    /// The primary-key field must remain present as a top-level field (see catalog rules).
567    pub fn register_schema_version(
568        &mut self,
569        id: CollectionId,
570        fields: Vec<FieldDef>,
571    ) -> Result<SchemaVersion, DbError> {
572        self.register_schema_version_with_indexes(id, fields, vec![])
573    }
574
575    pub fn register_schema_version_with_indexes(
576        &mut self,
577        id: CollectionId,
578        fields: Vec<FieldDef>,
579        indexes: Vec<crate::schema::IndexDef>,
580    ) -> Result<SchemaVersion, DbError> {
581        let current = self
582            .catalog_for_read()
583            .get(id)
584            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
585        // `classify_schema_update` only returns `Ok(...)` variants today; keep it infallible here.
586        match classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)? {
587            SchemaChange::Safe => {}
588            SchemaChange::NeedsMigration { reason, .. } => {
589                return Err(DbError::Schema(SchemaError::MigrationRequired {
590                    message: reason,
591                }));
592            }
593            SchemaChange::Breaking { reason } => {
594                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
595                    message: reason,
596                }));
597            }
598        }
599        let next_v = current
600            .current_version
601            .0
602            .checked_add(1)
603            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
604        let wire = CatalogRecordWire::NewSchemaVersion {
605            collection_id: id.0,
606            schema_version: next_v,
607            fields,
608            indexes,
609        };
610        let payload = encode_catalog_payload(&wire);
611        if let Some(st) = &mut self.txn_staging {
612            st.shadow_catalog.apply_record(wire.clone())?;
613            st.pending
614                .push((crate::segments::header::SegmentType::Schema, payload));
615            return Ok(SchemaVersion(next_v));
616        }
617        let tid = self.next_txn_id();
618        self.commit_write_batch(
619            tid,
620            &[(
621                crate::segments::header::SegmentType::Schema,
622                payload.as_slice(),
623            )],
624        )?;
625        self.apply_catalog_record(wire)?;
626        Ok(SchemaVersion(next_v))
627    }
628
629    /// Plan a schema version bump and return the required migration steps, if any.
630    pub fn plan_schema_version_with_indexes(
631        &self,
632        id: CollectionId,
633        fields: Vec<FieldDef>,
634        indexes: Vec<crate::schema::IndexDef>,
635    ) -> Result<MigrationPlan, DbError> {
636        let current = self
637            .catalog_for_read()
638            .get(id)
639            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
640        // Same infallibility contract as `register_schema_version_with_indexes` above.
641        let change = classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)?;
642        let mut steps = Vec::new();
643        match &change {
644            SchemaChange::Safe => {}
645            SchemaChange::Breaking { .. } => {}
646            SchemaChange::NeedsMigration {
647                reason,
648                backfill_top_level_field,
649                backfill_field_path,
650            } => {
651                if let Some(field) = backfill_top_level_field {
652                    steps.push(MigrationStep::BackfillTopLevelField {
653                        field: field.clone(),
654                    });
655                } else if let Some(path) = backfill_field_path {
656                    steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
657                } else if reason.contains("unique index") {
658                    steps.push(MigrationStep::RebuildIndexes);
659                }
660            }
661        }
662        Ok(MigrationPlan { change, steps })
663    }
664
665    /// Backfill a missing top-level field with a fixed value for all rows in a collection.
666    ///
667    /// This helper is intentionally simple so it can be bound to other languages.
668    pub fn backfill_top_level_field_with_value(
669        &mut self,
670        collection_id: CollectionId,
671        field: &str,
672        value: RowValue,
673    ) -> Result<(), DbError> {
674        let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
675        self.backfill_field_at_path_with_value(collection_id, &path, value)
676    }
677
678    /// Backfill a missing field (any segment path) with a fixed value for all rows.
679    pub fn backfill_field_at_path_with_value(
680        &mut self,
681        collection_id: CollectionId,
682        path: &crate::schema::FieldPath,
683        value: RowValue,
684    ) -> Result<(), DbError> {
685        let col = self
686            .catalog_for_read()
687            .get(collection_id)
688            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
689                id: collection_id.0,
690            }))?;
691        let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
692            DbError::Schema(SchemaError::RowUnknownField {
693                name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
694            })
695        })?;
696
697        let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
698        for ((cid, _), row) in self.latest_for_read().iter() {
699            if *cid != collection_id.0 {
700                continue;
701            }
702            rows.push(row.clone());
703        }
704
705        self.transaction(|db| {
706            for mut row in rows {
707                if row_value_at_path_segments(&row, &path.0).is_some() {
708                    continue;
709                }
710                crate::record::insert_value_at_path(&mut row, path, value.clone())?;
711                db.insert(collection_id, row)?;
712            }
713            Ok(())
714        })
715    }
716
717    /// Rebuild index entries for all rows in `collection_id` using the current schema’s index defs.
718    pub fn rebuild_indexes_for_collection(
719        &mut self,
720        collection_id: CollectionId,
721    ) -> Result<(), DbError> {
722        let col = self
723            .catalog_for_read()
724            .get(collection_id)
725            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
726                id: collection_id.0,
727            }))?;
728        let pk_name =
729            col.primary_field
730                .as_deref()
731                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
732                    collection_id: collection_id.0,
733                }))?;
734        let pk_def = col
735            .fields
736            .iter()
737            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
738            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
739                name: pk_name.to_string(),
740            }))?;
741
742        let mut entries: Vec<IndexEntry> = Vec::new();
743        for ((cid, _), row) in self.latest_for_read().iter() {
744            if *cid != collection_id.0 {
745                continue;
746            }
747            let Some(pk_cell) = row.get(pk_name) else {
748                continue;
749            };
750            let pk_scalar = pk_cell.clone().into_scalar()?;
751            if !pk_scalar.ty_matches(&pk_def.ty) {
752                continue;
753            }
754            for idx in &col.indexes {
755                let Some(v) = scalar_at_path(row, &idx.path) else {
756                    continue;
757                };
758                entries.push(IndexEntry {
759                    collection_id: collection_id.0,
760                    index_name: idx.name.clone(),
761                    kind: idx.kind,
762                    op: IndexOp::Insert,
763                    index_key: v.canonical_key_bytes(),
764                    pk_key: pk_scalar.canonical_key_bytes(),
765                });
766            }
767        }
768
769        self.transaction(|db| {
770            if entries.is_empty() {
771                return Ok(());
772            }
773            // Apply in-memory + persist as one index segment batch.
774            // `begin_transaction` always installs `txn_staging` before this closure runs.
775            let st = db
776                .txn_staging
777                .as_mut()
778                .expect("transaction staging must be active");
779            let b = encode_index_payload(&entries);
780            st.pending
781                .push((crate::segments::header::SegmentType::Index, b));
782            for e in entries {
783                st.shadow_indexes.apply(e)?;
784            }
785            Ok(())
786        })
787    }
788
789    /// Force-register a new schema version, bypassing compatibility checks.
790    ///
791    /// This is an escape hatch for advanced workflows where the caller performs an out-of-band
792    /// data rewrite (or accepts inconsistent index/query behavior until a rebuild).
793    pub fn register_schema_version_with_indexes_force(
794        &mut self,
795        id: CollectionId,
796        fields: Vec<FieldDef>,
797        indexes: Vec<crate::schema::IndexDef>,
798    ) -> Result<SchemaVersion, DbError> {
799        let current = self
800            .catalog_for_read()
801            .get(id)
802            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
803        let next_v = current
804            .current_version
805            .0
806            .checked_add(1)
807            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
808        let wire = CatalogRecordWire::NewSchemaVersion {
809            collection_id: id.0,
810            schema_version: next_v,
811            fields,
812            indexes,
813        };
814        let payload = encode_catalog_payload(&wire);
815        if let Some(st) = &mut self.txn_staging {
816            st.shadow_catalog.apply_record(wire.clone())?;
817            st.pending
818                .push((crate::segments::header::SegmentType::Schema, payload));
819            return Ok(SchemaVersion(next_v));
820        }
821        let tid = self.next_txn_id();
822        self.commit_write_batch(
823            tid,
824            &[(
825                crate::segments::header::SegmentType::Schema,
826                payload.as_slice(),
827            )],
828        )?;
829        self.apply_catalog_record(wire)?;
830        Ok(SchemaVersion(next_v))
831    }
832
    /// Insert or replace the row for `collection_id` identified by its primary-key field.
    ///
    /// `row` maps **top-level** field names to [`RowValue`]. The primary key field must be present.
    /// Only single-segment field paths are supported in 0.6.x.
    /// NOTE(review): the replace path below also re-encodes multi-segment schemas (v3 record
    /// payloads), so the sentence above may be stale. Confirm against `plan_insert_row`.
    ///
    /// If a row with the same primary key is already visible, the record is re-encoded with the
    /// explicit `OP_REPLACE` opcode. Index deletes for the old row are placed ahead of the new
    /// row's index entries.
    ///
    /// With an active transaction (`txn_staging` is set), the index and record payloads are queued
    /// as pending segments and applied to the shadow row/index state. Otherwise they are committed
    /// immediately as one write batch (index segment first, then the record) and applied to
    /// `latest` and `indexes`.
    ///
    /// # Errors
    /// - [`SchemaError::UniqueIndexViolation`] if a unique index key the row inserts is already
    ///   held by a different primary key.
    /// - [`SchemaError::UnknownCollection`], [`SchemaError::NoPrimaryKey`] or
    ///   [`SchemaError::PrimaryFieldNotFound`] from the replace path's schema lookup.
    /// - Any error from the header upgrade, row planning, payload encoding, commit, or index
    ///   application.
    pub fn insert(
        &mut self,
        collection_id: CollectionId,
        row: BTreeMap<String, RowValue>,
    ) -> Result<(), DbError> {
        // Presumably upgrades the on-disk header to format v0.5 when needed (helper not shown).
        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
        // `full.0` is the key used in `latest` (pk key bytes); `full.1` is the row that gets stored.
        let (mut payload, full, mut index_entries, pk_scalar) =
            plan_insert_row(self.catalog_for_read(), collection_id, row)?;
        // Test builds rebind `full` as mutable so the replace-path poison hook can edit the row.
        #[cfg(test)]
        let mut full = full;
        let existing = self
            .latest_for_read()
            .get(&(collection_id.0, full.0.clone()))
            .cloned();
        if existing.is_some() {
            #[cfg(test)]
            if let Some(poison) = self.test_poison_planned_replace_row.take() {
                poison(collection_id, &mut full.1);
            }
            // Re-encode with explicit replace opcode.
            let col = self
                .catalog_for_read()
                .get(collection_id)
                .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                    id: collection_id.0,
                }))?;
            // Any nested (multi-segment) field path switches the record encoding to v3.
            let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
            let pk_name =
                col.primary_field
                    .as_deref()
                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                        collection_id: collection_id.0,
                    }))?;
            let pk_def = col
                .fields
                .iter()
                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
                .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
                    name: pk_name.to_string(),
                }))?;

            // v3: every non-PK field def in catalog order; v2: the ordering defined by
            // `non_pk_defs_in_order`.
            let non_pk_defs = if has_multi_segment_schema {
                col.fields
                    .iter()
                    .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
                    .collect::<Vec<_>>()
            } else {
                non_pk_defs_in_order(&col.fields, pk_name)
            };
            // Fields absent from the planned row are encoded as `RowValue::None`.
            let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
            for def in &non_pk_defs {
                let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
                non_pk.push(((*def).clone(), v));
            }
            payload = (if has_multi_segment_schema {
                encode_record_payload_v3_op(
                    collection_id.0,
                    col.current_version.0,
                    OP_REPLACE,
                    &pk_scalar,
                    &pk_def.ty,
                    &non_pk,
                )
            } else {
                encode_record_payload_v2_op(
                    collection_id.0,
                    col.current_version.0,
                    OP_REPLACE,
                    &pk_scalar,
                    &pk_def.ty,
                    &non_pk,
                )
            })?;
            // Prepend index deletes for any existing row.
            // (`existing` is always `Some` on this branch; the `if let` just binds it.)
            if let Some(ref old_row) = existing {
                let mut deletes = index_deletes_for_existing_row(
                    collection_id,
                    &pk_scalar,
                    &col.indexes,
                    old_row,
                );
                deletes.append(&mut index_entries);
                index_entries = deletes;
            }
        }
        // Unique-index check: an insert entry may only claim a key that is free or already mapped
        // to this same primary key (the replace case). Lookups see the index state before this
        // batch, so the delete entries prepended above have not been applied yet.
        for e in &index_entries {
            if e.kind != crate::schema::IndexKind::Unique {
                continue;
            }
            let Some(existing) =
                self.indexes_for_read()
                    .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
            else {
                continue;
            };
            if e.op != IndexOp::Insert {
                continue;
            }
            if existing == e.pk_key.as_slice() {
                continue;
            }
            return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
        }
        // Transaction active: queue the segments and update only the shadow state.
        if let Some(st) = &mut self.txn_staging {
            if !index_entries.is_empty() {
                let b = encode_index_payload(&index_entries);
                st.pending
                    .push((crate::segments::header::SegmentType::Index, b));
            }
            st.pending.push((
                crate::segments::header::SegmentType::Record,
                payload.clone(),
            ));
            st.shadow_latest
                .insert((collection_id.0, full.0.clone()), full.1.clone());
            for e in index_entries {
                st.shadow_indexes.apply(e)?;
            }
            return Ok(());
        }
        // Autocommit: persist index + record as one batch, then update in-memory state.
        let tid = self.next_txn_id();
        let index_bytes = if index_entries.is_empty() {
            None
        } else {
            Some(encode_index_payload(&index_entries))
        };
        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
        if let Some(ref b) = index_bytes {
            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
        }
        batch.push((
            crate::segments::header::SegmentType::Record,
            payload.as_slice(),
        ));
        self.commit_write_batch(tid, &batch)?;
        self.latest.insert((collection_id.0, full.0), full.1);
        for e in index_entries {
            self.indexes.apply(e)?;
        }
        Ok(())
    }
978
    /// Delete the row for `collection_id` identified by its primary key.
    ///
    /// Deleting a key with no visible row returns `Ok(())` without writing any record or index
    /// segments.
    ///
    /// Otherwise, two things are staged together:
    /// - index deletes for the old row;
    /// - an `OP_DELETE` record carrying only the primary key.
    ///
    /// They are staged inside a transaction, or committed as one batch otherwise, and the row is
    /// removed from the visible state.
    ///
    /// # Errors
    /// - [`SchemaError::UnknownCollection`], [`SchemaError::NoPrimaryKey`] or
    ///   [`SchemaError::PrimaryFieldNotFound`] for schema lookup failures.
    /// - [`FormatError::RecordPayloadTypeMismatch`] if `pk` does not match the primary field type.
    ///   NOTE(review): `get` reports the same condition as `SchemaError::PrimaryKeyTypeMismatch`.
    ///   The divergence looks unintentional, but callers may match on it, so it is left unchanged.
    /// - Any error from the header upgrade, encoding, commit, or index application.
    pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
        // Presumably upgrades the on-disk header to format v0.5 when needed (helper not shown).
        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
        let col = self
            .catalog_for_read()
            .get(collection_id)
            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                id: collection_id.0,
            }))?;
        let pk_name =
            col.primary_field
                .as_deref()
                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                    collection_id: collection_id.0,
                }))?;
        let pk_def = col
            .fields
            .iter()
            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
                name: pk_name.to_string(),
            }))?;
        if !pk.ty_matches(&pk_def.ty) {
            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
        }
        let pk_key = pk.canonical_key_bytes();
        let existing = self
            .latest_for_read()
            .get(&(collection_id.0, pk_key.clone()))
            .cloned();
        // Nothing visible under this key: deleting is a no-op.
        let Some(old_row) = existing else {
            return Ok(());
        };
        // Copy what the encoder needs out of `col`, so its borrow of `self` ends before the
        // test-only poison hook below takes `&mut self`.
        let indexes = col.indexes.clone();
        let schema_ver = col.current_version.0;
        let pk_ty = pk_def.ty.clone();
        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);

        let mut index_entries =
            index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
        // The scalar encoded into the delete record; test builds may swap it via a one-shot hook.
        // The in-memory removal below still uses `pk_key` derived from the original `pk`.
        #[cfg(not(test))]
        let pk_for_record = pk.clone();
        #[cfg(test)]
        let pk_for_record = {
            let mut p = pk.clone();
            if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
                p = poison(p);
            }
            p
        };
        // Delete records carry only the primary key (empty non-PK field list).
        let record_payload = (if has_multi_segment_schema {
            encode_record_payload_v3_op(
                collection_id.0,
                schema_ver,
                OP_DELETE,
                &pk_for_record,
                &pk_ty,
                &[],
            )
        } else {
            encode_record_payload_v2_op(
                collection_id.0,
                schema_ver,
                OP_DELETE,
                &pk_for_record,
                &pk_ty,
                &[],
            )
        })?;

        // Transaction active: queue the segments and update only the shadow state.
        if let Some(st) = &mut self.txn_staging {
            if !index_entries.is_empty() {
                let b = encode_index_payload(&index_entries);
                st.pending
                    .push((crate::segments::header::SegmentType::Index, b));
            }
            st.pending.push((
                crate::segments::header::SegmentType::Record,
                record_payload.clone(),
            ));
            st.shadow_latest.remove(&(collection_id.0, pk_key));
            for e in index_entries.drain(..) {
                st.shadow_indexes.apply(e)?;
            }
            return Ok(());
        }

        // Autocommit: persist index deletes + delete record as one batch, then update memory.
        let tid = self.next_txn_id();
        let index_bytes = if index_entries.is_empty() {
            None
        } else {
            Some(encode_index_payload(&index_entries))
        };
        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
        if let Some(ref b) = index_bytes {
            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
        }
        batch.push((
            crate::segments::header::SegmentType::Record,
            record_payload.as_slice(),
        ));
        self.commit_write_batch(tid, &batch)?;
        self.latest.remove(&(collection_id.0, pk_key));
        for e in index_entries {
            self.indexes.apply(e)?;
        }
        Ok(())
    }
1087
1088    /// Return the latest stored row for `pk`, or `None` if no insert has been replayed for that key.
1089    ///
1090    /// `pk` must match the declared primary field’s [`crate::schema::Type`].
1091    pub fn get(
1092        &self,
1093        collection_id: CollectionId,
1094        pk: &ScalarValue,
1095    ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1096        let col = self
1097            .catalog_for_read()
1098            .get(collection_id)
1099            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1100                id: collection_id.0,
1101            }))?;
1102        let pk_name =
1103            col.primary_field
1104                .as_deref()
1105                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1106                    collection_id: collection_id.0,
1107                }))?;
1108        let pk_ty = col
1109            .fields
1110            .iter()
1111            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1112            .map(|f| &f.ty)
1113            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1114                name: pk_name.to_string(),
1115            }))?;
1116        if !pk.ty_matches(pk_ty) {
1117            return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
1118                collection_id: collection_id.0,
1119            }));
1120        }
1121        let key = (collection_id.0, pk.canonical_key_bytes());
1122        Ok(self.latest_for_read().get(&key).cloned())
1123    }
1124
1125    /// Write a durable checkpoint segment and publish it via the superblock.
1126    ///
1127    /// The checkpoint stores the logical state (catalog + latest rows + index state) so open can
1128    /// avoid scanning/replaying the full log. Works with any [`Store`] (file-backed [`FileStore`] or
1129    /// [`VecStore`] snapshots).
1130    pub fn checkpoint(&mut self) -> Result<(), DbError> {
1131        #[cfg(feature = "tracing")]
1132        let _span = tracing::info_span!("database_checkpoint").entered();
1133        if self.txn_staging.is_some() {
1134            return Err(DbError::Transaction(TransactionError::NestedTransaction));
1135        }
1136
1137        write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
1138
1139        let mut cp = checkpoint::checkpoint_from_state(
1140            self.catalog_for_read(),
1141            self.latest_for_read(),
1142            self.indexes_for_read(),
1143        )?;
1144
1145        let file_len = self.store.len()?;
1146        let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
1147        let checkpoint_offset = writer.offset();
1148
1149        let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
1150        let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
1151        cp.replay_from_offset = replay_from;
1152        let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
1153
1154        let hdr = SegmentHeader {
1155            segment_type: SegmentType::Checkpoint,
1156            payload_len: 0,
1157            payload_crc32c: 0,
1158        };
1159        writer.append(hdr, &payload)?;
1160
1161        publish::append_manifest_and_publish_with_checkpoint(
1162            &mut self.store,
1163            self.segment_start,
1164            Some((checkpoint_offset, payload.len() as u32)),
1165        )?;
1166        self.store.sync()?;
1167        #[cfg(feature = "tracing")]
1168        tracing::info!(
1169            checkpoint_offset,
1170            replay_from,
1171            payload_bytes = payload.len(),
1172            "database_checkpoint_ok"
1173        );
1174        Ok(())
1175    }
1176
1177    /// Test hook: mutate the planned row once on the replace path immediately before Opcode re-encoding.
1178    #[cfg(test)]
1179    #[doc(hidden)]
1180    pub(crate) fn test_arm_replace_encode_poison_once(
1181        &mut self,
1182        poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
1183    ) {
1184        self.test_poison_planned_replace_row = Some(poison);
1185    }
1186
1187    #[cfg(test)]
1188    #[doc(hidden)]
1189    pub(crate) fn test_arm_delete_encode_poison_once(
1190        &mut self,
1191        poison: fn(ScalarValue) -> ScalarValue,
1192    ) {
1193        self.test_poison_delete_encode_scalar = Some(poison);
1194    }
1195
1196    #[cfg(test)]
1197    #[doc(hidden)]
1198    pub(crate) fn test_catalog_mut(&mut self) -> &mut Catalog {
1199        &mut self.catalog
1200    }
1201
1202    /// Test helper: overwrite one cell in [`Self::latest`] without validation.
1203    #[cfg(test)]
1204    #[doc(hidden)]
1205    pub(crate) fn test_write_latest_cell_unchecked(
1206        &mut self,
1207        collection_id: CollectionId,
1208        pk: &ScalarValue,
1209        field: &str,
1210        value: RowValue,
1211    ) {
1212        let pk_key = pk.canonical_key_bytes();
1213        let row = self
1214            .latest
1215            .get_mut(&(collection_id.0, pk_key))
1216            .expect("test_write_latest_cell_unchecked: unknown row key");
1217        row.insert(field.to_string(), value);
1218    }
1219}
1220
1221impl Database<FileStore> {
1222    /// Rewrite the database into a compacted single-file image at `dest_path`.
1223    ///
1224    /// The destination file is truncated/overwritten if it exists.
1225    pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1226        self.compact_to_with_fsops(&StdFsOps, dest_path)
1227    }
1228
1229    pub(crate) fn compact_to_with_fsops(
1230        &self,
1231        fs: &dyn FsOps,
1232        dest_path: impl AsRef<Path>,
1233    ) -> Result<(), DbError> {
1234        #[cfg(feature = "tracing")]
1235        let _span = tracing::info_span!(
1236            "database_compact_to",
1237            dest = %dest_path.as_ref().display()
1238        )
1239        .entered();
1240        let bytes = self.compact_snapshot_bytes()?;
1241        let path = dest_path.as_ref();
1242        let file = fs
1243            .open_read_write_create_truncate(path)
1244            .map_err(DbError::Io)?;
1245        let mut store = FileStore::new(file);
1246        store.write_all_at(0, &bytes)?;
1247        store.truncate(bytes.len() as u64)?;
1248        store.sync()?;
1249        #[cfg(feature = "tracing")]
1250        tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1251        Ok(())
1252    }
1253    pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1254        self.compact_in_place_with_fsops(&StdFsOps)
1255    }
1256
1257    pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1258        #[cfg(feature = "tracing")]
1259        let _span = tracing::info_span!("database_compact_in_place").entered();
1260        // Crash-safety: write a full new image to a sidecar file, fsync it, then atomically
1261        // replace the live path via rename (using a backup on platforms where rename does not
1262        // overwrite an existing destination).
1263        let bytes = self.compact_snapshot_bytes()?;
1264        let live_path = self.path.clone();
1265        let parent = live_path
1266            .parent()
1267            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1268
1269        // Pick unique temp + backup names in the same directory (so rename stays atomic on POSIX).
1270        let pid = std::process::id();
1271        let nanos = std::time::SystemTime::now()
1272            .duration_since(std::time::UNIX_EPOCH)
1273            .map(|d| d.as_nanos())
1274            .unwrap_or(0);
1275        let base = live_path
1276            .file_name()
1277            .and_then(|s| s.to_str())
1278            .unwrap_or("db.modelvault");
1279        let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1280        let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1281
1282        // 1) Write the compacted image to tmp and fsync it.
1283        {
1284            let file = fs
1285                .open_read_write_create_new(&tmp_path)
1286                .map_err(DbError::Io)?;
1287            let mut store = FileStore::new(file);
1288            store.write_all_at(0, &bytes)?;
1289            store.truncate(bytes.len() as u64)?;
1290            store.sync()?;
1291        }
1292
1293        // 2) Replace the live file path with the tmp image, preserving a backup until success.
1294        //
1295        // We do not rely on "rename over existing" being supported across platforms. Instead:
1296        // - move live → bak
1297        // - move tmp → live
1298        // - fsync directory (best-effort)
1299        // - remove bak
1300        //
1301        // If tmp → live fails, attempt to restore bak → live.
1302        let _ = fs.remove_file(&bak_path);
1303        fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1304        let replace_res = fs.rename(&tmp_path, &live_path);
1305        if let Err(e) = replace_res {
1306            // Best-effort restore: move backup back into place.
1307            let _ = fs.rename(&bak_path, &live_path);
1308            // Clean up tmp if it still exists.
1309            let _ = fs.remove_file(&tmp_path);
1310            return Err(DbError::Io(e));
1311        }
1312
1313        // Best-effort directory sync: helps make the rename durable on POSIX.
1314        #[cfg(unix)]
1315        {
1316            // Best-effort: on many Unix platforms, opening a directory and syncing it will persist
1317            // the rename in the directory entry. If this fails, the data file itself is still
1318            // fsync'd and the operation remains logically correct; only rename durability is weaker.
1319            if let Ok(dir_f) = fs.open_dir(parent) {
1320                let _ = dir_f.sync_all();
1321            }
1322        }
1323
1324        let _ = fs.remove_file(&bak_path);
1325
1326        // 3) Refresh in-memory state by reopening (bypass writer registry; re-register after replace).
1327        self.writer_registry = None;
1328        let reopened = match (|| {
1329            let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
1330            Self::open_with_store(live_path.clone(), store, OpenOptions::default())
1331        })() {
1332            Ok(db) => db,
1333            Err(e) => {
1334                let _ = fs.rename(&bak_path, &live_path);
1335                self.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1336                    live_path.clone(),
1337                )?);
1338                return Err(e);
1339            }
1340        };
1341        let mut reopened = reopened;
1342        reopened.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1343            live_path.clone(),
1344        )?);
1345        *self = reopened;
1346        #[cfg(feature = "tracing")]
1347        tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1348        Ok(())
1349    }
1350
1351    /// Create a consistent backup copy of this on-disk database.
1352    ///
1353    /// This writes a checkpoint (for fast reopen and a stable state marker) and then copies the
1354    /// underlying file bytes to `dest_path`.
1355    pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1356        self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1357    }
1358
1359    pub(crate) fn export_snapshot_to_path_with_fsops(
1360        &mut self,
1361        fs: &dyn FsOps,
1362        dest_path: impl AsRef<Path>,
1363    ) -> Result<(), DbError> {
1364        self.checkpoint()?;
1365        let dest_path = dest_path.as_ref();
1366        fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1367        // Strengthen durability of the copied snapshot: fsync the destination and best-effort
1368        // fsync its parent directory so the directory entry is persisted.
1369        if let Ok(f) = fs.open_read(dest_path) {
1370            let _ = f.sync_all();
1371        }
1372        #[cfg(unix)]
1373        best_effort_fsync_parent_dir(fs, dest_path);
1374        Ok(())
1375    }
1376
1377    /// Restore a snapshot file into `dest_path` by atomically replacing the destination.
1378    ///
1379    /// This is a file operation helper intended for operational tooling.
1380    pub fn restore_snapshot_to_path(
1381        snapshot_path: impl AsRef<Path>,
1382        dest_path: impl AsRef<Path>,
1383    ) -> Result<(), DbError> {
1384        Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1385    }
1386
1387    pub(crate) fn restore_snapshot_to_path_with_fsops(
1388        fs: &dyn FsOps,
1389        snapshot_path: impl AsRef<Path>,
1390        dest_path: impl AsRef<Path>,
1391    ) -> Result<(), DbError> {
1392        let snapshot_path = snapshot_path.as_ref();
1393        let dest_path = dest_path.as_ref();
1394        let parent = dest_path
1395            .parent()
1396            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1397
1398        let pid = std::process::id();
1399        let nanos = std::time::SystemTime::now()
1400            .duration_since(std::time::UNIX_EPOCH)
1401            .map(|d| d.as_nanos())
1402            .unwrap_or(0);
1403        let base = dest_path
1404            .file_name()
1405            .and_then(|s| s.to_str())
1406            .unwrap_or("db.modelvault");
1407        let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1408        let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1409
1410        // Copy snapshot bytes into a temp file and fsync it.
1411        fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1412        if let Ok(f) = fs.open_read(&tmp_path) {
1413            let _ = f.sync_all();
1414        }
1415
1416        // Replace destination with backup/restore semantics.
1417        if dest_path.exists() {
1418            let _ = fs.remove_file(&bak_path);
1419            fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1420        }
1421        let replace_res = fs.rename(&tmp_path, dest_path);
1422        if let Err(e) = replace_res {
1423            // Best-effort restore original.
1424            if bak_path.exists() {
1425                let _ = fs.rename(&bak_path, dest_path);
1426            }
1427            let _ = fs.remove_file(&tmp_path);
1428            return Err(DbError::Io(e));
1429        }
1430
1431        #[cfg(unix)]
1432        {
1433            if let Ok(dir_f) = fs.open_dir(parent) {
1434                let _ = dir_f.sync_all();
1435            }
1436        }
1437        let _ = fs.remove_file(&bak_path);
1438        Ok(())
1439    }
1440}
1441
1442pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
1443    db: &'a Database<S>,
1444    collection_id: CollectionId,
1445    _marker: PhantomData<T>,
1446}
1447
1448impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1449    pub fn where_eq(
1450        &self,
1451        path: crate::schema::FieldPath,
1452        value: ScalarValue,
1453    ) -> QueryBuilder<'a, S, T> {
1454        QueryBuilder {
1455            db: self.db,
1456            collection_id: self.collection_id,
1457            predicate: Some(crate::query::Predicate::Eq { path, value }),
1458            limit: None,
1459            _marker: PhantomData,
1460        }
1461    }
1462
1463    pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1464        let q = crate::query::Query {
1465            collection: self.collection_id,
1466            predicate: None,
1467            limit: None,
1468            order_by: None,
1469        };
1470        let rows = self.db.query(&q)?;
1471        Ok(rows.into_iter().map(project_row::<T>).collect())
1472    }
1473}
1474
1475pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
1476    db: &'a Database<S>,
1477    collection_id: CollectionId,
1478    predicate: Option<crate::query::Predicate>,
1479    limit: Option<usize>,
1480    _marker: PhantomData<T>,
1481}
1482
1483impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1484    pub fn limit(mut self, n: usize) -> Self {
1485        self.limit = Some(n);
1486        self
1487    }
1488
1489    pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1490        let q = crate::query::Query {
1491            collection: self.collection_id,
1492            predicate: self.predicate,
1493            limit: self.limit,
1494            order_by: None,
1495        };
1496        let rows = self.db.query(&q)?;
1497        Ok(rows.into_iter().map(project_row::<T>).collect())
1498    }
1499
1500    pub fn explain(self) -> Result<String, DbError> {
1501        let q = crate::query::Query {
1502            collection: self.collection_id,
1503            predicate: self.predicate,
1504            limit: self.limit,
1505            order_by: None,
1506        };
1507        self.db.explain_query(&q)
1508    }
1509}
1510
1511fn validate_subset_model<T: crate::schema::DbModel>(
1512    col: &crate::catalog::CollectionInfo,
1513) -> Result<(), DbError> {
1514    crate::schema_compat::validate_model_fields_against_catalog(
1515        col,
1516        T::primary_field(),
1517        &T::fields(),
1518        &T::indexes(),
1519    )
1520}
1521
1522/// Build a row map containing only the listed fields (same rules as subset-model projection).
1523pub fn row_subset_by_field_defs(
1524    row: &BTreeMap<String, RowValue>,
1525    wanted: &[FieldDef],
1526) -> BTreeMap<String, RowValue> {
1527    let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1528    for f in wanted {
1529        let segs = &f.path.0;
1530        if segs.is_empty() {
1531            continue;
1532        }
1533        let Some(leaf) = row_value_at_path_segments(row, segs) else {
1534            continue;
1535        };
1536        let root = segs[0].to_string();
1537        if segs.len() == 1 {
1538            out.insert(root, leaf);
1539        } else {
1540            let nested = row_value_nested_object_path(&segs[1..], leaf);
1541            match out.get_mut(&root) {
1542                Some(existing) => merge_row_value_trees(existing, nested),
1543                None => {
1544                    out.insert(root, nested);
1545                }
1546            }
1547        }
1548    }
1549    out
1550}
1551
1552fn row_value_at_path_segments(
1553    row: &BTreeMap<String, RowValue>,
1554    path: &[std::borrow::Cow<'static, str>],
1555) -> Option<RowValue> {
1556    if path.is_empty() {
1557        return None;
1558    }
1559    let mut cur = row.get(path[0].as_ref())?;
1560    for seg in path.iter().skip(1) {
1561        cur = match cur {
1562            RowValue::Object(m) => m.get(seg.as_ref())?,
1563            RowValue::None => return None,
1564            _ => return None,
1565        };
1566    }
1567    Some(cur.clone())
1568}
1569
1570/// Build `Object({ seg[0]: Object({ seg[1]: ... leaf }) })` for non-empty `seg`.
1571fn row_value_nested_object_path(
1572    segments: &[std::borrow::Cow<'static, str>],
1573    leaf: RowValue,
1574) -> RowValue {
1575    debug_assert!(!segments.is_empty());
1576    if segments.len() == 1 {
1577        let mut m = BTreeMap::new();
1578        m.insert(segments[0].to_string(), leaf);
1579        RowValue::Object(m)
1580    } else {
1581        let mut m = BTreeMap::new();
1582        m.insert(
1583            segments[0].to_string(),
1584            row_value_nested_object_path(&segments[1..], leaf),
1585        );
1586        RowValue::Object(m)
1587    }
1588}
1589
1590fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1591    match (&mut *into, from) {
1592        (RowValue::Object(m1), RowValue::Object(m2)) => {
1593            for (k, v2) in m2 {
1594                match m1.entry(k) {
1595                    std::collections::btree_map::Entry::Vacant(e) => {
1596                        e.insert(v2);
1597                    }
1598                    std::collections::btree_map::Entry::Occupied(mut e) => {
1599                        merge_row_value_trees(e.get_mut(), v2);
1600                    }
1601                }
1602            }
1603        }
1604        (slot, from) => *slot = from,
1605    }
1606}
1607
1608fn project_row<T: crate::schema::DbModel>(
1609    row: BTreeMap<String, RowValue>,
1610) -> BTreeMap<String, RowValue> {
1611    row_subset_by_field_defs(&row, &T::fields())
1612}
1613
1614pub(crate) fn scalar_at_path(
1615    row: &BTreeMap<String, RowValue>,
1616    path: &crate::schema::FieldPath,
1617) -> Option<ScalarValue> {
1618    let mut cur: Option<&RowValue> = None;
1619    for (i, seg) in path.0.iter().enumerate() {
1620        let key = seg.as_ref();
1621        cur = match (i, cur) {
1622            (0, _) => row.get(key),
1623            (_, Some(RowValue::Object(map))) => map.get(key),
1624            (_, Some(RowValue::None)) => return None,
1625            _ => return None,
1626        };
1627    }
1628    cur.and_then(|v| v.as_scalar())
1629}
1630
1631impl Database<FileStore> {
1632    /// Open an existing file or create a new database at `path`.
1633    ///
1634    /// Creates parent directories as needed via the OS; the file is opened read/write.
1635    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1636        Self::open_with_options(path, crate::config::OpenOptions::default())
1637    }
1638
1639    /// Open an existing file read-only (does not create it).
1640    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1641        Self::open_with_options(
1642            path,
1643            crate::config::OpenOptions {
1644                recovery: crate::config::RecoveryMode::Strict,
1645                mode: OpenMode::ReadOnly,
1646            },
1647        )
1648    }
1649
1650    /// Open with recovery and other options (see [`crate::config::OpenOptions`]).
1651    pub fn open_with_options(
1652        path: impl AsRef<Path>,
1653        opts: crate::config::OpenOptions,
1654    ) -> Result<Self, DbError> {
1655        let path = path.as_ref().to_path_buf();
1656        let store = FileStore::open_locked(&path, opts.mode)?;
1657        let mut db = Self::open_with_store(path.clone(), store, opts)?;
1658        if opts.mode == OpenMode::ReadWrite {
1659            db.writer_registry = Some(writer_registry::WriterRegistryGuard::new(path)?);
1660        }
1661        Ok(db)
1662    }
1663}
1664
1665impl Database<VecStore> {
1666    /// New empty in-memory database (same on-disk layout as a new file image in a [`VecStore`]).
1667    pub fn open_in_memory() -> Result<Self, DbError> {
1668        Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1669    }
1670
1671    /// In-memory open with [`crate::config::OpenOptions`].
1672    pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1673        Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1674    }
1675
1676    /// Deserialize a full database image from bytes (e.g. from [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1677    pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1678        Self::open_with_store(
1679            PathBuf::from(":memory:"),
1680            VecStore::from_vec(bytes),
1681            crate::config::OpenOptions::default(),
1682        )
1683    }
1684
1685    /// Consume `self` and return the owned byte buffer backing the store.
1686    pub fn into_snapshot_bytes(self) -> Vec<u8> {
1687        self.store.into_inner()
1688    }
1689
1690    /// Clone of the full serialized database image (alias of the buffer returned by [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1691    pub fn snapshot_bytes(&self) -> Vec<u8> {
1692        self.store.as_slice().to_vec()
1693    }
1694
1695    /// Write the full in-memory database image to `dest_path`.
1696    pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1697        Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1698    }
1699
1700    pub(crate) fn export_snapshot_to_path_with_fsops(
1701        fs: &dyn FsOps,
1702        dest_path: impl AsRef<Path>,
1703        bytes: &[u8],
1704    ) -> Result<(), DbError> {
1705        fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1706        Ok(())
1707    }
1708
1709    /// Open an in-memory database from a snapshot file.
1710    pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1711        let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1712        Self::from_snapshot_bytes(bytes)
1713    }
1714}
1715
// Unit tests (per the module and file name, targeting `scalar_at_path`) that live in a separate
// file under `tests/unit/`. `include!` compiles that file's contents as the body of this child
// module, so the tests can reach private items of `db` through `super::`.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
1723
// General unit tests for this module, kept in `tests/unit/src_db_mod_tests.rs`. `include!`
// inlines that file as the body of this test-only child module, giving it access to private
// items of `db` through `super::`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}
1730}