mod fs_ops;
mod helpers;
mod open;
mod recover;
mod replay;
pub(crate) mod row_materialize;
mod writer_registry;
pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
mod row_merge;
pub(crate) mod row_paths;
pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
mod write;
use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use crate::catalog::{encode_catalog_payload, Catalog, CatalogRecordWire};
use crate::config::{OpenMode, OpenOptions};
use crate::error::{DbError, FormatError, SchemaError, TransactionError};
use crate::index::IndexState;
use crate::index::{encode_index_payload, IndexEntry, IndexOp};
use crate::record::{
encode_record_payload_v2, encode_record_payload_v2_op, encode_record_payload_v3,
encode_record_payload_v3_op, non_pk_defs_in_order, RowValue, ScalarValue, OP_DELETE,
OP_REPLACE,
};
use crate::schema::{classify_schema_update, SchemaChange};
use crate::schema::{CollectionId, FieldDef, SchemaVersion};
use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
use crate::segments::writer::SegmentWriter;
use crate::storage::{FileStore, Store, VecStore};
use crate::validation;
use crate::{checkpoint, publish};
use crate::{MigrationPlan, MigrationStep};
use self::fs_ops::{FsOps, StdFsOps};
/// Best-effort flush of the directory that contains `dest_path`.
///
/// Nothing happens when `dest_path` has no parent or when the directory cannot be opened
/// through `fs`; a failing `sync_all` is ignored as well.
#[cfg(unix)]
fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
    if let Some(dir) = dest_path.parent() {
        if let Ok(handle) = fs.open_dir(dir) {
            // Deliberately ignored: this helper never reports failures to its caller.
            let _ = handle.sync_all();
        }
    }
}
/// Latest materialized row per `(collection id, primary-key bytes)`.
///
/// The key bytes are produced with `ScalarValue::canonical_key_bytes` by the writers in this
/// file; the value is the full row keyed by top-level field name.
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
/// Result of [`plan_insert_row`], in tuple order:
///
/// 1. the encoded record payload,
/// 2. the `(primary-key bytes, full row)` pair destined for the [`LatestMap`],
/// 3. the index entries produced for the row,
/// 4. the primary key as a scalar.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
/// Validates `row` against the current schema of `collection_id` and plans its insertion.
///
/// Nothing is written here: the function only returns the encoded record payload, the
/// `(pk_key, full_row)` pair, the `IndexOp::Insert` entries and the primary-key scalar.
///
/// # Errors
/// Fails when the collection is unknown, has no primary field, the primary field has no
/// top-level definition, `row` lacks the primary key, or any validation / encoding helper
/// reports an error.
fn plan_insert_row(
    catalog: &Catalog,
    collection_id: CollectionId,
    mut row: BTreeMap<String, RowValue>,
) -> Result<PlannedInsert, DbError> {
    let col =
        catalog
            .get(collection_id)
            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                id: collection_id.0,
            }))?;
    let pk_name =
        col.primary_field
            .as_deref()
            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                collection_id: collection_id.0,
            }))?;
    // The primary key must be declared as a single-segment (top-level) field.
    let pk_def = col
        .fields
        .iter()
        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk_name.to_string(),
        }))?;
    let pk_ty = &pk_def.ty;
    validation::ensure_pk_type_primitive(pk_ty)?;
    let mut pk_path = vec![pk_name.to_string()];
    let pk_cell = row
        .get(pk_name)
        .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
            name: pk_name.to_string(),
        }))?;
    // The primary-key cell is validated on its own before the row as a whole.
    validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
    // Any field path whose length is not exactly one switches to the multi-segment code paths.
    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
    if !has_multi_segment_schema {
        validation::validate_top_level_row(&col.fields, pk_name, &row)?;
    } else {
        validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
    }
    // Cannot panic: `row.get(pk_name)` succeeded above and `row` was only borrowed since.
    let pk_val = row.remove(pk_name).unwrap();
    let pk_scalar = pk_val.clone().into_scalar()?;
    let non_pk_defs = if has_multi_segment_schema {
        col.fields
            .iter()
            .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
            .collect::<Vec<_>>()
    } else {
        non_pk_defs_in_order(&col.fields, pk_name)
    };
    let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
    // Multi-segment schemas use the v3 record encoder, all others the v2 encoder.
    let payload = if has_multi_segment_schema {
        encode_record_payload_v3(
            collection_id.0,
            col.current_version.0,
            &pk_scalar,
            pk_ty,
            &non_pk,
        )?
    } else {
        encode_record_payload_v2(
            collection_id.0,
            col.current_version.0,
            &pk_scalar,
            pk_ty,
            &non_pk,
        )?
    };
    // Rebuild the full row from the primary key plus the materialized non-pk values.
    let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
    full_map.insert(pk_name.to_string(), pk_val);
    for (def, v) in &non_pk {
        let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
        if parts.len() == 1 {
            full_map.insert(parts[0].clone(), v.clone());
        } else {
            debug_assert!(parts.len() >= 2);
            row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
        }
    }
    // One insert entry per index; indexes whose path yields no scalar in this row are skipped.
    let mut index_entries: Vec<IndexEntry> = Vec::new();
    for idx in &col.indexes {
        let Some(v) = scalar_at_path(&full_map, &idx.path) else {
            continue;
        };
        index_entries.push(IndexEntry {
            collection_id: collection_id.0,
            index_name: idx.name.clone(),
            kind: idx.kind,
            op: IndexOp::Insert,
            index_key: v.canonical_key_bytes(),
            pk_key: pk_scalar.canonical_key_bytes(),
        });
    }
    let pk_key = pk_scalar.canonical_key_bytes();
    Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
}
/// Produces one `IndexOp::Delete` entry per index for which `existing_row` holds a scalar.
///
/// Indexes whose path does not resolve to a scalar in `existing_row` contribute nothing.
fn index_deletes_for_existing_row(
    collection_id: CollectionId,
    pk_scalar: &ScalarValue,
    indexes: &[crate::schema::IndexDef],
    existing_row: &BTreeMap<String, RowValue>,
) -> Vec<IndexEntry> {
    indexes
        .iter()
        .filter_map(|index_def| {
            let indexed = scalar_at_path(existing_row, &index_def.path)?;
            Some(IndexEntry {
                collection_id: collection_id.0,
                index_name: index_def.name.clone(),
                kind: index_def.kind,
                op: IndexOp::Delete,
                index_key: indexed.canonical_key_bytes(),
                pk_key: pk_scalar.canonical_key_bytes(),
            })
        })
        .collect()
}
/// In-memory state of the currently open transaction.
///
/// Created by `Database::begin_transaction` from clones of the committed state and either
/// promoted to the committed state on commit or dropped on rollback.
pub(crate) struct TxnStaging {
    /// Id allocated for this transaction when it was started.
    pub(crate) txn_id: u64,
    /// Working copy of the catalog that staged schema changes are applied to.
    pub(crate) shadow_catalog: Catalog,
    /// Working copy of the latest-row map that staged inserts/deletes are applied to.
    pub(crate) shadow_latest: LatestMap,
    /// Working copy of the index state that staged index entries are applied to.
    pub(crate) shadow_indexes: IndexState,
    /// Encoded segments, in staging order, that are written as one batch on commit.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
/// A database handle over a backing [`Store`] (a [`FileStore`] by default).
///
/// Holds the committed catalog, latest-row map and index state, plus the staging area of the
/// transaction that is currently open, if any.
pub struct Database<S: Store = FileStore> {
    /// Path this database was opened with (also handed to queries as a spill location).
    path: PathBuf,
    /// Backing store that all segments are written to.
    store: S,
    /// Committed catalog.
    catalog: Catalog,
    /// Offset passed to the write/publish helpers as the start of the segment area.
    segment_start: u64,
    /// Format minor version; the `ensure_header_*` helpers may update it.
    format_minor: u16,
    /// Committed latest row per `(collection id, primary key)`.
    latest: LatestMap,
    /// Committed index state.
    indexes: IndexState,
    /// Last transaction id handed out by `next_txn_id`.
    txn_seq: u64,
    /// Staging area of the open transaction; `None` when no transaction is active.
    txn_staging: Option<TxnStaging>,
    // NOTE(review): presumably kept only so the guard lives as long as the handle — confirm.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    /// Test-only hook: taken (one-shot) by `insert` to mutate the planned row on replace.
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Test-only hook: taken (one-shot) by `delete` to alter the scalar that gets encoded.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
impl<S: Store> Database<S> {
fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
let mut out = Database::<VecStore>::open_in_memory()?;
let mut cols = self.catalog_for_read().collections();
cols.sort_by_key(|c| c.id.0);
for c in &cols {
let pk =
c.primary_field
.as_deref()
.ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
collection_id: c.id.0,
}))?;
let (new_id, _v1) = out.register_collection_with_indexes(
&c.name,
c.fields.clone(),
c.indexes.clone(),
pk,
)?;
for _ in 2..=c.current_version.0 {
let _ = out.register_schema_version_with_indexes_force(
new_id,
c.fields.clone(),
c.indexes.clone(),
)?;
}
}
for ((cid, _), row) in self.latest_for_read().iter() {
let collection_id = CollectionId(*cid);
out.insert(collection_id, row.clone())?;
}
Ok(out.into_snapshot_bytes())
}
/// Opens a database at `path` over an already constructed `store`.
///
/// Thin wrapper that forwards all arguments to `open::open_with_store`.
pub(crate) fn open_with_store(
    path: PathBuf,
    store: S,
    opts: OpenOptions,
) -> Result<Self, DbError> {
    open::open_with_store(path, store, opts)
}
/// Advances the transaction counter and returns the new value.
///
/// The increment saturates, so once the counter reaches `u64::MAX` the same id is returned
/// on every call.
fn next_txn_id(&mut self) -> u64 {
    self.txn_seq = self.txn_seq.saturating_add(1);
    self.txn_seq
}
/// Writes `body` as one transaction with id `txn_id`.
///
/// Forwards to `write::commit_write_txn_v6` with this database's store, segment start and
/// format minor version (which the callee may update).
#[inline]
fn commit_write_batch(
    &mut self,
    txn_id: u64,
    body: &[(crate::segments::header::SegmentType, &[u8])],
) -> Result<(), DbError> {
    write::commit_write_txn_v6(
        &mut self.store,
        self.segment_start,
        &mut self.format_minor,
        txn_id,
        body,
    )
}
/// Applies `wire` to the committed catalog (not to a transaction's shadow catalog).
#[inline]
fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
    self.catalog.apply_record(wire)
}
/// Runs `f` inside a transaction.
///
/// The transaction is committed when `f` returns `Ok`; it is rolled back when `f` fails or
/// when the commit itself fails, and the corresponding error is returned.
///
/// # Errors
/// Returns the error from starting the transaction, from `f`, or from the commit.
pub fn transaction<R>(
    &mut self,
    f: impl FnOnce(&mut Self) -> Result<R, DbError>,
) -> Result<R, DbError> {
    self.begin_transaction()?;
    let outcome = f(self).and_then(|value| self.commit_transaction().map(|()| value));
    if outcome.is_err() {
        self.rollback_transaction();
    }
    outcome
}
/// Starts a transaction by snapshotting the committed catalog, rows and indexes.
///
/// # Errors
/// Returns `TransactionError::NestedTransaction` when a transaction is already open.
pub fn begin_transaction(&mut self) -> Result<(), DbError> {
    if self.txn_staging.is_some() {
        return Err(DbError::Transaction(TransactionError::NestedTransaction));
    }
    let txn_id = self.next_txn_id();
    let staging = TxnStaging {
        txn_id,
        shadow_catalog: self.catalog.clone(),
        shadow_latest: self.latest.clone(),
        shadow_indexes: self.indexes.clone(),
        pending: Vec::new(),
    };
    self.txn_staging = Some(staging);
    Ok(())
}
/// Commits the open transaction; see `commit_txn_staging` for the details.
pub fn commit_transaction(&mut self) -> Result<(), DbError> {
    self.commit_txn_staging()
}
/// Discards the open transaction's staging area, if any; a no-op when none is open.
pub fn rollback_transaction(&mut self) {
    self.txn_staging = None;
}
fn commit_txn_staging(&mut self) -> Result<(), DbError> {
let Some(st) = self.txn_staging.take() else {
return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
};
if st.pending.is_empty() {
self.catalog = st.shadow_catalog;
self.latest = st.shadow_latest;
self.indexes = st.shadow_indexes;
return Ok(());
}
let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
self.commit_write_batch(st.txn_id, &batch)?;
self.catalog = st.shadow_catalog;
self.latest = st.shadow_latest;
self.indexes = st.shadow_indexes;
Ok(())
}
fn catalog_for_read(&self) -> &Catalog {
if let Some(ref st) = self.txn_staging {
&st.shadow_catalog
} else {
&self.catalog
}
}
fn indexes_for_read(&self) -> &IndexState {
if let Some(ref st) = self.txn_staging {
&st.shadow_indexes
} else {
&self.indexes
}
}
fn latest_for_read(&self) -> &LatestMap {
if let Some(ref st) = self.txn_staging {
&st.shadow_latest
} else {
&self.latest
}
}
/// Path this database handle was opened with.
pub fn path(&self) -> &Path {
    &self.path
}
/// Catalog as currently visible: includes staged changes while a transaction is open.
pub fn catalog(&self) -> &Catalog {
    self.catalog_for_read()
}
/// Collection names from the currently visible catalog.
pub fn collection_names(&self) -> Vec<String> {
    self.catalog_for_read().collection_names()
}
/// Index state as currently visible: includes staged changes while a transaction is open.
pub fn index_state(&self) -> &IndexState {
    self.indexes_for_read()
}
/// Executes `q` against the currently visible catalog, indexes and rows.
///
/// Delegates to `crate::query::execute_query`.
pub fn query(
    &self,
    q: &crate::query::Query,
) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
    let catalog = self.catalog_for_read();
    let indexes = self.indexes_for_read();
    let rows = self.latest_for_read();
    crate::query::execute_query(catalog, indexes, rows, q)
}
/// Returns the textual explanation `crate::query::explain_query` produces for `q` against the
/// currently visible catalog.
pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
    crate::query::explain_query(self.catalog_for_read(), q)
}
/// Executes `q` against the currently visible state and returns a row iterator.
///
/// The database path is handed to the query layer as the spill path.
pub fn query_iter(
    &self,
    q: &crate::query::Query,
) -> Result<crate::query::QueryRowIter<'_>, DbError> {
    let catalog = self.catalog_for_read();
    let indexes = self.indexes_for_read();
    let rows = self.latest_for_read();
    let spill_path = Some(self.path.as_path());
    crate::query::execute_query_iter_with_spill_path(catalog, indexes, rows, q, spill_path)
}
/// Registers a collection described by the model type `T`.
///
/// Uses `T`'s collection name, fields, indexes and primary field and forwards to
/// `register_collection_with_indexes`.
pub fn register_model<T: crate::schema::DbModel>(
    &mut self,
) -> Result<(CollectionId, SchemaVersion), DbError> {
    self.register_collection_with_indexes(
        T::collection_name(),
        T::fields(),
        T::indexes(),
        T::primary_field(),
    )
}
/// Returns a typed, read-only handle on the collection backing model `T`.
///
/// # Errors
/// Fails when no collection with `T`'s name exists or when `validate_subset_model` rejects
/// `T` for the stored collection.
pub fn collection<'a, T: crate::schema::DbModel>(
    &'a self,
) -> Result<Collection<'a, S, T>, DbError> {
    let collection_id = self.collection_id_named(T::collection_name())?;
    let info = self
        .catalog_for_read()
        .get(collection_id)
        .expect("collection id from name lookup must exist in catalog");
    validate_subset_model::<T>(info)?;
    let handle = Collection {
        db: self,
        collection_id,
        _marker: PhantomData,
    };
    Ok(handle)
}
/// Looks up the id of the collection called `name` in the currently visible catalog.
///
/// # Errors
/// Returns `SchemaError::UnknownCollectionName` (carrying the trimmed name) when the lookup
/// finds nothing.
pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
    match self.catalog_for_read().lookup_name(name) {
        Some(id) => Ok(id),
        None => Err(DbError::Schema(SchemaError::UnknownCollectionName {
            name: name.trim().to_string(),
        })),
    }
}
/// Registers a collection without any indexes.
///
/// Shorthand for `register_collection_with_indexes` with an empty index list.
pub fn register_collection(
    &mut self,
    name: &str,
    fields: Vec<FieldDef>,
    primary_field: &str,
) -> Result<(CollectionId, SchemaVersion), DbError> {
    self.register_collection_with_indexes(name, fields, vec![], primary_field)
}
/// Registers a new collection at schema version 1.
///
/// Inside a transaction the catalog record is staged and applied to the shadow catalog;
/// otherwise it is written as its own transaction and then applied to the committed catalog.
///
/// # Errors
/// Fails when the name cannot be normalized, when `primary_field` is blank
/// (`SchemaError::InvalidCollectionName`), when `fields` has no top-level field with that
/// name (`SchemaError::PrimaryFieldNotFound`), or when writing/applying the record fails.
pub fn register_collection_with_indexes(
    &mut self,
    name: &str,
    fields: Vec<FieldDef>,
    indexes: Vec<crate::schema::IndexDef>,
    primary_field: &str,
) -> Result<(CollectionId, SchemaVersion), DbError> {
    let name = helpers::normalize_collection_name(name)?;
    let pk = primary_field.trim();
    if pk.is_empty() {
        return Err(DbError::Schema(SchemaError::InvalidCollectionName));
    }
    if !Catalog::has_top_level_field(&fields, pk) {
        return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk.to_string(),
        }));
    }
    // The id comes from whichever catalog is currently visible (shadow or committed).
    let id = self.catalog_for_read().next_collection_id().0;
    let wire = CatalogRecordWire::CreateCollection {
        collection_id: id,
        name: name.clone(),
        schema_version: 1,
        fields,
        indexes,
        primary_field: Some(pk.to_string()),
    };
    let payload = encode_catalog_payload(&wire);
    let registered = (CollectionId(id), SchemaVersion(1));
    if let Some(staging) = self.txn_staging.as_mut() {
        staging.shadow_catalog.apply_record(wire)?;
        staging
            .pending
            .push((crate::segments::header::SegmentType::Schema, payload));
        return Ok(registered);
    }
    let txn_id = self.next_txn_id();
    let batch = [(
        crate::segments::header::SegmentType::Schema,
        payload.as_slice(),
    )];
    self.commit_write_batch(txn_id, &batch)?;
    self.apply_catalog_record(wire)?;
    Ok(registered)
}
/// Registers a new schema version with an empty index list.
///
/// Shorthand for `register_schema_version_with_indexes(id, fields, vec![])`.
pub fn register_schema_version(
    &mut self,
    id: CollectionId,
    fields: Vec<FieldDef>,
) -> Result<SchemaVersion, DbError> {
    self.register_schema_version_with_indexes(id, fields, vec![])
}
/// Registers the next schema version of collection `id`, provided the change is safe.
///
/// The proposed `fields`/`indexes` are classified against the current schema first. Only a
/// `SchemaChange::Safe` change is recorded; inside a transaction it is staged, otherwise it
/// is written as its own transaction.
///
/// # Errors
/// - `SchemaError::UnknownCollection` when `id` is not in the catalog.
/// - `SchemaError::MigrationRequired` when the change needs a migration.
/// - `SchemaError::IncompatibleSchemaChange` when the change is breaking.
/// - `SchemaError::SchemaVersionExhausted` when the version counter would overflow.
/// - Any error from classifying, writing or applying the record.
pub fn register_schema_version_with_indexes(
    &mut self,
    id: CollectionId,
    fields: Vec<FieldDef>,
    indexes: Vec<crate::schema::IndexDef>,
) -> Result<SchemaVersion, DbError> {
    let current = self
        .catalog_for_read()
        .get(id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
    match classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)? {
        SchemaChange::Safe => {}
        SchemaChange::NeedsMigration { reason, .. } => {
            return Err(DbError::Schema(SchemaError::MigrationRequired {
                message: reason,
            }));
        }
        SchemaChange::Breaking { reason } => {
            return Err(DbError::Schema(SchemaError::IncompatibleSchemaChange {
                message: reason,
            }));
        }
    }
    let next_v = current
        .current_version
        .0
        .checked_add(1)
        .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
    let wire = CatalogRecordWire::NewSchemaVersion {
        collection_id: id.0,
        schema_version: next_v,
        fields,
        indexes,
    };
    let payload = encode_catalog_payload(&wire);
    if let Some(st) = &mut self.txn_staging {
        // This branch returns, so the record can be moved instead of cloned.
        st.shadow_catalog.apply_record(wire)?;
        st.pending
            .push((crate::segments::header::SegmentType::Schema, payload));
        return Ok(SchemaVersion(next_v));
    }
    let tid = self.next_txn_id();
    self.commit_write_batch(
        tid,
        &[(
            crate::segments::header::SegmentType::Schema,
            payload.as_slice(),
        )],
    )?;
    self.apply_catalog_record(wire)?;
    Ok(SchemaVersion(next_v))
}
/// Classifies a proposed schema update and derives the migration steps it needs.
///
/// Nothing is written. `Safe` and `Breaking` changes yield no steps. A `NeedsMigration`
/// change yields at most one step: a top-level backfill, else a backfill at a field path,
/// else an index rebuild when the reason text mentions `"unique index"`.
///
/// # Errors
/// Returns `SchemaError::UnknownCollection` when `id` is not in the catalog, or the error of
/// the classification itself.
pub fn plan_schema_version_with_indexes(
    &self,
    id: CollectionId,
    fields: Vec<FieldDef>,
    indexes: Vec<crate::schema::IndexDef>,
) -> Result<MigrationPlan, DbError> {
    let current = self
        .catalog_for_read()
        .get(id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
    let change = classify_schema_update(&current.fields, &current.indexes, &fields, &indexes)?;
    let mut steps = Vec::new();
    match &change {
        SchemaChange::Safe => {}
        SchemaChange::Breaking { .. } => {}
        SchemaChange::NeedsMigration {
            reason,
            backfill_top_level_field,
            backfill_field_path,
        } => {
            if let Some(field) = backfill_top_level_field {
                steps.push(MigrationStep::BackfillTopLevelField {
                    field: field.clone(),
                });
            } else if let Some(path) = backfill_field_path {
                steps.push(MigrationStep::BackfillFieldAtPath { path: path.clone() });
            } else if reason.contains("unique index") {
                // The only signal available here is the wording of the reason text.
                steps.push(MigrationStep::RebuildIndexes);
            }
        }
    }
    Ok(MigrationPlan { change, steps })
}
/// Backfills the top-level field `field` with `value`.
///
/// Wraps `field` in a single-segment `FieldPath` and forwards to
/// `backfill_field_at_path_with_value`.
pub fn backfill_top_level_field_with_value(
    &mut self,
    collection_id: CollectionId,
    field: &str,
    value: RowValue,
) -> Result<(), DbError> {
    let path = crate::schema::FieldPath(vec![std::borrow::Cow::Owned(field.to_string())]);
    self.backfill_field_at_path_with_value(collection_id, &path, value)
}
/// Writes `value` at `path` into every row of the collection that has no value there yet.
///
/// The affected rows are re-inserted inside a single transaction; rows for which
/// `row_value_at_path_segments` already finds a value are left alone.
///
/// # Errors
/// Returns `SchemaError::UnknownCollection` for an unknown collection,
/// `SchemaError::RowUnknownField` (named after the last path segment) when the schema has no
/// field at `path`, or any error from the transaction.
pub fn backfill_field_at_path_with_value(
    &mut self,
    collection_id: CollectionId,
    path: &crate::schema::FieldPath,
    value: RowValue,
) -> Result<(), DbError> {
    let col = self
        .catalog_for_read()
        .get(collection_id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
            id: collection_id.0,
        }))?;
    let field_is_declared = col.fields.iter().any(|f| f.path == *path);
    if !field_is_declared {
        return Err(DbError::Schema(SchemaError::RowUnknownField {
            name: path.0.last().map(|s| s.to_string()).unwrap_or_default(),
        }));
    }
    // Snapshot the collection's rows first: the transaction below needs `&mut self`.
    let snapshot: Vec<BTreeMap<String, RowValue>> = self
        .latest_for_read()
        .iter()
        .filter(|((cid, _), _)| *cid == collection_id.0)
        .map(|(_, row)| row.clone())
        .collect();
    self.transaction(|db| {
        for mut candidate in snapshot {
            let already_set = row_value_at_path_segments(&candidate, &path.0).is_some();
            if !already_set {
                crate::record::insert_value_at_path(&mut candidate, path, value.clone())?;
                db.insert(collection_id, candidate)?;
            }
        }
        Ok(())
    })
}
/// Re-emits an `IndexOp::Insert` entry for every (row, index) pair of the collection.
///
/// Rows without a primary-key cell, or whose primary key does not match the declared type,
/// are skipped, as are indexes whose path yields no scalar in a row. The entries are staged
/// and applied inside a single transaction, which is opened even when there are no entries.
///
/// # Errors
/// Fails when the collection is unknown, has no primary field, the primary field has no
/// top-level definition, a primary-key cell cannot be converted to a scalar, or the
/// transaction fails.
pub fn rebuild_indexes_for_collection(
    &mut self,
    collection_id: CollectionId,
) -> Result<(), DbError> {
    let col = self
        .catalog_for_read()
        .get(collection_id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
            id: collection_id.0,
        }))?;
    let pk_name =
        col.primary_field
            .as_deref()
            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                collection_id: collection_id.0,
            }))?;
    let pk_def = col
        .fields
        .iter()
        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk_name.to_string(),
        }))?;
    let mut entries: Vec<IndexEntry> = Vec::new();
    let own_rows = self
        .latest_for_read()
        .iter()
        .filter(|((cid, _), _)| *cid == collection_id.0);
    for (_, row) in own_rows {
        let Some(pk_cell) = row.get(pk_name) else {
            continue;
        };
        let pk_scalar = pk_cell.clone().into_scalar()?;
        if !pk_scalar.ty_matches(&pk_def.ty) {
            continue;
        }
        entries.extend(col.indexes.iter().filter_map(|index_def| {
            let indexed = scalar_at_path(row, &index_def.path)?;
            Some(IndexEntry {
                collection_id: collection_id.0,
                index_name: index_def.name.clone(),
                kind: index_def.kind,
                op: IndexOp::Insert,
                index_key: indexed.canonical_key_bytes(),
                pk_key: pk_scalar.canonical_key_bytes(),
            })
        }));
    }
    self.transaction(|db| {
        if !entries.is_empty() {
            let staging = db
                .txn_staging
                .as_mut()
                .expect("transaction staging must be active");
            let encoded = encode_index_payload(&entries);
            staging
                .pending
                .push((crate::segments::header::SegmentType::Index, encoded));
            for entry in entries {
                staging.shadow_indexes.apply(entry)?;
            }
        }
        Ok(())
    })
}
/// Registers the next schema version of collection `id` without classifying the change.
///
/// Unlike `register_schema_version_with_indexes`, no compatibility check is run. Inside a
/// transaction the record is staged; otherwise it is written as its own transaction.
///
/// # Errors
/// Returns `SchemaError::UnknownCollection` when `id` is not in the catalog,
/// `SchemaError::SchemaVersionExhausted` when the version counter would overflow, or any
/// error from writing/applying the record.
pub fn register_schema_version_with_indexes_force(
    &mut self,
    id: CollectionId,
    fields: Vec<FieldDef>,
    indexes: Vec<crate::schema::IndexDef>,
) -> Result<SchemaVersion, DbError> {
    let current = self
        .catalog_for_read()
        .get(id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection { id: id.0 }))?;
    let next_v = current
        .current_version
        .0
        .checked_add(1)
        .ok_or(DbError::Schema(SchemaError::SchemaVersionExhausted))?;
    let wire = CatalogRecordWire::NewSchemaVersion {
        collection_id: id.0,
        schema_version: next_v,
        fields,
        indexes,
    };
    let payload = encode_catalog_payload(&wire);
    if let Some(st) = &mut self.txn_staging {
        // This branch returns, so the record can be moved instead of cloned.
        st.shadow_catalog.apply_record(wire)?;
        st.pending
            .push((crate::segments::header::SegmentType::Schema, payload));
        return Ok(SchemaVersion(next_v));
    }
    let tid = self.next_txn_id();
    self.commit_write_batch(
        tid,
        &[(
            crate::segments::header::SegmentType::Schema,
            payload.as_slice(),
        )],
    )?;
    self.apply_catalog_record(wire)?;
    Ok(SchemaVersion(next_v))
}
/// Inserts `row` into `collection_id`, replacing the existing row with the same primary key.
///
/// The row is validated and planned by `plan_insert_row`. When a row with that key already
/// exists, the record is re-encoded as an `OP_REPLACE` and delete entries for the old row's
/// index values are emitted ahead of the new insert entries. Inside a transaction the
/// segments are staged and the shadow state is updated; otherwise they are written as their
/// own transaction and the committed state is updated.
///
/// # Errors
/// Returns any planning/validation/encoding error, `SchemaError::UniqueIndexViolation` when
/// a unique index key is already held by a different primary key, or the error of the write.
pub fn insert(
    &mut self,
    collection_id: CollectionId,
    row: BTreeMap<String, RowValue>,
) -> Result<(), DbError> {
    write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
    let (mut payload, full, mut index_entries, pk_scalar) =
        plan_insert_row(self.catalog_for_read(), collection_id, row)?;
    // Test builds rebind mutably so the poison hook below can modify the planned row.
    #[cfg(test)]
    let mut full = full;
    let existing = self
        .latest_for_read()
        .get(&(collection_id.0, full.0.clone()))
        .cloned();
    if existing.is_some() {
        // Replace path: the payload from planning is discarded and re-encoded as OP_REPLACE.
        #[cfg(test)]
        if let Some(poison) = self.test_poison_planned_replace_row.take() {
            poison(collection_id, &mut full.1);
        }
        let col = self
            .catalog_for_read()
            .get(collection_id)
            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                id: collection_id.0,
            }))?;
        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
        let pk_name =
            col.primary_field
                .as_deref()
                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                    collection_id: collection_id.0,
                }))?;
        let pk_def = col
            .fields
            .iter()
            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
                name: pk_name.to_string(),
            }))?;
        let non_pk_defs = if has_multi_segment_schema {
            col.fields
                .iter()
                .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
                .collect::<Vec<_>>()
        } else {
            non_pk_defs_in_order(&col.fields, pk_name)
        };
        // Values are read back from the planned full row; missing paths become RowValue::None.
        let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
        for def in &non_pk_defs {
            let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
            non_pk.push(((*def).clone(), v));
        }
        payload = (if has_multi_segment_schema {
            encode_record_payload_v3_op(
                collection_id.0,
                col.current_version.0,
                OP_REPLACE,
                &pk_scalar,
                &pk_def.ty,
                &non_pk,
            )
        } else {
            encode_record_payload_v2_op(
                collection_id.0,
                col.current_version.0,
                OP_REPLACE,
                &pk_scalar,
                &pk_def.ty,
                &non_pk,
            )
        })?;
        // Deletes for the old row's index values go first, followed by the new inserts.
        if let Some(ref old_row) = existing {
            let mut deletes = index_deletes_for_existing_row(
                collection_id,
                &pk_scalar,
                &col.indexes,
                old_row,
            );
            deletes.append(&mut index_entries);
            index_entries = deletes;
        }
    }
    // Unique check: only an Insert whose key is already held by a different primary key fails.
    for e in &index_entries {
        if e.kind != crate::schema::IndexKind::Unique {
            continue;
        }
        let Some(existing) =
            self.indexes_for_read()
                .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
        else {
            continue;
        };
        if e.op != IndexOp::Insert {
            continue;
        }
        if existing == e.pk_key.as_slice() {
            continue;
        }
        return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
    }
    // Inside a transaction: stage the segments and update only the shadow state.
    if let Some(st) = &mut self.txn_staging {
        if !index_entries.is_empty() {
            let b = encode_index_payload(&index_entries);
            st.pending
                .push((crate::segments::header::SegmentType::Index, b));
        }
        st.pending.push((
            crate::segments::header::SegmentType::Record,
            payload.clone(),
        ));
        st.shadow_latest
            .insert((collection_id.0, full.0.clone()), full.1.clone());
        for e in index_entries {
            st.shadow_indexes.apply(e)?;
        }
        return Ok(());
    }
    // Outside a transaction: write first, touch the committed state only after the write.
    let tid = self.next_txn_id();
    let index_bytes = if index_entries.is_empty() {
        None
    } else {
        Some(encode_index_payload(&index_entries))
    };
    let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
    if let Some(ref b) = index_bytes {
        batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
    }
    batch.push((
        crate::segments::header::SegmentType::Record,
        payload.as_slice(),
    ));
    self.commit_write_batch(tid, &batch)?;
    self.latest.insert((collection_id.0, full.0), full.1);
    for e in index_entries {
        self.indexes.apply(e)?;
    }
    Ok(())
}
/// Deletes the row with primary key `pk` from `collection_id`.
///
/// Deleting a key that has no row is a successful no-op. Otherwise an `OP_DELETE` record and
/// delete entries for the row's index values are staged (inside a transaction) or written as
/// their own transaction, and the row is removed from the visible state.
///
/// # Errors
/// Fails when the collection is unknown, has no primary field, the primary field has no
/// top-level definition, `pk` does not match the declared primary-key type
/// (`FormatError::RecordPayloadTypeMismatch`), or when encoding/writing fails.
pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
    write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
    let col = self
        .catalog_for_read()
        .get(collection_id)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
            id: collection_id.0,
        }))?;
    let pk_name =
        col.primary_field
            .as_deref()
            .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
                collection_id: collection_id.0,
            }))?;
    let pk_def = col
        .fields
        .iter()
        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
        .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk_name.to_string(),
        }))?;
    // NOTE(review): `get` reports the same mismatch as SchemaError::PrimaryKeyTypeMismatch.
    if !pk.ty_matches(&pk_def.ty) {
        return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
    }
    let pk_key = pk.canonical_key_bytes();
    let existing = self
        .latest_for_read()
        .get(&(collection_id.0, pk_key.clone()))
        .cloned();
    // Nothing to delete: succeed without writing anything.
    let Some(old_row) = existing else {
        return Ok(());
    };
    // Copy what is needed out of the catalog so its borrow ends before `self` is mutated.
    let indexes = col.indexes.clone();
    let schema_ver = col.current_version.0;
    let pk_ty = pk_def.ty.clone();
    let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
    let mut index_entries =
        index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
    #[cfg(not(test))]
    let pk_for_record = pk.clone();
    // Test builds let a one-shot hook alter the scalar that gets encoded into the record.
    #[cfg(test)]
    let pk_for_record = {
        let mut p = pk.clone();
        if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
            p = poison(p);
        }
        p
    };
    // A delete record carries no non-pk values.
    let record_payload = (if has_multi_segment_schema {
        encode_record_payload_v3_op(
            collection_id.0,
            schema_ver,
            OP_DELETE,
            &pk_for_record,
            &pk_ty,
            &[],
        )
    } else {
        encode_record_payload_v2_op(
            collection_id.0,
            schema_ver,
            OP_DELETE,
            &pk_for_record,
            &pk_ty,
            &[],
        )
    })?;
    // Inside a transaction: stage the segments and update only the shadow state.
    if let Some(st) = &mut self.txn_staging {
        if !index_entries.is_empty() {
            let b = encode_index_payload(&index_entries);
            st.pending
                .push((crate::segments::header::SegmentType::Index, b));
        }
        st.pending.push((
            crate::segments::header::SegmentType::Record,
            record_payload.clone(),
        ));
        st.shadow_latest.remove(&(collection_id.0, pk_key));
        for e in index_entries.drain(..) {
            st.shadow_indexes.apply(e)?;
        }
        return Ok(());
    }
    // Outside a transaction: write first, touch the committed state only after the write.
    let tid = self.next_txn_id();
    let index_bytes = if index_entries.is_empty() {
        None
    } else {
        Some(encode_index_payload(&index_entries))
    };
    let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
    if let Some(ref b) = index_bytes {
        batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
    }
    batch.push((
        crate::segments::header::SegmentType::Record,
        record_payload.as_slice(),
    ));
    self.commit_write_batch(tid, &batch)?;
    self.latest.remove(&(collection_id.0, pk_key));
    for e in index_entries {
        self.indexes.apply(e)?;
    }
    Ok(())
}
/// Fetches a copy of the row with primary key `pk`, or `None` when no such row is visible.
///
/// # Errors
/// Fails when the collection is unknown, has no primary field, the primary field has no
/// top-level definition, or `pk` does not match the declared primary-key type
/// (`SchemaError::PrimaryKeyTypeMismatch`).
pub fn get(
    &self,
    collection_id: CollectionId,
    pk: &ScalarValue,
) -> Result<Option<BTreeMap<String, RowValue>>, DbError> {
    let Some(col) = self.catalog_for_read().get(collection_id) else {
        return Err(DbError::Schema(SchemaError::UnknownCollection {
            id: collection_id.0,
        }));
    };
    let Some(pk_name) = col.primary_field.as_deref() else {
        return Err(DbError::Schema(SchemaError::NoPrimaryKey {
            collection_id: collection_id.0,
        }));
    };
    let declared_pk = col
        .fields
        .iter()
        .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name);
    let Some(declared_pk) = declared_pk else {
        return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
            name: pk_name.to_string(),
        }));
    };
    if !pk.ty_matches(&declared_pk.ty) {
        return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
            collection_id: collection_id.0,
        }));
    }
    let lookup_key = (collection_id.0, pk.canonical_key_bytes());
    let found = self.latest_for_read().get(&lookup_key).cloned();
    Ok(found)
}
pub fn checkpoint(&mut self) -> Result<(), DbError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("database_checkpoint").entered();
if self.txn_staging.is_some() {
return Err(DbError::Transaction(TransactionError::NestedTransaction));
}
write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
let mut cp = checkpoint::checkpoint_from_state(
self.catalog_for_read(),
self.latest_for_read(),
self.indexes_for_read(),
)?;
let file_len = self.store.len()?;
let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
let checkpoint_offset = writer.offset();
let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
cp.replay_from_offset = replay_from;
let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
let hdr = SegmentHeader {
segment_type: SegmentType::Checkpoint,
payload_len: 0,
payload_crc32c: 0,
};
writer.append(hdr, &payload)?;
publish::append_manifest_and_publish_with_checkpoint(
&mut self.store,
self.segment_start,
Some((checkpoint_offset, payload.len() as u32)),
)?;
self.store.sync()?;
#[cfg(feature = "tracing")]
tracing::info!(
checkpoint_offset,
replay_from,
payload_bytes = payload.len(),
"database_checkpoint_ok"
);
Ok(())
}
/// Test-only: arms a one-shot hook that `insert` takes and runs on the planned row when it
/// replaces an existing row.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_arm_replace_encode_poison_once(
    &mut self,
    poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
) {
    self.test_poison_planned_replace_row = Some(poison);
}
/// Test-only: arms a one-shot hook that `delete` takes and applies to the primary-key scalar
/// before encoding the delete record.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_arm_delete_encode_poison_once(
    &mut self,
    poison: fn(ScalarValue) -> ScalarValue,
) {
    self.test_poison_delete_encode_scalar = Some(poison);
}
/// Test-only: overwrites one top-level cell of a committed row, bypassing validation, index
/// maintenance and the write path.
///
/// # Panics
/// Panics when the committed latest map has no row for `(collection_id, pk)`.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn test_write_latest_cell_unchecked(
    &mut self,
    collection_id: CollectionId,
    pk: &ScalarValue,
    field: &str,
    value: RowValue,
) {
    let pk_key = pk.canonical_key_bytes();
    let row = self
        .latest
        .get_mut(&(collection_id.0, pk_key))
        .expect("test_write_latest_cell_unchecked: unknown row key");
    row.insert(field.to_string(), value);
}
}
impl Database<FileStore> {
/// Writes a compacted copy of this database to `dest_path` using the standard filesystem.
pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
    self.compact_to_with_fsops(&StdFsOps, dest_path)
}
/// Writes a compacted copy of this database to `dest_path` through `fs`.
///
/// The destination is opened with create+truncate, filled with the compacted image, cut to
/// the image length and synced.
///
/// # Errors
/// Returns the error from building the image, opening the destination, or any store call.
pub(crate) fn compact_to_with_fsops(
    &self,
    fs: &dyn FsOps,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    #[cfg(feature = "tracing")]
    let _span = tracing::info_span!(
        "database_compact_to",
        dest = %dest_path.as_ref().display()
    )
    .entered();
    let image = self.compact_snapshot_bytes()?;
    let image_len = image.len() as u64;
    let target = dest_path.as_ref();
    let target_file = fs
        .open_read_write_create_truncate(target)
        .map_err(DbError::Io)?;
    let mut target_store = FileStore::new(target_file);
    target_store.write_all_at(0, &image)?;
    target_store.truncate(image_len)?;
    target_store.sync()?;
    #[cfg(feature = "tracing")]
    tracing::info!(bytes = image.len(), "database_compact_to_ok");
    Ok(())
}
/// Replaces the live database file with a compacted copy, using the standard filesystem.
pub fn compact_in_place(&mut self) -> Result<(), DbError> {
    self.compact_in_place_with_fsops(&StdFsOps)
}
pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("database_compact_in_place").entered();
let bytes = self.compact_snapshot_bytes()?;
let live_path = self.path.clone();
let parent = live_path
.parent()
.ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
let pid = std::process::id();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let base = live_path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("db.modelvault");
let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
{
let file = fs
.open_read_write_create_new(&tmp_path)
.map_err(DbError::Io)?;
let mut store = FileStore::new(file);
store.write_all_at(0, &bytes)?;
store.truncate(bytes.len() as u64)?;
store.sync()?;
}
let _ = fs.remove_file(&bak_path);
fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
let replace_res = fs.rename(&tmp_path, &live_path);
if let Err(e) = replace_res {
let _ = fs.rename(&bak_path, &live_path);
let _ = fs.remove_file(&tmp_path);
return Err(DbError::Io(e));
}
#[cfg(unix)]
{
if let Ok(dir_f) = fs.open_dir(parent) {
let _ = dir_f.sync_all();
}
}
let _ = fs.remove_file(&bak_path);
self.writer_registry = None;
let reopened = match (|| {
let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
Self::open_with_store(live_path.clone(), store, OpenOptions::default())
})() {
Ok(db) => db,
Err(e) => {
let _ = fs.rename(&bak_path, &live_path);
self.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
live_path.clone(),
)?);
return Err(e);
}
};
let mut reopened = reopened;
reopened.writer_registry = Some(writer_registry::WriterRegistryGuard::new(
live_path.clone(),
)?);
*self = reopened;
#[cfg(feature = "tracing")]
tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
Ok(())
}
/// Checkpoints the database and copies its file to `dest_path`, using the standard filesystem.
pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
    self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
}
/// Checkpoints the database and copies its file to `dest_path` through `fs`.
///
/// After the copy, the destination file and (on unix) its parent directory are synced on a
/// best-effort basis; failures of those syncs are ignored.
///
/// # Errors
/// Returns the error of the checkpoint or of the copy.
pub(crate) fn export_snapshot_to_path_with_fsops(
    &mut self,
    fs: &dyn FsOps,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    self.checkpoint()?;
    let target = dest_path.as_ref();
    fs.copy(&self.path, target).map_err(DbError::Io)?;
    match fs.open_read(target) {
        Ok(copied) => {
            let _ = copied.sync_all();
        }
        Err(_) => {}
    }
    #[cfg(unix)]
    best_effort_fsync_parent_dir(fs, target);
    Ok(())
}
/// Installs the snapshot at `snapshot_path` as the database file at `dest_path`, using the
/// standard filesystem.
pub fn restore_snapshot_to_path(
    snapshot_path: impl AsRef<Path>,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
}
/// Replaces `dest_path` with a copy of `snapshot_path`, keeping the previous
/// destination recoverable until the swap has succeeded.
///
/// Steps:
/// 1. Copy the snapshot to a uniquely named `*.tmp` sibling of `dest_path`
///    and sync it (best effort).
/// 2. If `dest_path` exists, rename it to a `*.bak` sibling.
/// 3. Rename the temp file onto `dest_path`; on failure, try to move the
///    backup back into place.
/// 4. On Unix, sync the parent directory (best effort), then delete the
///    backup (best effort).
///
/// The temp file is removed (best effort) on every error path so a failed
/// restore does not leave stray `*.tmp` files behind.
///
/// # Errors
/// Returns `DbError::Io` if `dest_path` has no parent, if the snapshot cannot
/// be copied, if the existing destination cannot be moved aside, or if the
/// final rename fails.
pub(crate) fn restore_snapshot_to_path_with_fsops(
    fs: &dyn FsOps,
    snapshot_path: impl AsRef<Path>,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    let snapshot_path = snapshot_path.as_ref();
    let dest_path = dest_path.as_ref();
    let parent = dest_path
        .parent()
        .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;

    // Process id plus a nanosecond timestamp keep the sibling names unique
    // across concurrent restores; the timestamp falls back to 0 if the clock
    // reads earlier than the Unix epoch.
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let base = dest_path
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("db.modelvault");
    let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
    let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));

    if let Err(e) = fs.copy(snapshot_path, &tmp_path) {
        // A failed copy may have left a partial temp file behind.
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }
    // Best effort: a failed open or sync does not abort the restore.
    if let Ok(f) = fs.open_read(&tmp_path) {
        let _ = f.sync_all();
    }

    // NOTE(review): existence is checked on the real filesystem, not via `fs`.
    if dest_path.exists() {
        let _ = fs.remove_file(&bak_path);
        if let Err(e) = fs.rename(dest_path, &bak_path) {
            // The destination is untouched; just discard the temp copy.
            let _ = fs.remove_file(&tmp_path);
            return Err(DbError::Io(e));
        }
    }

    if let Err(e) = fs.rename(&tmp_path, dest_path) {
        // Roll back: put the previous destination back if we moved it aside.
        if bak_path.exists() {
            let _ = fs.rename(&bak_path, dest_path);
        }
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }

    #[cfg(unix)]
    {
        if let Ok(dir_f) = fs.open_dir(parent) {
            let _ = dir_f.sync_all();
        }
    }
    let _ = fs.remove_file(&bak_path);
    Ok(())
}
}
/// Borrowed handle to one collection of a [`Database`], parameterised by the
/// model type `T`.
pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
    // Database the queries are executed against.
    db: &'a Database<S>,
    // Identifier of the collection this handle queries.
    collection_id: CollectionId,
    // Carries `T` without storing a value of it.
    _marker: PhantomData<T>,
}
impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
    /// Starts a query on this collection that keeps rows whose field at
    /// `path` equals `value`. No limit is set on the returned builder.
    pub fn where_eq(
        &self,
        path: crate::schema::FieldPath,
        value: ScalarValue,
    ) -> QueryBuilder<'a, S, T> {
        let equality = crate::query::Predicate::Eq { path, value };
        QueryBuilder {
            db: self.db,
            collection_id: self.collection_id,
            predicate: Some(equality),
            limit: None,
            _marker: PhantomData,
        }
    }

    /// Runs an unfiltered, unlimited, unordered query over this collection
    /// and returns each row projected onto the fields declared by `T`.
    ///
    /// # Errors
    /// Propagates any error returned by the database query.
    pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
        let unfiltered = crate::query::Query {
            collection: self.collection_id,
            predicate: None,
            limit: None,
            order_by: None,
        };
        let projected = self
            .db
            .query(&unfiltered)?
            .into_iter()
            .map(project_row::<T>)
            .collect();
        Ok(projected)
    }
}
/// Builder for a query against one collection of a [`Database`],
/// parameterised by the model type `T`.
pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
    // Database the query is executed against.
    db: &'a Database<S>,
    // Identifier of the collection being queried.
    collection_id: CollectionId,
    // Optional filter; `None` means no predicate is attached to the query.
    predicate: Option<crate::query::Predicate>,
    // Optional limit passed through to the query; `None` means no limit is set.
    limit: Option<usize>,
    // Carries `T` without storing a value of it.
    _marker: PhantomData<T>,
}
impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
    /// Sets the query limit to `n`, replacing any previously set limit.
    pub fn limit(self, n: usize) -> Self {
        Self {
            limit: Some(n),
            ..self
        }
    }

    /// Executes the built query (unordered) and returns each row projected
    /// onto the fields declared by `T`.
    ///
    /// # Errors
    /// Propagates any error returned by the database query.
    pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
        let Self {
            db,
            collection_id,
            predicate,
            limit,
            ..
        } = self;
        let query = crate::query::Query {
            collection: collection_id,
            predicate,
            limit,
            order_by: None,
        };
        let projected = db.query(&query)?.into_iter().map(project_row::<T>).collect();
        Ok(projected)
    }

    /// Returns the database's explanation string for the built query
    /// instead of executing it.
    ///
    /// # Errors
    /// Propagates any error returned by `explain_query`.
    pub fn explain(self) -> Result<String, DbError> {
        let Self {
            db,
            collection_id,
            predicate,
            limit,
            ..
        } = self;
        let query = crate::query::Query {
            collection: collection_id,
            predicate,
            limit,
            order_by: None,
        };
        db.explain_query(&query)
    }
}
/// Validates the primary field, fields and indexes declared by model `T`
/// against the catalog entry `col`.
///
/// # Errors
/// Returns whatever error
/// `schema_compat::validate_model_fields_against_catalog` reports.
fn validate_subset_model<T>(col: &crate::catalog::CollectionInfo) -> Result<(), DbError>
where
    T: crate::schema::DbModel,
{
    let primary = T::primary_field();
    let fields = T::fields();
    let indexes = T::indexes();
    crate::schema_compat::validate_model_fields_against_catalog(col, primary, &fields, &indexes)
}
/// Builds a new row containing only the values addressed by `wanted`.
///
/// Fields with an empty path, or whose path does not resolve in `row`, are
/// skipped. A single-segment path copies the value under its key (replacing
/// anything already stored there). A multi-segment path is rebuilt as nested
/// objects and merged into whatever is already stored under its root key.
pub fn row_subset_by_field_defs(
    row: &BTreeMap<String, RowValue>,
    wanted: &[FieldDef],
) -> BTreeMap<String, RowValue> {
    use std::collections::btree_map::Entry;

    let mut projected: BTreeMap<String, RowValue> = BTreeMap::new();
    for field in wanted {
        let segments = &field.path.0;
        let Some((head, tail)) = segments.split_first() else {
            continue;
        };
        let Some(leaf) = row_value_at_path_segments(row, segments) else {
            continue;
        };
        let root = head.to_string();
        if tail.is_empty() {
            projected.insert(root, leaf);
            continue;
        }
        // Re-wrap the leaf in the objects below the root, then fold it into
        // any sibling paths already collected under the same root.
        let nested = row_value_nested_object_path(tail, leaf);
        match projected.entry(root) {
            Entry::Occupied(mut slot) => merge_row_value_trees(slot.get_mut(), nested),
            Entry::Vacant(slot) => {
                slot.insert(nested);
            }
        }
    }
    projected
}
/// Returns a clone of the value found by following `path` through `row`.
///
/// Yields `None` when `path` is empty, when any segment is missing, or when
/// a non-final segment resolves to something other than an object.
fn row_value_at_path_segments(
    row: &BTreeMap<String, RowValue>,
    path: &[std::borrow::Cow<'static, str>],
) -> Option<RowValue> {
    let (first, rest) = path.split_first()?;
    let mut node = row.get(first.as_ref())?;
    for seg in rest {
        // Only objects can be descended into.
        let RowValue::Object(children) = node else {
            return None;
        };
        node = children.get(seg.as_ref())?;
    }
    Some(node.clone())
}
/// Wraps `leaf` in one single-key object per entry of `segments`, with the
/// first segment as the outermost key.
///
/// `segments` must be non-empty (checked with `debug_assert!`).
fn row_value_nested_object_path(
    segments: &[std::borrow::Cow<'static, str>],
    leaf: RowValue,
) -> RowValue {
    debug_assert!(!segments.is_empty());
    // The last segment holds the leaf itself; every earlier one holds the
    // object built from the remaining segments.
    let child = match &segments[1..] {
        [] => leaf,
        remaining => row_value_nested_object_path(remaining, leaf),
    };
    let mut wrapper = BTreeMap::new();
    wrapper.insert(segments[0].to_string(), child);
    RowValue::Object(wrapper)
}
/// Merges `from` into `into`.
///
/// When both sides are objects, keys are merged recursively: keys missing
/// from `into` are inserted, and keys present on both sides are merged in
/// turn. In every other case `into` is overwritten with `from`.
fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
    use std::collections::btree_map::Entry;

    let (target, incoming) = match (into, from) {
        (RowValue::Object(target), RowValue::Object(incoming)) => (target, incoming),
        (slot, replacement) => {
            *slot = replacement;
            return;
        }
    };
    for (key, value) in incoming {
        match target.entry(key) {
            Entry::Occupied(mut existing) => merge_row_value_trees(existing.get_mut(), value),
            Entry::Vacant(free) => {
                free.insert(value);
            }
        }
    }
}
/// Reduces `row` to the fields declared by model `T`.
fn project_row<T>(row: BTreeMap<String, RowValue>) -> BTreeMap<String, RowValue>
where
    T: crate::schema::DbModel,
{
    let declared = T::fields();
    row_subset_by_field_defs(&row, &declared)
}
/// Follows `path` through `row` and returns the value found there as a
/// scalar.
///
/// Yields `None` when `path` is empty, when a segment is missing, when a
/// non-final segment is not an object, or when `as_scalar` returns `None`
/// for the final value.
pub(crate) fn scalar_at_path(
    row: &BTreeMap<String, RowValue>,
    path: &crate::schema::FieldPath,
) -> Option<ScalarValue> {
    let (first, rest) = path.0.split_first()?;
    let mut node = row.get(first.as_ref())?;
    for seg in rest {
        // Only objects can be descended into.
        let RowValue::Object(children) = node else {
            return None;
        };
        node = children.get(seg.as_ref())?;
    }
    node.as_scalar()
}
impl Database<FileStore> {
    /// Opens the database file at `path` with the default open options.
    ///
    /// # Errors
    /// Propagates any error from [`Self::open_with_options`].
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let defaults = crate::config::OpenOptions::default();
        Self::open_with_options(path, defaults)
    }

    /// Opens the database file at `path` in read-only mode with strict
    /// recovery.
    ///
    /// # Errors
    /// Propagates any error from [`Self::open_with_options`].
    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let read_only = crate::config::OpenOptions {
            recovery: crate::config::RecoveryMode::Strict,
            mode: OpenMode::ReadOnly,
        };
        Self::open_with_options(path, read_only)
    }

    /// Opens the database file at `path` with the given options.
    ///
    /// The file store is opened via `FileStore::open_locked` using
    /// `opts.mode`. In read-write mode a writer-registry guard for the path
    /// is attached to the returned database.
    ///
    /// # Errors
    /// Fails if the store cannot be opened, if opening the database over the
    /// store fails, or if the writer-registry guard cannot be created.
    pub fn open_with_options(
        path: impl AsRef<Path>,
        opts: crate::config::OpenOptions,
    ) -> Result<Self, DbError> {
        let db_path = path.as_ref().to_path_buf();
        let store = FileStore::open_locked(&db_path, opts.mode)?;
        let mut db = Self::open_with_store(db_path.clone(), store, opts)?;
        if opts.mode == OpenMode::ReadWrite {
            let guard = writer_registry::WriterRegistryGuard::new(db_path)?;
            db.writer_registry = Some(guard);
        }
        Ok(db)
    }
}
impl Database<VecStore> {
    /// Opens an empty in-memory database with the default open options.
    ///
    /// # Errors
    /// Propagates any error from [`Self::open_in_memory_with_options`].
    pub fn open_in_memory() -> Result<Self, DbError> {
        Self::open_in_memory_with_options(crate::config::OpenOptions::default())
    }

