use std::collections::BTreeMap;
use std::sync::Arc;
use crate::catalog::CatalogRecordWire;
use crate::error::{DbError, FormatError, SchemaError, TransactionError};
use crate::index::{encode_index_payload, IndexOp};
use crate::record::{
encode_record_payload_v2_op, encode_record_payload_v3_op, non_pk_defs_in_order, RowValue,
ScalarValue, OP_DELETE, OP_REPLACE,
};
use crate::schema::{CollectionId, FieldDef};
use crate::storage::Store;
use super::{
handle_registry, index_deletes_for_existing_row, plan_insert_row, row_value_at_path_segments,
Database,
};
impl<S: Store> Database<S> {
pub(crate) fn next_txn_id(&mut self) -> u64 {
self.txn_seq = self.txn_seq.saturating_add(1);
self.txn_seq
}
#[inline]
pub(crate) fn commit_write_batch(
&mut self,
txn_id: u64,
body: &[(crate::segments::header::SegmentType, &[u8])],
) -> Result<(), DbError> {
super::segment_write::commit_write_txn_v6(
&mut self.store,
self.segment_start,
&mut self.format_minor,
txn_id,
body,
)
}
#[inline]
pub(crate) fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
self.catalog.apply_record(wire)
}
pub fn transaction<R>(
&mut self,
f: impl FnOnce(&mut Self) -> Result<R, DbError>,
) -> Result<R, DbError> {
self.begin_transaction()?;
match f(self) {
Ok(v) => match self.commit_transaction() {
Ok(()) => Ok(v),
Err(e) => {
self.rollback_transaction();
Err(e)
}
},
Err(e) => {
self.rollback_transaction();
Err(e)
}
}
}
pub fn begin_transaction(&mut self) -> Result<(), DbError> {
if self.txn_staging.is_some() {
return Err(DbError::Transaction(TransactionError::NestedTransaction));
}
let tid = self.next_txn_id();
self.txn_staging = Some(super::TxnStaging {
txn_id: tid,
shadow_catalog: self.catalog.clone(),
shadow_latest: self.latest.clone(),
shadow_indexes: self.indexes.clone(),
pending: Vec::new(),
});
Ok(())
}
pub fn commit_transaction(&mut self) -> Result<(), DbError> {
self.commit_txn_staging()
}
pub fn rollback_transaction(&mut self) {
self.txn_staging = None;
}
fn commit_txn_staging(&mut self) -> Result<(), DbError> {
let Some(st) = self.txn_staging.take() else {
return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
};
if st.pending.is_empty() {
self.catalog = st.shadow_catalog;
self.latest = st.shadow_latest;
self.indexes = st.shadow_indexes;
return Ok(());
}
let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
self.commit_write_batch(st.txn_id, &batch)?;
self.catalog = st.shadow_catalog;
self.latest = st.shadow_latest;
self.indexes = st.shadow_indexes;
self.push_shared_mirror();
Ok(())
}
pub(crate) fn push_shared_mirror(&mut self) {
let Some(ref shared) = self.shared_mirror else {
return;
};
if let Ok(mut g) = shared.write() {
let generation = g.generation.saturating_add(1);
*g = Arc::new(handle_registry::SharedDbState {
catalog: self.catalog.clone(),
latest: self.latest.clone(),
indexes: self.indexes.clone(),
segment_start: self.segment_start,
format_minor: self.format_minor,
generation,
});
}
}
pub fn insert(
&mut self,
collection_id: CollectionId,
row: BTreeMap<String, RowValue>,
) -> Result<(), DbError> {
if self.read_only_attached {
return Err(DbError::Io(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"database opened read-only",
)));
}
super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
let (mut payload, full, mut index_entries, pk_scalar) =
plan_insert_row(self.catalog_for_read(), collection_id, row)?;
#[cfg(test)]
let mut full = full;
let existing = self
.latest_for_read()
.get(&(collection_id.0, full.0.clone()))
.cloned();
if existing.is_some() {
#[cfg(test)]
if let Some(poison) = self.test_poison_planned_replace_row.take() {
poison(collection_id, &mut full.1);
}
let col = self
.catalog_for_read()
.get(collection_id)
.ok_or(DbError::Schema(SchemaError::UnknownCollection {
id: collection_id.0,
}))?;
let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
let pk_name =
col.primary_field
.as_deref()
.ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
collection_id: collection_id.0,
}))?;
let pk_def = col
.fields
.iter()
.find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
.ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
name: pk_name.to_string(),
}))?;
let non_pk_defs = if has_multi_segment_schema {
col.fields
.iter()
.filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
.collect::<Vec<_>>()
} else {
non_pk_defs_in_order(&col.fields, pk_name)
};
let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
for def in &non_pk_defs {
let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
non_pk.push(((*def).clone(), v));
}
payload = (if has_multi_segment_schema {
encode_record_payload_v3_op(
collection_id.0,
col.current_version.0,
OP_REPLACE,
&pk_scalar,
&pk_def.ty,
&non_pk,
)
} else {
encode_record_payload_v2_op(
collection_id.0,
col.current_version.0,
OP_REPLACE,
&pk_scalar,
&pk_def.ty,
&non_pk,
)
})?;
if let Some(ref old_row) = existing {
let mut deletes = index_deletes_for_existing_row(
collection_id,
&pk_scalar,
&col.indexes,
old_row,
);
deletes.append(&mut index_entries);
index_entries = deletes;
}
}
for e in &index_entries {
if e.kind != crate::schema::IndexKind::Unique {
continue;
}
let Some(existing) =
self.indexes_for_read()
.unique_lookup(e.collection_id, &e.index_name, &e.index_key)
else {
continue;
};
if e.op != IndexOp::Insert {
continue;
}
if existing == e.pk_key.as_slice() {
continue;
}
return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
}
if let Some(st) = &mut self.txn_staging {
if !index_entries.is_empty() {
let b = encode_index_payload(&index_entries);
st.pending
.push((crate::segments::header::SegmentType::Index, b));
}
st.pending.push((
crate::segments::header::SegmentType::Record,
payload.clone(),
));
st.shadow_latest
.insert((collection_id.0, full.0.clone()), full.1.clone());
for e in index_entries {
st.shadow_indexes.apply(e)?;
}
return Ok(());
}
let tid = self.next_txn_id();
let index_bytes = if index_entries.is_empty() {
None
} else {
Some(encode_index_payload(&index_entries))
};
let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
if let Some(ref b) = index_bytes {
batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
}
batch.push((
crate::segments::header::SegmentType::Record,
payload.as_slice(),
));
self.commit_write_batch(tid, &batch)?;
self.latest.insert((collection_id.0, full.0), full.1);
for e in index_entries {
self.indexes.apply(e)?;
}
self.push_shared_mirror();
Ok(())
}
pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
if self.read_only_attached {
return Err(DbError::Io(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"database opened read-only",
)));
}
super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
let col = self
.catalog_for_read()
.get(collection_id)
.ok_or(DbError::Schema(SchemaError::UnknownCollection {
id: collection_id.0,
}))?;
let pk_name =
col.primary_field
.as_deref()
.ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
collection_id: collection_id.0,
}))?;
let pk_def = col
.fields
.iter()
.find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
.ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
name: pk_name.to_string(),
}))?;
if !pk.ty_matches(&pk_def.ty) {
return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
}
let pk_key = pk.canonical_key_bytes();
let existing = self
.latest_for_read()
.get(&(collection_id.0, pk_key.clone()))
.cloned();
let Some(old_row) = existing else {
return Ok(());
};
let indexes = col.indexes.clone();
let schema_ver = col.current_version.0;
let pk_ty = pk_def.ty.clone();
let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
let mut index_entries =
index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
#[cfg(not(test))]
let pk_for_record = pk.clone();
#[cfg(test)]
let pk_for_record = {
let mut p = pk.clone();
if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
p = poison(p);
}
p
};
let record_payload = (if has_multi_segment_schema {
encode_record_payload_v3_op(
collection_id.0,
schema_ver,
OP_DELETE,
&pk_for_record,
&pk_ty,
&[],
)
} else {
encode_record_payload_v2_op(
collection_id.0,
schema_ver,
OP_DELETE,
&pk_for_record,
&pk_ty,
&[],
)
})?;
if let Some(st) = &mut self.txn_staging {
if !index_entries.is_empty() {
let b = encode_index_payload(&index_entries);
st.pending
.push((crate::segments::header::SegmentType::Index, b));
}
st.pending.push((
crate::segments::header::SegmentType::Record,
record_payload.clone(),
));
st.shadow_latest.remove(&(collection_id.0, pk_key));
for e in index_entries.drain(..) {
st.shadow_indexes.apply(e)?;
}
return Ok(());
}
let tid = self.next_txn_id();
let index_bytes = if index_entries.is_empty() {
None
} else {
Some(encode_index_payload(&index_entries))
};
let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
if let Some(ref b) = index_bytes {
batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
}
batch.push((
crate::segments::header::SegmentType::Record,
record_payload.as_slice(),
));
self.commit_write_batch(tid, &batch)?;
self.latest.remove(&(collection_id.0, pk_key));
for e in index_entries {
self.indexes.apply(e)?;
}
self.push_shared_mirror();
Ok(())
}
}