Skip to main content

modelvault_core/db/
mod.rs

1//! Database handle and orchestration.
2//!
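//! [`Database`] is implemented using internal modules `open` (bootstrap), `replay` (catalog and
//! rows from segments), `write` (append segments and publish), and `helpers` (name rules).
//! Further internal modules cover row materialization and merging (`row_materialize`,
//! `row_merge`), nested field-path validation (`row_paths`), the process-wide single-writer
//! registry (`writer_registry`), and filesystem operations (`fs_ops`); `recover` is also
//! declared here.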
5
6mod fs_ops;
7mod helpers;
8mod open;
9mod recover;
10mod replay;
11pub(crate) mod row_materialize;
12mod writer_registry;
13pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
14mod row_merge;
15pub(crate) mod row_paths;
16pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
17mod write;
18
19use std::collections::{BTreeMap, HashMap};
20use std::marker::PhantomData;
21use std::path::{Path, PathBuf};
22
23use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
24use crate::config::{OpenMode, OpenOptions};
25use crate::error::{DbError, FormatError, SchemaError, TransactionError};
26use crate::index::IndexState;
27use crate::index::{encode_index_payload, IndexEntry, IndexOp};
28use crate::record::{
29    encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
30    encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
31    OP_REPLACE,
32};
33use crate::schema::{classify_schema_update, SchemaChange};
34use crate::schema::{CollectionId, FieldDef, SchemaVersion};
35use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
36use crate::segments::writer::SegmentWriter;
37use crate::storage::{FileStore, Store, VecStore};
38use crate::validation;
39use crate::{checkpoint, publish};
40use crate::{MigrationPlan, MigrationStep};
41
42use self::fs_ops::{FsOps, StdFsOps};
43
44/// Best-effort `fsync` on `dest_path`'s parent directory (Unix only).
45#[cfg(unix)]
46fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
47    let Some(parent) = dest_path.parent() else {
48        return;
49    };
50    let Ok(dir_f) = fs.open_dir(parent) else {
51        return;
52    };
53    let _ = dir_f.sync_all();
54}
55
/// Latest row per `(collection_id, canonical primary-key bytes)`.
///
/// Each value is the full row map, primary-key field included, keyed by top-level field name.
/// Nested values sit under their top-level key.
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
57
/// Result of [`plan_insert_row`]: `(payload, (pk_key, full_row), index_entries, pk_scalar)`.
///
/// - `payload`: the encoded record payload (v2 or v3 layout).
/// - `pk_key`: the primary key's canonical key bytes (the key half used for [`LatestMap`]).
/// - `full_row`: the full row map rebuilt from the validated values (primary key included).
/// - `index_entries`: `IndexOp::Insert` entries for the row's indexed values.
/// - `pk_scalar`: the primary key as a scalar value.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
64
65fn plan_insert_row(
66    catalog: &Catalog,
67    collection_id: CollectionId,
68    mut row: BTreeMap<String, RowValue>,
69) -> Result<PlannedInsert, DbError> {
70    let col =
71        catalog
72            .get(collection_id)
73            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
74                id: collection_id.0,
75            }))?;
76    let pk_name =
77        col.primary_field
78            .as_deref()
79            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
80                collection_id: collection_id.0,
81            }))?;
82    let pk_def = col
83        .fields
84        .iter()
85        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
86        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
87            name: pk_name.to_string(),
88        }))?;
89    let pk_ty = &pk_def.ty;
90    validation::ensure_pk_type_primitive(pk_ty)?;
91    let mut pk_path = vec![pk_name.to_string()];
92    let pk_cell = row
93        .get(pk_name)
94        .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
95            name: pk_name.to_string(),
96        }))?;
97    validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
98    // Validate unknown fields: for nested schema paths we validate by traversing row objects.
99    // For legacy single-segment schemas, keep the existing top-level validation.
100    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
101    if !has_multi_segment_schema {
102        validation::validate_top_level_row(&col.fields, pk_name, &row)?;
103    } else {
104        validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
105    }
106
107    // `pk_cell` is already present (validated above), so remove must succeed.
108    let pk_val = row.remove(pk_name).unwrap();
109    let pk_scalar = pk_val.clone().into_scalar()?;
110
111    // Build non-PK values in schema order.
112    // - legacy v2: single-segment top-level field defs
113    // - v3: full FieldPath for each non-PK def (multi-segment allowed)
114    let non_pk_defs = if has_multi_segment_schema {
115        col.fields
116            .iter()
117            .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
118            .collect::<Vec<_>>()
119    } else {
120        non_pk_defs_in_order(&col.fields, pk_name)
121    };
122    let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
123
124    let payload = if has_multi_segment_schema {
125        encode_record_payload_v3(
126            collection_id.0,
127            col.current_version.0,
128            &pk_scalar,
129            pk_ty,
130            &non_pk,
131        )?
132    } else {
133        encode_record_payload_v2(
134            collection_id.0,
135            col.current_version.0,
136            &pk_scalar,
137            pk_ty,
138            &non_pk,
139        )?
140    };
141
142    let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
143    full_map.insert(pk_name.to_string(), pk_val);
144    for (def, v) in &non_pk {
145        let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
146        if parts.len() == 1 {
147            full_map.insert(parts[0].clone(), v.clone());
148        } else {
149            debug_assert!(parts.len() >= 2);
150            row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
151        }
152    }
153    let mut index_entries: Vec<IndexEntry> = Vec::new();
154    for idx in &col.indexes {
155        let Some(v) = scalar_at_path(&full_map, &idx.path) else {
156            continue;
157        };
158        index_entries.push(IndexEntry {
159            collection_id: collection_id.0,
160            index_name: idx.name.clone(),
161            kind: idx.kind,
162            op: IndexOp::Insert,
163            index_key: v.canonical_key_bytes(),
164            pk_key: pk_scalar.canonical_key_bytes(),
165        });
166    }
167    let pk_key = pk_scalar.canonical_key_bytes();
168    Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
169}
170
171fn index_deletes_for_existing_row(
172    collection_id: CollectionId,
173    pk_scalar: &ScalarValue,
174    indexes: &[crate::schema::IndexDef],
175    existing_row: &BTreeMap<String, RowValue>,
176) -> Vec<IndexEntry> {
177    let mut out = Vec::new();
178    for idx in indexes {
179        let Some(v) = scalar_at_path(existing_row, &idx.path) else {
180            continue;
181        };
182        out.push(IndexEntry {
183            collection_id: collection_id.0,
184            index_name: idx.name.clone(),
185            kind: idx.kind,
186            op: IndexOp::Delete,
187            index_key: v.canonical_key_bytes(),
188            pk_key: pk_scalar.canonical_key_bytes(),
189        });
190    }
191    out
192}
193
/// Staged writes for an active transaction.
///
/// A transaction is started by [`Database::transaction`] or [`Database::begin_transaction`].
/// `begin_transaction` clones the live catalog, latest-row map and index state into the
/// `shadow_*` fields. While the transaction is active:
/// - reads are served from the shadows;
/// - writes update the shadows and append encoded segments to `pending`.
///
/// On commit, the pending segments are written as one batch and the shadows replace the live
/// state. On rollback, this value is simply dropped.
pub(crate) struct TxnStaging {
    /// Transaction id allocated by `begin_transaction`; passed to the batch write on commit.
    pub(crate) txn_id: u64,
    /// Working copy of the catalog, including schema changes staged in this transaction.
    pub(crate) shadow_catalog: Catalog,
    /// Working copy of the latest-row map.
    pub(crate) shadow_latest: LatestMap,
    /// Working copy of the secondary-index state.
    pub(crate) shadow_indexes: IndexState,
    /// Encoded `(segment type, payload)` pairs, appended in this order on commit.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
202
/// Opened ModelVault database: generic over a [`Store`] ([`FileStore`] on disk, [`VecStore`] in memory).
///
/// Holds the replayed in-memory state (catalog, latest rows, secondary indexes) alongside the
/// backing store. While a transaction is active, reads come from the transaction's staged
/// copies and writes are queued until commit.
pub struct Database<S: Store = FileStore> {
    /// Path shown by [`Database::path`] (`":memory:"` for [`VecStore`]).
    path: PathBuf,
    /// Backing store; log segments are appended through it (see `commit_write_batch`).
    store: S,
    /// In-memory view of schema segments replayed from disk (committed state only).
    /// Schema changes staged in a transaction live in that transaction's shadow catalog.
    catalog: Catalog,
    /// Byte offset where the append-only segment log begins (after header and superblocks).
    segment_start: u64,
    /// Format minor from the file header; may be lazily upgraded (`3` → `4` → `5`) on write.
    // NOTE(review): `commit_write_txn_v6` also receives `&mut format_minor`, and transaction
    // markers need minor 6+. The upgrade path likely continues past `5` — confirm.
    format_minor: u16,
    /// Latest row per `(collection_id, canonical primary-key bytes)`; last replayed insert wins.
    latest: LatestMap,
    /// Secondary indexes rebuilt from replayed `Index` segments.
    indexes: IndexState,
    /// Monotonic id for transaction marker segments (format minor 6+).
    txn_seq: u64,
    /// Staging for the active transaction, if any.
    ///
    /// While set, write methods such as [`Database::insert`] and
    /// [`Database::register_collection`] stage their segments here instead of committing each
    /// write immediately. Reads also go through its shadow state.
    txn_staging: Option<TxnStaging>,
    /// Present for writable on-disk databases (process-wide single-writer registry).
    ///
    /// Never read: it is kept only so the guard lives as long as the database, hence
    /// `allow(dead_code)`. Presumably dropping it releases the registration — confirm in
    /// `writer_registry`.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    /// Test-only hook for replace-path record encoding error branches.
    ///
    /// When set, `insert` takes it (one-shot) on the replace path. It is called with the
    /// planned row map before the row is re-encoded, which lets tests feed a misaligned map to
    /// the encoder.
    #[cfg(test)]
    #[doc(hidden)]
    // The bare function-pointer type trips `type_complexity`; a type alias would not add
    // clarity for a test-only hook.
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Test-only hook that substitutes the scalar used when encoding a delete payload.
    ///
    /// Tests use it to reach the encoding error (`?`) path with a scalar unrelated to the
    /// validated primary key.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
236
237impl<S: Store> Database<S> {
238    fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
239        let mut out = Database::<VecStore>::open_in_memory()?;
240
241        // Recreate catalog (stable ids if created in id order).
242        let mut cols = self.catalog_for_read().collections();
243        cols.sort_by_key(|c| c.id.0);
244        for c in &cols {
245            let pk =
246                c.primary_field
247                    .as_deref()
248                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
249                        collection_id: c.id.0,
250                    }))?;
251            let (new_id, _v1) = out.register_collection_with_indexes(
252                &c.name,
253                c.fields.clone(),
254                c.indexes.clone(),
255                pk,
256            )?;
257            // Bump schema version counter to match current_version (repeat identical schema).
258            for _ in 2..=c.current_version.0 {
259                let _ = out.register_schema_version_with_indexes_force(
260                    new_id,
261                    c.fields.clone(),
262                    c.indexes.clone(),
263                )?;
264            }
265        }
266
267        // Copy latest rows (in-memory snapshot semantics).
268        for ((cid, _), row) in self.latest_for_read().iter() {
269            let collection_id = CollectionId(*cid);
270            out.insert(collection_id, row.clone())?;
271        }
272
273        Ok(out.into_snapshot_bytes())
274    }
275
276    pub(crate) fn open_with_store(
277        path: PathBuf,
278        store: S,
279        opts: OpenOptions,
280    ) -> Result<Self, DbError> {
281        open::open_with_store(path, store, opts)
282    }
283
284    fn next_txn_id(&mut self) -> u64 {
285        self.txn_seq = self.txn_seq.saturating_add(1);
286        self.txn_seq
287    }
288
289    #[inline]
290    fn commit_write_batch(
291        &mut self,
292        txn_id: u64,
293        body: &[(crate::segments::header::SegmentType, &[u8])],
294    ) -> Result<(), DbError> {
295        write::commit_write_txn_v6(
296            &mut self.store,
297            self.segment_start,
298            &mut self.format_minor,
299            txn_id,
300            body,
301        )
302    }
303
304    #[inline]
305    fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
306        self.catalog.apply_record(wire)
307    }
308
309    /// Run `f` inside a multi-write transaction: durable segments are written on success.
310    ///
311    /// On error, staged work is discarded and nothing new is appended to the log.
312    pub fn transaction<R>(
313        &mut self,
314        f: impl FnOnce(&mut Self) -> Result<R, DbError>,
315    ) -> Result<R, DbError> {
316        self.begin_transaction()?;
317        match f(self) {
318            Ok(v) => match self.commit_transaction() {
319                Ok(()) => Ok(v),
320                Err(e) => {
321                    self.rollback_transaction();
322                    Err(e)
323                }
324            },
325            Err(e) => {
326                self.rollback_transaction();
327                Err(e)
328            }
329        }
330    }
331
332    /// Start a transaction (for bindings that cannot use the closure API). Pairs with
333    /// [`Self::commit_transaction`] or [`Self::rollback_transaction`].
334    pub fn begin_transaction(&mut self) -> Result<(), DbError> {
335        if self.txn_staging.is_some() {
336            return Err(DbError::Transaction(TransactionError::NestedTransaction));
337        }
338        let tid = self.next_txn_id();
339        self.txn_staging = Some(TxnStaging {
340            txn_id: tid,
341            shadow_catalog: self.catalog.clone(),
342            shadow_latest: self.latest.clone(),
343            shadow_indexes: self.indexes.clone(),
344            pending: Vec::new(),
345        });
346        Ok(())
347    }
348
349    /// Commit the active transaction started with [`Self::begin_transaction`].
350    pub fn commit_transaction(&mut self) -> Result<(), DbError> {
351        self.commit_txn_staging()
352    }
353
354    /// Discard the active transaction without writing to the log.
355    pub fn rollback_transaction(&mut self) {
356        self.txn_staging = None;
357    }
358
359    fn commit_txn_staging(&mut self) -> Result<(), DbError> {
360        let Some(st) = self.txn_staging.take() else {
361            return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
362        };
363        if st.pending.is_empty() {
364            self.catalog = st.shadow_catalog;
365            self.latest = st.shadow_latest;
366            self.indexes = st.shadow_indexes;
367            return Ok(());
368        }
369        let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
370            st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
371        self.commit_write_batch(st.txn_id, &batch)?;
372        self.catalog = st.shadow_catalog;
373        self.latest = st.shadow_latest;
374        self.indexes = st.shadow_indexes;
375        Ok(())
376    }
377
378    fn catalog_for_read(&self) -> &Catalog {
379        if let Some(ref st) = self.txn_staging {
380            &st.shadow_catalog
381        } else {
382            &self.catalog
383        }
384    }
385
386    fn indexes_for_read(&self) -> &IndexState {
387        if let Some(ref st) = self.txn_staging {
388            &st.shadow_indexes
389        } else {
390            &self.indexes
391        }
392    }
393
394    fn latest_for_read(&self) -> &LatestMap {
395        if let Some(ref st) = self.txn_staging {
396            &st.shadow_latest
397        } else {
398            &self.latest
399        }
400    }
401
402    /// Path passed to [`Database::open`](Database::<FileStore>::open), or `":memory:"` for [`VecStore`].
403    pub fn path(&self) -> &Path {
404        &self.path
405    }
406
407    /// Read-only view of the schema catalog built from `Schema` segments.
408    pub fn catalog(&self) -> &Catalog {
409        self.catalog_for_read()
410    }
411
412    /// All registered collection names in lexicographic order.
413    pub fn collection_names(&self) -> Vec<String> {
414        self.catalog_for_read().collection_names()
415    }
416
417    /// Read-only access to the in-memory secondary index state (rebuilt from `Index` segments).
418    pub fn index_state(&self) -> &IndexState {
419        self.indexes_for_read()
420    }
421
422    /// Execute a query against the current in-memory snapshot of the database.
423    pub fn query(
424        &self,
425        q: &crate::query::Query,
426    ) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
427        crate::query::execute_query(
428            self.catalog_for_read(),
429            self.indexes_for_read(),
430            self.latest_for_read(),
431            q,
432        )
433    }
434
435    /// Return a human-readable explanation of the chosen plan for `q`.
436    pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
437        crate::query::explain_query(self.catalog_for_read(), q)
438    }
439
440    /// Lazy iterator over query rows (same semantics as [`Self::query`]).
441    ///
442    /// See [`crate::query::QueryRowIter`] — this is the v0.7 pull-based execution boundary, not a
443    /// full operator graph.
444    pub fn query_iter(
445        &self,
446        q: &crate::query::Query,
447    ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
448        crate::query::execute_query_iter_with_spill_path(
449            self.catalog_for_read(),
450            self.indexes_for_read(),
451            self.latest_for_read(),
452            q,
453            Some(self.path.as_path()),
454        )
455    }
456
457    /// Register the collection schema defined by `T` (schema version 1).
458    pub fn register_model<T: crate::schema::DbModel>(
459        &mut self,
460    ) -> Result<(CollectionId, SchemaVersion), DbError> {
461        self.register_collection_with_indexes(
462            T::collection_name(),
463            T::fields(),
464            T::indexes(),
465            T::primary_field(),
466        )
467    }
468
469    /// Typed handle over a registered collection; `T` may be a *subset model*.
470    pub fn collection<'a, T: crate::schema::DbModel>(
471        &'a self,
472    ) -> Result<Collection<'a, S, T>, DbError> {
473        let cid = self.collection_id_named(T::collection_name())?;
474        let col = self
475            .catalog_for_read()
476            .get(cid)
477            .expect("collection id from name lookup must exist in catalog");
478        validate_subset_model::<T>(col)?;
479        Ok(Collection {
480            db: self,
481            collection_id: cid,
482            _marker: PhantomData,
483        })
484    }
485
486    /// Look up [`CollectionId`] by collection name (leading/trailing whitespace trimmed).
487    ///
488    /// Returns [`SchemaError::UnknownCollectionName`] when the name is not registered.
489    pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
490        self.catalog_for_read()
491            .lookup_name(name)
492            .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
493                name: name.trim().to_string(),
494            }))
495    }
496
497    /// Create a new collection at schema version `1`.
498    ///
499    /// `primary_field` must name a **single-segment** (top-level) field present in `fields`.
500    /// Appends a catalog segment and updates the in-memory catalog.
501    pub fn register_collection(
502        &mut self,
503        name: &str,
504        fields: Vec<FieldDef>,
505        primary_field: &str,
506    ) -> Result<(CollectionId, SchemaVersion), DbError> {
507        self.register_collection_with_indexes(name, fields, vec![], primary_field)
508    }
509
510    pub fn register_collection_with_indexes(
511        &mut self,
512        name: &str,
513        fields: Vec<FieldDef>,
514        indexes: Vec<crate::schema::IndexDef>,
515        primary_field: &str,
516    ) -> Result<(CollectionId, SchemaVersion), DbError> {
517        let name = helpers::normalize_collection_name(name)?;
518        let pk = primary_field.trim();
519        if pk.is_empty() {
520            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
521        }
522        if !Catalog::has_top_level_field(&fields, pk) {
523            return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
524                name: pk.to_string(),
525            }));
526        }
527        if let Some(st) = &mut self.txn_staging {
528            let id = st.shadow_catalog.next_collection_id().0;
529            let wire = CatalogRecordWire::CreateCollection {
530                collection_id: id,
531                name: name.clone(),
532                schema_version: 1,
533                fields,
534                indexes,
535                primary_field: Some(pk.to_string()),
536            };
537            let payload = encode_catalog_payload(&wire);
538            st.shadow_catalog.apply_record(wire)?;
539            st.pending
540                .push((crate::segments::header::SegmentType::Schema, payload));
541            return Ok((CollectionId(id), SchemaVersion(1)));
542        }
543        let id = self.catalog.next_collection_id().0;
544        let wire = CatalogRecordWire::CreateCollection {
545            collection_id: id,
546            name: name.clone(),
547            schema_version: 1,
548            fields,
549            indexes,
550            primary_field: Some(pk.to_string()),
551        };
552        let payload = encode_catalog_payload(&wire);
553        let tid = self.next_txn_id();
554        self.commit_write_batch(
555            tid,
556            &[(
557                crate::segments::header::SegmentType::Schema,
558                payload.as_slice(),
559            )],
560        )?;
561        self.apply_catalog_record(wire)?;
562        Ok((CollectionId(id), SchemaVersion(1)))
563    }
564
565    /// Bump the schema version for `id` to `current + 1` with a new field set.
566    ///
567    /// The primary-key field must remain present as a top-level field (see catalog rules).
568    pub fn register_schema_version(
569        &mut self,
570        id: CollectionId,
571        fields: Vec<FieldDef>,
572    ) -> Result<SchemaVersion, DbError> {
573        self.register_schema_version_with_indexes(id, fields, vec![])
574    }
575
576    pub fn register_schema_version_with_indexes(
577        &mut self,
578        id: CollectionId,
579        fields: Vec<FieldDef>,
580        indexes: Vec<crate::schema::IndexDef>,
581    ) -> Result<SchemaVersion, DbError> {
582        let current = self
583            .catalog_for_read()
584            .get(id)
585            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
586        // `classify_schema_update` only returns `Ok(...)` variants today; keep it infallible here.
587        match classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)? {
588            SchemaChange::Safe => {}
589            SchemaChange::NeedsMigration { reason, .. } => {
590                return Err(DbError::Schema(SchemaError::MigrationRequired {
591                    message: reason,
592                }));
593            }
594            SchemaChange::Breaking { reason } => {
595                return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
596                    message: reason,
597                }));
598            }
599        }
600        let next_v = current
601            .current_version
602            .0
603            .checked_add(1)
604            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
605        let wire = CatalogRecordWire::NewSchemaVersion {
606            collection_id: id.0,
607            schema_version: next_v,
608            fields,
609            indexes,
610        };
611        let payload = encode_catalog_payload(&wire);
612        if let Some(st) = &mut self.txn_staging {
613            st.shadow_catalog.apply_record(wire.clone())?;
614            st.pending
615                .push((crate::segments::header::SegmentType::Schema, payload));
616            return Ok(SchemaVersion(next_v));
617        }
618        let tid = self.next_txn_id();
619        self.commit_write_batch(
620            tid,
621            &[(
622                crate::segments::header::SegmentType::Schema,
623                payload.as_slice(),
624            )],
625        )?;
626        self.apply_catalog_record(wire)?;
627        Ok(SchemaVersion(next_v))
628    }
629
630    /// Plan a schema version bump and return the required migration steps, if any.
631    pub fn plan_schema_version_with_indexes(
632        &self,
633        id: CollectionId,
634        fields: Vec<FieldDef>,
635        indexes: Vec<crate::schema::IndexDef>,
636    ) -> Result<MigrationPlan, DbError> {
637        let current = self
638            .catalog_for_read()
639            .get(id)
640            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
641        // Same infallibility contract as `register_schema_version_with_indexes` above.
642        let change = classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)?;
643        let mut steps = Vec::new();
644        match &change {
645            SchemaChange::Safe => {}
646            SchemaChange::Breaking { .. } => {}
647            SchemaChange::NeedsMigration {
648                reason,
649                backfill_top_level_field,
650                backfill_field_path,
651            } => {
652                if let Some(field) = backfill_top_level_field {
653                    steps.push(MigrationStep::BackfillTopLevelField {
654                        field: field.clone(),
655                    });
656                } else if let Some(path) = backfill_field_path {
657                    steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
658                } else if reason.contains("unique index") {
659                    steps.push(MigrationStep::RebuildIndexes);
660                }
661            }
662        }
663        Ok(MigrationPlan { change, steps })
664    }
665
666    /// Backfill a missing top-level field with a fixed value for all rows in a collection.
667    ///
668    /// This helper is intentionally simple so it can be bound to other languages.
669    pub fn backfill_top_level_field_with_value(
670        &mut self,
671        collection_id: CollectionId,
672        field: &str,
673        value: RowValue,
674    ) -> Result<(), DbError> {
675        let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
676        self.backfill_field_at_path_with_value(collection_id, &path, value)
677    }
678
679    /// Backfill a missing field (any segment path) with a fixed value for all rows.
680    pub fn backfill_field_at_path_with_value(
681        &mut self,
682        collection_id: CollectionId,
683        path: &crate::schema::FieldPath,
684        value: RowValue,
685    ) -> Result<(), DbError> {
686        let col = self
687            .catalog_for_read()
688            .get(collection_id)
689            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
690                id: collection_id.0,
691            }))?;
692        let _field_def = col.fields.iter().find(|f| f.path == *path).ok_or_else(|| {
693            DbError::Schema(SchemaError::RowUnknownField {
694                name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
695            })
696        })?;
697
698        let mut rows: Vec<BTreeMap<String, RowValue>> = Vec::new();
699        for ((cid, _), row) in self.latest_for_read().iter() {
700            if *cid != collection_id.0 {
701                continue;
702            }
703            rows.push(row.clone());
704        }
705
706        self.transaction(|db| {
707            for mut row in rows {
708                if row_value_at_path_segments(&row, &path.0).is_some() {
709                    continue;
710                }
711                crate::record::insert_value_at_path(&mut row, path, value.clone())?;
712                db.insert(collection_id, row)?;
713            }
714            Ok(())
715        })
716    }
717
718    /// Rebuild index entries for all rows in `collection_id` using the current schema’s index defs.
719    pub fn rebuild_indexes_for_collection(
720        &mut self,
721        collection_id: CollectionId,
722    ) -> Result<(), DbError> {
723        let col = self
724            .catalog_for_read()
725            .get(collection_id)
726            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
727                id: collection_id.0,
728            }))?;
729        let pk_name =
730            col.primary_field
731                .as_deref()
732                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
733                    collection_id: collection_id.0,
734                }))?;
735        let pk_def = col
736            .fields
737            .iter()
738            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
739            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
740                name: pk_name.to_string(),
741            }))?;
742
743        let mut entries: Vec<IndexEntry> = Vec::new();
744        for ((cid, _), row) in self.latest_for_read().iter() {
745            if *cid != collection_id.0 {
746                continue;
747            }
748            let Some(pk_cell) = row.get(pk_name) else {
749                continue;
750            };
751            let pk_scalar = pk_cell.clone().into_scalar()?;
752            if !pk_scalar.ty_matches(&pk_def.ty) {
753                continue;
754            }
755            for idx in &col.indexes {
756                let Some(v) = scalar_at_path(row, &idx.path) else {
757                    continue;
758                };
759                entries.push(IndexEntry {
760                    collection_id: collection_id.0,
761                    index_name: idx.name.clone(),
762                    kind: idx.kind,
763                    op: IndexOp::Insert,
764                    index_key: v.canonical_key_bytes(),
765                    pk_key: pk_scalar.canonical_key_bytes(),
766                });
767            }
768        }
769
770        self.transaction(|db| {
771            if entries.is_empty() {
772                return Ok(());
773            }
774            // Apply in-memory + persist as one index segment batch.
775            // `begin_transaction` always installs `txn_staging` before this closure runs.
776            let st = db
777                .txn_staging
778                .as_mut()
779                .expect("transaction staging must be active");
780            let b = encode_index_payload(&entries);
781            st.pending
782                .push((crate::segments::header::SegmentType::Index, b));
783            for e in entries {
784                st.shadow_indexes.apply(e)?;
785            }
786            Ok(())
787        })
788    }
789
790    /// Force-register a new schema version, bypassing compatibility checks.
791    ///
792    /// This is an escape hatch for advanced workflows where the caller performs an out-of-band
793    /// data rewrite (or accepts inconsistent index/query behavior until a rebuild).
794    pub fn register_schema_version_with_indexes_force(
795        &mut self,
796        id: CollectionId,
797        fields: Vec<FieldDef>,
798        indexes: Vec<crate::schema::IndexDef>,
799    ) -> Result<SchemaVersion, DbError> {
800        let current = self
801            .catalog_for_read()
802            .get(id)
803            .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
804        let next_v = current
805            .current_version
806            .0
807            .checked_add(1)
808            .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
809        let wire = CatalogRecordWire::NewSchemaVersion {
810            collection_id: id.0,
811            schema_version: next_v,
812            fields,
813            indexes,
814        };
815        let payload = encode_catalog_payload(&wire);
816        if let Some(st) = &mut self.txn_staging {
817            st.shadow_catalog.apply_record(wire.clone())?;
818            st.pending
819                .push((crate::segments::header::SegmentType::Schema, payload));
820            return Ok(SchemaVersion(next_v));
821        }
822        let tid = self.next_txn_id();
823        self.commit_write_batch(
824            tid,
825            &[(
826                crate::segments::header::SegmentType::Schema,
827                payload.as_slice(),
828            )],
829        )?;
830        self.apply_catalog_record(wire)?;
831        Ok(SchemaVersion(next_v))
832    }
833
834    /// Insert or replace the row for `collection_id` identified by its primary-key field.
835    ///
836    /// `row` maps **top-level** field names to [`RowValue`]. The primary key field must be present.
837    /// Only single-segment field paths are supported in 0.6.x.
838    pub fn insert(
839        &mut self,
840        collection_id: CollectionId,
841        row: BTreeMap<String, RowValue>,
842    ) -> Result<(), DbError> {
843        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
844        let (mut payload, full, mut index_entries, pk_scalar) =
845            plan_insert_row(self.catalog_for_read(), collection_id, row)?;
846        #[cfg(test)]
847        let mut full = full;
848        let existing = self
849            .latest_for_read()
850            .get(&(collection_id.0, full.0.clone()))
851            .cloned();
852        if existing.is_some() {
853            #[cfg(test)]
854            if let Some(poison) = self.test_poison_planned_replace_row.take() {
855                poison(collection_id, &mut full.1);
856            }
857            // Re-encode with explicit replace opcode.
858            let col = self
859                .catalog_for_read()
860                .get(collection_id)
861                .ok_or(DbError::Schema(SchemaError::UnknownCollection {
862                    id: collection_id.0,
863                }))?;
864            let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
865            let pk_name =
866                col.primary_field
867                    .as_deref()
868                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
869                        collection_id: collection_id.0,
870                    }))?;
871            let pk_def = col
872                .fields
873                .iter()
874                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
875                .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
876                    name: pk_name.to_string(),
877                }))?;
878
879            let non_pk_defs = if has_multi_segment_schema {
880                col.fields
881                    .iter()
882                    .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
883                    .collect::<Vec<_>>()
884            } else {
885                non_pk_defs_in_order(&col.fields, pk_name)
886            };
887            let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
888            for def in &non_pk_defs {
889                let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
890                non_pk.push(((*def).clone(), v));
891            }
892            payload = (if has_multi_segment_schema {
893                encode_record_payload_v3_op(
894                    collection_id.0,
895                    col.current_version.0,
896                    OP_REPLACE,
897                    &pk_scalar,
898                    &pk_def.ty,
899                    &non_pk,
900                )
901            } else {
902                encode_record_payload_v2_op(
903                    collection_id.0,
904                    col.current_version.0,
905                    OP_REPLACE,
906                    &pk_scalar,
907                    &pk_def.ty,
908                    &non_pk,
909                )
910            })?;
911            // Prepend index deletes for any existing row.
912            if let Some(ref old_row) = existing {
913                let mut deletes = index_deletes_for_existing_row(
914                    collection_id,
915                    &pk_scalar,
916                    &col.indexes,
917                    old_row,
918                );
919                deletes.append(&mut index_entries);
920                index_entries = deletes;
921            }
922        }
923        for e in &index_entries {
924            if e.kind != crate::schema::IndexKind::Unique {
925                continue;
926            }
927            let Some(existing) =
928                self.indexes_for_read()
929                    .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
930            else {
931                continue;
932            };
933            if e.op != IndexOp::Insert {
934                continue;
935            }
936            if existing == e.pk_key.as_slice() {
937                continue;
938            }
939            return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
940        }
941        if let Some(st) = &mut self.txn_staging {
942            if !index_entries.is_empty() {
943                let b = encode_index_payload(&index_entries);
944                st.pending
945                    .push((crate::segments::header::SegmentType::Index, b));
946            }
947            st.pending.push((
948                crate::segments::header::SegmentType::Record,
949                payload.clone(),
950            ));
951            st.shadow_latest
952                .insert((collection_id.0, full.0.clone()), full.1.clone());
953            for e in index_entries {
954                st.shadow_indexes.apply(e)?;
955            }
956            return Ok(());
957        }
958        let tid = self.next_txn_id();
959        let index_bytes = if index_entries.is_empty() {
960            None
961        } else {
962            Some(encode_index_payload(&index_entries))
963        };
964        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
965        if let Some(ref b) = index_bytes {
966            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
967        }
968        batch.push((
969            crate::segments::header::SegmentType::Record,
970            payload.as_slice(),
971        ));
972        self.commit_write_batch(tid, &batch)?;
973        self.latest.insert((collection_id.0, full.0), full.1);
974        for e in index_entries {
975            self.indexes.apply(e)?;
976        }
977        Ok(())
978    }
979
980    /// Delete the row for `collection_id` identified by its primary key.
981    pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
982        write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
983        let col = self
984            .catalog_for_read()
985            .get(collection_id)
986            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
987                id: collection_id.0,
988            }))?;
989        let pk_name =
990            col.primary_field
991                .as_deref()
992                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
993                    collection_id: collection_id.0,
994                }))?;
995        let pk_def = col
996            .fields
997            .iter()
998            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
999            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1000                name: pk_name.to_string(),
1001            }))?;
1002        if !pk.ty_matches(&pk_def.ty) {
1003            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
1004        }
1005        let pk_key = pk.canonical_key_bytes();
1006        let existing = self
1007            .latest_for_read()
1008            .get(&(collection_id.0, pk_key.clone()))
1009            .cloned();
1010        let Some(old_row) = existing else {
1011            return Ok(());
1012        };
1013        let indexes = col.indexes.clone();
1014        let schema_ver = col.current_version.0;
1015        let pk_ty = pk_def.ty.clone();
1016        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
1017
1018        let mut index_entries =
1019            index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
1020        #[cfg(not(test))]
1021        let pk_for_record = pk.clone();
1022        #[cfg(test)]
1023        let pk_for_record = {
1024            let mut p = pk.clone();
1025            if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
1026                p = poison(p);
1027            }
1028            p
1029        };
1030        let record_payload = (if has_multi_segment_schema {
1031            encode_record_payload_v3_op(
1032                collection_id.0,
1033                schema_ver,
1034                OP_DELETE,
1035                &pk_for_record,
1036                &pk_ty,
1037                &[],
1038            )
1039        } else {
1040            encode_record_payload_v2_op(
1041                collection_id.0,
1042                schema_ver,
1043                OP_DELETE,
1044                &pk_for_record,
1045                &pk_ty,
1046                &[],
1047            )
1048        })?;
1049
1050        if let Some(st) = &mut self.txn_staging {
1051            if !index_entries.is_empty() {
1052                let b = encode_index_payload(&index_entries);
1053                st.pending
1054                    .push((crate::segments::header::SegmentType::Index, b));
1055            }
1056            st.pending.push((
1057                crate::segments::header::SegmentType::Record,
1058                record_payload.clone(),
1059            ));
1060            st.shadow_latest.remove(&(collection_id.0, pk_key));
1061            for e in index_entries.drain(..) {
1062                st.shadow_indexes.apply(e)?;
1063            }
1064            return Ok(());
1065        }
1066
1067        let tid = self.next_txn_id();
1068        let index_bytes = if index_entries.is_empty() {
1069            None
1070        } else {
1071            Some(encode_index_payload(&index_entries))
1072        };
1073        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
1074        if let Some(ref b) = index_bytes {
1075            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
1076        }
1077        batch.push((
1078            crate::segments::header::SegmentType::Record,
1079            record_payload.as_slice(),
1080        ));
1081        self.commit_write_batch(tid, &batch)?;
1082        self.latest.remove(&(collection_id.0, pk_key));
1083        for e in index_entries {
1084            self.indexes.apply(e)?;
1085        }
1086        Ok(())
1087    }
1088
1089    /// Return the latest stored row for `pk`, or `None` if no insert has been replayed for that key.
1090    ///
1091    /// `pk` must match the declared primary field’s [`crate::schema::Type`].
1092    pub fn get(
1093        &self,
1094        collection_id: CollectionId,
1095        pk: &ScalarValue,
1096    ) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
1097        let col = self
1098            .catalog_for_read()
1099            .get(collection_id)
1100            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1101                id: collection_id.0,
1102            }))?;
1103        let pk_name =
1104            col.primary_field
1105                .as_deref()
1106                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
1107                    collection_id: collection_id.0,
1108                }))?;
1109        let pk_ty = col
1110            .fields
1111            .iter()
1112            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
1113            .map(|f| &f.ty)
1114            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
1115                name: pk_name.to_string(),
1116            }))?;
1117        if !pk.ty_matches(pk_ty) {
1118            return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
1119                collection_id: collection_id.0,
1120            }));
1121        }
1122        let key = (collection_id.0, pk.canonical_key_bytes());
1123        Ok(self.latest_for_read().get(&key).cloned())
1124    }
1125
1126    /// Write a durable checkpoint segment and publish it via the superblock.
1127    ///
1128    /// The checkpoint stores the logical state (catalog + latest rows + index state) so open can
1129    /// avoid scanning/replaying the full log. Works with any [`Store`] (file-backed [`FileStore`] or
1130    /// [`VecStore`] snapshots).
1131    pub fn checkpoint(&mut self) -> Result<(), DbError> {
1132        #[cfg(feature = "tracing")]
1133        let _span = tracing::info_span!("database_checkpoint").entered();
1134        if self.txn_staging.is_some() {
1135            return Err(DbError::Transaction(TransactionError::NestedTransaction));
1136        }
1137
1138        write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
1139
1140        let mut cp = checkpoint::checkpoint_from_state(
1141            self.catalog_for_read(),
1142            self.latest_for_read(),
1143            self.indexes_for_read(),
1144        )?;
1145
1146        let file_len = self.store.len()?;
1147        let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
1148        let checkpoint_offset = writer.offset();
1149
1150        let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
1151        let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
1152        cp.replay_from_offset = replay_from;
1153        let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
1154
1155        let hdr = SegmentHeader {
1156            segment_type: SegmentType::Checkpoint,
1157            payload_len: 0,
1158            payload_crc32c: 0,
1159        };
1160        writer.append(hdr, &payload)?;
1161
1162        publish::append_manifest_and_publish_with_checkpoint(
1163            &mut self.store,
1164            self.segment_start,
1165            Some((checkpoint_offset, payload.len() as u32)),
1166        )?;
1167        self.store.sync()?;
1168        #[cfg(feature = "tracing")]
1169        tracing::info!(
1170            checkpoint_offset,
1171            replay_from,
1172            payload_bytes = payload.len(),
1173            "database_checkpoint_ok"
1174        );
1175        Ok(())
1176    }
1177
    /// Test hook: arm a one-shot mutation of the planned row on the replace path of
    /// [`Self::insert`].
    ///
    /// The next `insert` that finds an existing row for its primary key takes this hook, which
    /// leaves it unset. It then calls `poison(collection_id, &mut row)` immediately before that
    /// row is re-encoded with the replace opcode, so tests can tamper with what gets encoded.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_arm_replace_encode_poison_once(
        &mut self,
        poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
    ) {
        self.test_poison_planned_replace_row = Some(poison);
    }
1187
    /// Test hook: arm a one-shot rewrite of the primary-key scalar used by [`Self::delete`].
    ///
    /// The next `delete` that finds an existing row takes this hook, which leaves it unset. It
    /// then encodes its delete record with `poison(pk)` instead of `pk`. Index deletes and the
    /// in-memory row removal still use the caller's original `pk`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_arm_delete_encode_poison_once(
        &mut self,
        poison: fn(ScalarValue) -> ScalarValue,
    ) {
        self.test_poison_delete_encode_scalar = Some(poison);
    }
1196
1197    /// Test helper: overwrite one cell in [`Self::latest`] without validation.
1198    #[cfg(test)]
1199    #[doc(hidden)]
1200    pub(crate) fn test_write_latest_cell_unchecked(
1201        &mut self,
1202        collection_id: CollectionId,
1203        pk: &ScalarValue,
1204        field: &str,
1205        value: RowValue,
1206    ) {
1207        let pk_key = pk.canonical_key_bytes();
1208        let row = self
1209            .latest
1210            .get_mut(&(collection_id.0, pk_key))
1211            .expect("test_write_latest_cell_unchecked: unknown row key");
1212        row.insert(field.to_string(), value);
1213    }
1214}
1215
1216impl Database<FileStore> {
1217    /// Rewrite the database into a compacted single-file image at `dest_path`.
1218    ///
1219    /// The destination file is truncated/overwritten if it exists.
1220    pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1221        self.compact_to_with_fsops(&StdFsOps, dest_path)
1222    }
1223
1224    pub(crate) fn compact_to_with_fsops(
1225        &self,
1226        fs: &dyn FsOps,
1227        dest_path: impl AsRef<Path>,
1228    ) -> Result<(), DbError> {
1229        #[cfg(feature = "tracing")]
1230        let _span = tracing::info_span!(
1231            "database_compact_to",
1232            dest = %dest_path.as_ref().display()
1233        )
1234        .entered();
1235        let bytes = self.compact_snapshot_bytes()?;
1236        let path = dest_path.as_ref();
1237        let file = fs
1238            .open_read_write_create_truncate(path)
1239            .map_err(DbError::Io)?;
1240        let mut store = FileStore::new(file);
1241        store.write_all_at(0, &bytes)?;
1242        store.truncate(bytes.len() as u64)?;
1243        store.sync()?;
1244        #[cfg(feature = "tracing")]
1245        tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
1246        Ok(())
1247    }
1248    pub fn compact_in_place(&mut self) -> Result<(), DbError> {
1249        self.compact_in_place_with_fsops(&StdFsOps)
1250    }
1251
1252    pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
1253        #[cfg(feature = "tracing")]
1254        let _span = tracing::info_span!("database_compact_in_place").entered();
1255        // Crash-safety: write a full new image to a sidecar file, fsync it, then atomically
1256        // replace the live path via rename (using a backup on platforms where rename does not
1257        // overwrite an existing destination).
1258        let bytes = self.compact_snapshot_bytes()?;
1259        let live_path = self.path.clone();
1260        let parent = live_path
1261            .parent()
1262            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1263
1264        // Pick unique temp + backup names in the same directory (so rename stays atomic on POSIX).
1265        let pid = std::process::id();
1266        let nanos = std::time::SystemTime::now()
1267            .duration_since(std::time::UNIX_EPOCH)
1268            .map(|d| d.as_nanos())
1269            .unwrap_or(0);
1270        let base = live_path
1271            .file_name()
1272            .and_then(|s| s.to_str())
1273            .unwrap_or("db.modelvault");
1274        let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
1275        let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
1276
1277        // 1) Write the compacted image to tmp and fsync it.
1278        {
1279            let file = fs
1280                .open_read_write_create_new(&tmp_path)
1281                .map_err(DbError::Io)?;
1282            let mut store = FileStore::new(file);
1283            store.write_all_at(0, &bytes)?;
1284            store.truncate(bytes.len() as u64)?;
1285            store.sync()?;
1286        }
1287
1288        // 2) Replace the live file path with the tmp image, preserving a backup until success.
1289        //
1290        // We do not rely on "rename over existing" being supported across platforms. Instead:
1291        // - move live → bak
1292        // - move tmp → live
1293        // - fsync directory (best-effort)
1294        // - remove bak
1295        //
1296        // If tmp → live fails, attempt to restore bak → live.
1297        let _ = fs.remove_file(&bak_path);
1298        fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
1299        let replace_res = fs.rename(&tmp_path, &live_path);
1300        if let Err(e) = replace_res {
1301            // Best-effort restore: move backup back into place.
1302            let _ = fs.rename(&bak_path, &live_path);
1303            // Clean up tmp if it still exists.
1304            let _ = fs.remove_file(&tmp_path);
1305            return Err(DbError::Io(e));
1306        }
1307
1308        // Best-effort directory sync: helps make the rename durable on POSIX.
1309        #[cfg(unix)]
1310        {
1311            // Best-effort: on many Unix platforms, opening a directory and syncing it will persist
1312            // the rename in the directory entry. If this fails, the data file itself is still
1313            // fsync'd and the operation remains logically correct; only rename durability is weaker.
1314            if let Ok(dir_f) = fs.open_dir(parent) {
1315                let _ = dir_f.sync_all();
1316            }
1317        }
1318
1319        let _ = fs.remove_file(&bak_path);
1320
1321        // 3) Refresh in-memory state by reopening (bypass writer registry; re-register after replace).
1322        self.writer_registry = None;
1323        let reopened = match (|| {
1324            let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
1325            Self::open_with_store(live_path.clone(), store, OpenOptions::default())
1326        })() {
1327            Ok(db) => db,
1328            Err(e) => {
1329                let _ = fs.rename(&bak_path, &live_path);
1330                self.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1331                    live_path.clone(),
1332                )?);
1333                return Err(e);
1334            }
1335        };
1336        let mut reopened = reopened;
1337        reopened.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
1338            live_path.clone(),
1339        )?);
1340        *self = reopened;
1341        #[cfg(feature = "tracing")]
1342        tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
1343        Ok(())
1344    }
1345
1346    /// Create a consistent backup copy of this on-disk database.
1347    ///
1348    /// This writes a checkpoint (for fast reopen and a stable state marker) and then copies the
1349    /// underlying file bytes to `dest_path`.
1350    pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1351        self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
1352    }
1353
1354    pub(crate) fn export_snapshot_to_path_with_fsops(
1355        &mut self,
1356        fs: &dyn FsOps,
1357        dest_path: impl AsRef<Path>,
1358    ) -> Result<(), DbError> {
1359        self.checkpoint()?;
1360        let dest_path = dest_path.as_ref();
1361        fs.copy(&self.path, dest_path).map_err(DbError::Io)?;
1362        // Strengthen durability of the copied snapshot: fsync the destination and best-effort
1363        // fsync its parent directory so the directory entry is persisted.
1364        if let Ok(f) = fs.open_read(dest_path) {
1365            let _ = f.sync_all();
1366        }
1367        #[cfg(unix)]
1368        best_effort_fsync_parent_dir(fs, dest_path);
1369        Ok(())
1370    }
1371
1372    /// Restore a snapshot file into `dest_path` by atomically replacing the destination.
1373    ///
1374    /// This is a file operation helper intended for operational tooling.
1375    pub fn restore_snapshot_to_path(
1376        snapshot_path: impl AsRef<Path>,
1377        dest_path: impl AsRef<Path>,
1378    ) -> Result<(), DbError> {
1379        Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
1380    }
1381
1382    pub(crate) fn restore_snapshot_to_path_with_fsops(
1383        fs: &dyn FsOps,
1384        snapshot_path: impl AsRef<Path>,
1385        dest_path: impl AsRef<Path>,
1386    ) -> Result<(), DbError> {
1387        let snapshot_path = snapshot_path.as_ref();
1388        let dest_path = dest_path.as_ref();
1389        let parent = dest_path
1390            .parent()
1391            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
1392
1393        let pid = std::process::id();
1394        let nanos = std::time::SystemTime::now()
1395            .duration_since(std::time::UNIX_EPOCH)
1396            .map(|d| d.as_nanos())
1397            .unwrap_or(0);
1398        let base = dest_path
1399            .file_name()
1400            .and_then(|s| s.to_str())
1401            .unwrap_or("db.modelvault");
1402        let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
1403        let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
1404
1405        // Copy snapshot bytes into a temp file and fsync it.
1406        fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
1407        if let Ok(f) = fs.open_read(&tmp_path) {
1408            let _ = f.sync_all();
1409        }
1410
1411        // Replace destination with backup/restore semantics.
1412        if dest_path.exists() {
1413            let _ = fs.remove_file(&bak_path);
1414            fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
1415        }
1416        let replace_res = fs.rename(&tmp_path, dest_path);
1417        if let Err(e) = replace_res {
1418            // Best-effort restore original.
1419            if bak_path.exists() {
1420                let _ = fs.rename(&bak_path, dest_path);
1421            }
1422            let _ = fs.remove_file(&tmp_path);
1423            return Err(DbError::Io(e));
1424        }
1425
1426        #[cfg(unix)]
1427        {
1428            if let Ok(dir_f) = fs.open_dir(parent) {
1429                let _ = dir_f.sync_all();
1430            }
1431        }
1432        let _ = fs.remove_file(&bak_path);
1433        Ok(())
1434    }
1435}
1436
/// Typed, read-only handle over one collection of a [`Database`].
///
/// Rows returned through this handle are projected onto the fields declared by model `T`.
pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
    /// Database the collection lives in.
    db: &'a Database<S>,
    /// Catalog id of the collection being queried.
    collection_id: CollectionId,
    /// Ties the handle to model type `T` without storing a value of it.
    _marker: PhantomData<T>,
}
1442
1443impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
1444    pub fn where_eq(
1445        &self,
1446        path: crate::schema::FieldPath,
1447        value: ScalarValue,
1448    ) -> QueryBuilder<'a, S, T> {
1449        QueryBuilder {
1450            db: self.db,
1451            collection_id: self.collection_id,
1452            predicate: Some(crate::query::Predicate::Eq { path, value }),
1453            limit: None,
1454            _marker: PhantomData,
1455        }
1456    }
1457
1458    pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1459        let q = crate::query::Query {
1460            collection: self.collection_id,
1461            predicate: None,
1462            limit: None,
1463            order_by: None,
1464        };
1465        let rows = self.db.query(&q)?;
1466        Ok(rows.into_iter().map(project_row::<T>).collect())
1467    }
1468}
1469
/// Builder for a query over one collection: an optional predicate plus an optional row limit.
///
/// Obtained from [`Collection::where_eq`]. Results are projected onto model `T`'s fields.
pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
    /// Database the query runs against.
    db: &'a Database<S>,
    /// Catalog id of the collection being queried.
    collection_id: CollectionId,
    /// Optional filter predicate ([`Collection::all`] runs with `None`).
    predicate: Option<crate::query::Predicate>,
    /// Optional cap on the number of rows requested.
    limit: Option<usize>,
    /// Ties the builder to model type `T` without storing a value of it.
    _marker: PhantomData<T>,
}
1477
1478impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
1479    pub fn limit(mut self, n: usize) -> Self {
1480        self.limit = Some(n);
1481        self
1482    }
1483
1484    pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
1485        let q = crate::query::Query {
1486            collection: self.collection_id,
1487            predicate: self.predicate,
1488            limit: self.limit,
1489            order_by: None,
1490        };
1491        let rows = self.db.query(&q)?;
1492        Ok(rows.into_iter().map(project_row::<T>).collect())
1493    }
1494
1495    pub fn explain(self) -> Result<String, DbError> {
1496        let q = crate::query::Query {
1497            collection: self.collection_id,
1498            predicate: self.predicate,
1499            limit: self.limit,
1500            order_by: None,
1501        };
1502        self.db.explain_query(&q)
1503    }
1504}
1505
1506fn validate_subset_model<T: crate::schema::DbModel>(
1507    col: &crate::catalog::CollectionInfo,
1508) -> Result<(), DbError> {
1509    crate::schema_compat::validate_model_fields_against_catalog(
1510        col,
1511        T::primary_field(),
1512        &T::fields(),
1513        &T::indexes(),
1514    )
1515}
1516
1517/// Build a row map containing only the listed fields (same rules as subset-model projection).
1518pub fn row_subset_by_field_defs(
1519    row: &BTreeMap<String, RowValue>,
1520    wanted: &[FieldDef],
1521) -> BTreeMap<String, RowValue> {
1522    let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
1523    for f in wanted {
1524        let segs = &f.path.0;
1525        if segs.is_empty() {
1526            continue;
1527        }
1528        let Some(leaf) = row_value_at_path_segments(row, segs) else {
1529            continue;
1530        };
1531        let root = segs[0].to_string();
1532        if segs.len() == 1 {
1533            out.insert(root, leaf);
1534        } else {
1535            let nested = row_value_nested_object_path(&segs[1..], leaf);
1536            match out.get_mut(&root) {
1537                Some(existing) => merge_row_value_trees(existing, nested),
1538                None => {
1539                    out.insert(root, nested);
1540                }
1541            }
1542        }
1543    }
1544    out
1545}
1546
1547fn row_value_at_path_segments(
1548    row: &BTreeMap<String, RowValue>,
1549    path: &[std::borrow::Cow<'static, str>],
1550) -> Option<RowValue> {
1551    if path.is_empty() {
1552        return None;
1553    }
1554    let mut cur = row.get(path[0].as_ref())?;
1555    for seg in path.iter().skip(1) {
1556        cur = match cur {
1557            RowValue::Object(m) => m.get(seg.as_ref())?,
1558            RowValue::None => return None,
1559            _ => return None,
1560        };
1561    }
1562    Some(cur.clone())
1563}
1564
1565/// Build `Object({ seg[0]: Object({ seg[1]: ... leaf }) })` for non-empty `seg`.
1566fn row_value_nested_object_path(
1567    segments: &[std::borrow::Cow<'static, str>],
1568    leaf: RowValue,
1569) -> RowValue {
1570    debug_assert!(!segments.is_empty());
1571    if segments.len() == 1 {
1572        let mut m = BTreeMap::new();
1573        m.insert(segments[0].to_string(), leaf);
1574        RowValue::Object(m)
1575    } else {
1576        let mut m = BTreeMap::new();
1577        m.insert(
1578            segments[0].to_string(),
1579            row_value_nested_object_path(&segments[1..], leaf),
1580        );
1581        RowValue::Object(m)
1582    }
1583}
1584
1585fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
1586    match (&mut *into, from) {
1587        (RowValue::Object(m1), RowValue::Object(m2)) => {
1588            for (k, v2) in m2 {
1589                match m1.entry(k) {
1590                    std::collections::btree_map::Entry::Vacant(e) => {
1591                        e.insert(v2);
1592                    }
1593                    std::collections::btree_map::Entry::Occupied(mut e) => {
1594                        merge_row_value_trees(e.get_mut(), v2);
1595                    }
1596                }
1597            }
1598        }
1599        (slot, from) => *slot = from,
1600    }
1601}
1602
1603fn project_row<T: crate::schema::DbModel>(
1604    row: BTreeMap<String, RowValue>,
1605) -> BTreeMap<String, RowValue> {
1606    row_subset_by_field_defs(&row, &T::fields())
1607}
1608
1609pub(crate) fn scalar_at_path(
1610    row: &BTreeMap<String, RowValue>,
1611    path: &crate::schema::FieldPath,
1612) -> Option<ScalarValue> {
1613    let mut cur: Option<&RowValue> = None;
1614    for (i, seg) in path.0.iter().enumerate() {
1615        let key = seg.as_ref();
1616        cur = match (i, cur) {
1617            (0, _) => row.get(key),
1618            (_, Some(RowValue::Object(map))) => map.get(key),
1619            (_, Some(RowValue::None)) => return None,
1620            _ => return None,
1621        };
1622    }
1623    cur.and_then(|v| v.as_scalar())
1624}
1625
1626impl Database<FileStore> {
1627    /// Open an existing file or create a new database at `path`.
1628    ///
1629    /// Creates parent directories as needed via the OS; the file is opened read/write.
1630    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
1631        Self::open_with_options(path, crate::config::OpenOptions::default())
1632    }
1633
1634    /// Open an existing file read-only (does not create it).
1635    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
1636        Self::open_with_options(
1637            path,
1638            crate::config::OpenOptions {
1639                recovery: crate::config::RecoveryMode::Strict,
1640                mode: OpenMode::ReadOnly,
1641            },
1642        )
1643    }
1644
1645    /// Open with recovery and other options (see [`crate::config::OpenOptions`]).
1646    pub fn open_with_options(
1647        path: impl AsRef<Path>,
1648        opts: crate::config::OpenOptions,
1649    ) -> Result<Self, DbError> {
1650        let path = path.as_ref().to_path_buf();
1651        let store = FileStore::open_locked(&path, opts.mode)?;
1652        let mut db = Self::open_with_store(path.clone(), store, opts)?;
1653        if opts.mode == OpenMode::ReadWrite {
1654            db.writer_registry = Some(writer_registry::WriterRegistryGuard::new(path)?);
1655        }
1656        Ok(db)
1657    }
1658}
1659
1660impl Database<VecStore> {
1661    /// New empty in-memory database (same on-disk layout as a new file image in a [`VecStore`]).
1662    pub fn open_in_memory() -> Result<Self, DbError> {
1663        Self::open_in_memory_with_options(crate::config::OpenOptions::default())
1664    }
1665
1666    /// In-memory open with [`crate::config::OpenOptions`].
1667    pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
1668        Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
1669    }
1670
1671    /// Deserialize a full database image from bytes (e.g. from [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1672    pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
1673        Self::open_with_store(
1674            PathBuf::from(":memory:"),
1675            VecStore::from_vec(bytes),
1676            crate::config::OpenOptions::default(),
1677        )
1678    }
1679
1680    /// Consume `self` and return the owned byte buffer backing the store.
1681    pub fn into_snapshot_bytes(self) -> Vec<u8> {
1682        self.store.into_inner()
1683    }
1684
1685    /// Clone of the full serialized database image (alias of the buffer returned by [`into_snapshot_bytes`](Self::into_snapshot_bytes)).
1686    pub fn snapshot_bytes(&self) -> Vec<u8> {
1687        self.store.as_slice().to_vec()
1688    }
1689
1690    /// Write the full in-memory database image to `dest_path`.
1691    pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
1692        Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
1693    }
1694
1695    pub(crate) fn export_snapshot_to_path_with_fsops(
1696        fs: &dyn FsOps,
1697        dest_path: impl AsRef<Path>,
1698        bytes: &[u8],
1699    ) -> Result<(), DbError> {
1700        fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
1701        Ok(())
1702    }
1703
1704    /// Open an in-memory database from a snapshot file.
1705    pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
1706        let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
1707        Self::from_snapshot_bytes(bytes)
1708    }
1709}
1710
// Unit tests for `scalar_at_path`. They live in a separate file under `tests/unit/` and are
// textually included here. As a child module of this one, they can reach its private items.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
1718
// General unit tests for this module. They live in `tests/unit/src_db_mod_tests.rs` and are
// textually included. As a child module of this one, they can reach its private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}