    /// Opens an in-memory database over a fresh `VecStore` with the given
    /// options. The database path is set to the placeholder `":memory:"`.
    ///
    /// # Errors
    /// Propagates any error from opening the database over the store.
    pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
        Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
    }

    /// Opens an in-memory database over the given snapshot image, using the
    /// default open options.
    ///
    /// # Errors
    /// Propagates any error from opening the database over the store.
    pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
        Self::open_with_store(
            PathBuf::from(":memory:"),
            VecStore::from_vec(bytes),
            crate::config::OpenOptions::default(),
        )
    }

    /// Consumes the database and returns the bytes held by its store.
    pub fn into_snapshot_bytes(self) -> Vec<u8> {
        self.store.into_inner()
    }

    /// Returns a copy of the bytes currently held by the store.
    pub fn snapshot_bytes(&self) -> Vec<u8> {
        self.store.as_slice().to_vec()
    }

    /// Writes the store's current bytes to `dest_path` using the standard
    /// filesystem implementation.
    ///
    /// # Errors
    /// Returns `DbError::Io` if the write fails.
    pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
        // Borrow the store's bytes directly: cloning the whole image via
        // `snapshot_bytes()` only to pass a reference to it is wasted work.
        Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, self.store.as_slice())
    }

    /// Writes `bytes` to `dest_path` through the supplied filesystem
    /// abstraction.
    ///
    /// # Errors
    /// Returns `DbError::Io` if the write fails.
    pub(crate) fn export_snapshot_to_path_with_fsops(
        fs: &dyn FsOps,
        dest_path: impl AsRef<Path>,
        bytes: &[u8],
    ) -> Result<(), DbError> {
        fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
        Ok(())
    }

    /// Reads the snapshot file at `path` and opens it as an in-memory
    /// database.
    ///
    /// # Errors
    /// Returns `DbError::Io` if the file cannot be read, or any error from
    /// [`Self::from_snapshot_bytes`].
    pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
        Self::from_snapshot_bytes(bytes)
    }
}
// Test-only module whose body is textually included from
// `tests/unit/src_db_mod_scalar_at_path_tests.rs` under the crate's manifest directory.
#[cfg(test)]
mod scalar_at_path_tests {
include!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/unit/src_db_mod_scalar_at_path_tests.rs"
));
}
// Test-only module whose body is textually included from
// `tests/unit/src_db_mod_tests.rs` under the crate's manifest directory.
#[cfg(test)]
mod tests {
include!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/unit/src_db_mod_tests.rs"
));
}