use std::collections::HashMap;
use nodedb_types::columnar::ColumnarSchema;
use nodedb_types::value::Value;
use crate::delete_bitmap::DeleteBitmap;
use crate::error::ColumnarError;
use crate::memtable::ColumnarMemtable;
use crate::pk_index::{PkIndex, RowLocation, encode_pk};
use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
/// Coordinates row-level mutations (insert / delete / update) for one
/// columnar collection.
///
/// The engine owns the in-memory memtable, a primary-key index mapping
/// encoded PK bytes to a [`RowLocation`], and one delete bitmap per segment.
/// Mutating methods return the WAL records describing the change; the engine
/// itself does not write them anywhere.
pub struct MutationEngine {
// Collection name, cloned into every WAL record this engine produces.
collection: String,
// Schema used to build the memtable and to validate the arity of inserted rows.
schema: ColumnarSchema,
// In-memory buffer that receives newly inserted rows.
memtable: ColumnarMemtable,
// Encoded primary key -> location (segment id, row index) of the live row.
pk_index: PkIndex,
// Per-segment delete bitmaps, keyed by segment id.
delete_bitmaps: HashMap<u32, DeleteBitmap>,
// Positions of the primary-key columns within `schema.columns`.
pk_col_indices: Vec<usize>,
// Id that the memtable will take over after the next flush.
next_segment_id: u32,
// Segment id recorded in `RowLocation` for rows living in the current memtable.
memtable_segment_id: u32,
// Row index assigned to the next inserted row; reset to 0 on flush.
memtable_row_counter: u32,
}
/// Outcome of a mutation or lifecycle event on a [`MutationEngine`].
pub struct MutationResult {
/// WAL records describing the change, in the order they were produced.
pub wal_records: Vec<ColumnarWalRecord>,
}
impl MutationEngine {
/// Creates an engine for `collection` backed by an empty memtable.
///
/// The primary-key column positions are derived once from `schema`. The
/// memtable starts with segment id `0` and the next id to hand out is `1`.
pub fn new(collection: String, schema: ColumnarSchema) -> Self {
    // Remember where the primary-key columns sit so inserts can extract the key.
    let pk_col_indices = schema
        .columns
        .iter()
        .enumerate()
        .filter_map(|(position, column)| column.primary_key.then_some(position))
        .collect::<Vec<usize>>();

    // Build the memtable while `schema` is still borrowable, before it is moved.
    let memtable = ColumnarMemtable::new(&schema);

    Self {
        collection,
        schema,
        memtable,
        pk_index: PkIndex::new(),
        delete_bitmaps: HashMap::new(),
        pk_col_indices,
        next_segment_id: 1,
        memtable_segment_id: 0,
        memtable_row_counter: 0,
    }
}
/// Appends one row to the memtable and registers its primary key.
///
/// # Errors
///
/// * Whatever `extract_pk_bytes` reports (e.g. a schema arity mismatch).
/// * [`ColumnarError::DuplicatePrimaryKey`] if the key is already indexed.
/// * Any error returned by the memtable's `append_row`.
///
/// On success the result carries a single `InsertRow` WAL record.
pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
    let key = self.extract_pk_bytes(values)?;
    if self.pk_index.contains(&key) {
        return Err(ColumnarError::DuplicatePrimaryKey);
    }

    // Encode the WAL payload before touching the memtable.
    let row_data = encode_row_for_wal(values);
    let record = ColumnarWalRecord::InsertRow {
        collection: self.collection.clone(),
        row_data,
    };

    self.memtable.append_row(values)?;

    // The row now lives in the memtable at the current counter position.
    let segment_id = self.memtable_segment_id;
    let row_index = self.memtable_row_counter;
    self.pk_index.upsert(
        key,
        RowLocation {
            segment_id,
            row_index,
        },
    );
    self.memtable_row_counter += 1;

    Ok(MutationResult {
        wal_records: vec![record],
    })
}
pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
let pk_bytes = encode_pk(pk_value);
let location = self
.pk_index
.get(&pk_bytes)
.copied()
.ok_or(ColumnarError::PrimaryKeyNotFound)?;
let wal = ColumnarWalRecord::DeleteRows {
collection: self.collection.clone(),
segment_id: location.segment_id,
row_indices: vec![location.row_index],
};
let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
bitmap.mark_deleted(location.row_index);
self.pk_index.remove(&pk_bytes);
Ok(MutationResult {
wal_records: vec![wal],
})
}
pub fn update(
&mut self,
old_pk: &Value,
new_values: &[Value],
) -> Result<MutationResult, ColumnarError> {
let delete_result = self.delete(old_pk)?;
let insert_result = self.insert(new_values)?;
let mut wal_records = delete_result.wal_records;
wal_records.extend(insert_result.wal_records);
Ok(MutationResult { wal_records })
}
/// Records that the current memtable contents were persisted as segment
/// `new_segment_id`.
///
/// * PK index entries that pointed at the memtable's virtual segment are
///   re-pointed to `new_segment_id`, keeping their row indices.
/// * The delete bitmap accumulated for the memtable, if any, moves to
///   `new_segment_id` as well. Row indices are preserved by the remap, so
///   rows deleted before the flush must stay marked in the flushed segment.
/// * The memtable takes over the next virtual segment id and the row counter
///   restarts at zero.
///
/// This method does not clear the memtable itself; the caller is presumably
/// responsible for that.
///
/// Returns a single `MemtableFlushed` WAL record carrying the number of rows
/// that had been inserted into the flushed memtable.
// NOTE(review): virtual memtable ids come from `next_segment_id` while
// `new_segment_id` is chosen by the caller; confirm the two id spaces cannot
// collide.
pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
    let flushed_segment_id = self.memtable_segment_id;
    let row_count = u64::from(self.memtable_row_counter);

    self.pk_index.remap_segment(flushed_segment_id, |old_row| {
        Some(RowLocation {
            segment_id: new_segment_id,
            row_index: old_row,
        })
    });

    // Carry deletions over to the real segment. Assumes `new_segment_id` is a
    // fresh segment with no bitmap of its own yet.
    if let Some(bitmap) = self.delete_bitmaps.remove(&flushed_segment_id) {
        self.delete_bitmaps.insert(new_segment_id, bitmap);
    }

    // Start a new, empty virtual segment for subsequent inserts.
    self.memtable_segment_id = self.next_segment_id;
    self.next_segment_id += 1;
    self.memtable_row_counter = 0;

    let wal = ColumnarWalRecord::MemtableFlushed {
        collection: self.collection.clone(),
        segment_id: new_segment_id,
        row_count,
    };
    MutationResult {
        wal_records: vec![wal],
    }
}
/// Records that `old_segment_ids` were compacted into `new_segment_id`.
///
/// `row_mapping` maps `(old_segment_id, old_row_index)` to the row index in
/// the new segment. PK entries with a mapping are re-pointed to the new
/// segment; for entries without one the remap callback yields `None`. The
/// delete bitmaps of all old segments are dropped.
///
/// Returns a single `CompactionCommit` WAL record.
pub fn on_compaction_complete(
    &mut self,
    old_segment_ids: &[u32],
    new_segment_id: u32,
    row_mapping: &HashMap<(u32, u32), u32>,
) -> MutationResult {
    // Phase 1: re-point surviving primary keys at the compacted segment.
    for &source_segment in old_segment_ids {
        self.pk_index.remap_segment(source_segment, |source_row| {
            let target_row = *row_mapping.get(&(source_segment, source_row))?;
            Some(RowLocation {
                segment_id: new_segment_id,
                row_index: target_row,
            })
        });
    }

    // Phase 2: the old segments are gone, so their bitmaps are obsolete.
    for source_segment in old_segment_ids {
        self.delete_bitmaps.remove(source_segment);
    }

    let commit = ColumnarWalRecord::CompactionCommit {
        collection: self.collection.clone(),
        old_segment_ids: old_segment_ids.to_vec(),
        new_segment_ids: vec![new_segment_id],
    };
    MutationResult {
        wal_records: vec![commit],
    }
}
/// Returns a shared reference to the in-memory memtable.
pub fn memtable(&self) -> &ColumnarMemtable {
&self.memtable
}
/// Returns a mutable reference to the in-memory memtable.
///
/// Changes made through this reference bypass the engine's PK index and row
/// counter bookkeeping.
pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
&mut self.memtable
}
/// Returns a shared reference to the primary-key index.
pub fn pk_index(&self) -> &PkIndex {
&self.pk_index
}
/// Returns a mutable reference to the primary-key index.
///
/// Changes made through this reference are not reflected in the memtable or
/// the delete bitmaps.
pub fn pk_index_mut(&mut self) -> &mut PkIndex {
&mut self.pk_index
}
/// Returns the delete bitmap for `segment_id`, or `None` if no bitmap is
/// stored under that id.
pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
self.delete_bitmaps.get(&segment_id)
}
/// Returns all delete bitmaps, keyed by segment id.
pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
&self.delete_bitmaps
}
/// Returns the name of the collection this engine mutates.
pub fn collection(&self) -> &str {
&self.collection
}
/// Returns the schema the engine was constructed with.
pub fn schema(&self) -> &ColumnarSchema {
&self.schema
}
/// Reports whether the memtable considers itself due for a flush.
///
/// The decision is delegated entirely to the memtable's own `should_flush`.
pub fn should_flush(&self) -> bool {
self.memtable.should_flush()
}
/// Reports whether `segment_id` should be compacted.
///
/// Returns `false` when no delete bitmap is stored for the segment.
/// Otherwise the decision is delegated to the bitmap's `should_compact`,
/// which receives `total_rows` and the fixed value `0.2`.
pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
    match self.delete_bitmaps.get(&segment_id) {
        Some(bitmap) => bitmap.should_compact(total_rows, 0.2),
        None => false,
    }
}
/// Encodes the primary key of a full row.
///
/// A single-column key is encoded with `encode_pk`. Any other number of key
/// columns goes through `encode_composite_pk`, in schema column order.
///
/// # Errors
///
/// [`ColumnarError::SchemaMismatch`] if `values` does not contain exactly one
/// value per schema column.
fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
    let expected = self.schema.columns.len();
    let got = values.len();
    if got != expected {
        return Err(ColumnarError::SchemaMismatch { expected, got });
    }

    // The arity check above guarantees every PK column index is in bounds.
    let encoded = match self.pk_col_indices.as_slice() {
        [only] => encode_pk(&values[*only]),
        indices => {
            let key_parts: Vec<&Value> = indices.iter().map(|&idx| &values[idx]).collect();
            crate::pk_index::encode_composite_pk(&key_parts)
        }
    };
    Ok(encoded)
}
}
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema: `id` (Int64, primary key), `name` (String),
    /// `score` (nullable Float64).
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Fresh engine over `test_schema()` for the collection `"test"`.
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    /// Inserts one `(id, name, score)` row, panicking if the insert fails.
    fn insert_row(
        engine: &mut MutationEngine,
        id: Value,
        name: &str,
        score: Value,
    ) -> MutationResult {
        engine
            .insert(&[id, Value::String(name.to_owned()), score])
            .expect("insert")
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();
        let result = insert_row(&mut engine, Value::Integer(1), "Alice", Value::Float(0.75));

        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));
        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();
        insert_row(&mut engine, Value::Integer(1), "Alice", Value::Null);

        let result = engine.delete(&Value::Integer(1)).expect("delete");

        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();
        let err = engine.delete(&Value::Integer(999));
        assert!(matches!(err, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();
        insert_row(&mut engine, Value::Integer(1), "Alice", Value::Float(0.5));

        let replacement = [
            Value::Integer(1),
            Value::String("Alice Updated".into()),
            Value::Float(0.75),
        ];
        let result = engine
            .update(&Value::Integer(1), &replacement)
            .expect("update");

        assert_eq!(result.wal_records.len(), 2);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(matches!(
            &result.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));
        assert_eq!(engine.pk_index().len(), 1);
        // The superseded row is still physically present in the memtable.
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();
        for i in 0..5 {
            insert_row(&mut engine, Value::Integer(i), &format!("u{i}"), Value::Null);
        }

        let result = engine.on_memtable_flushed(1);

        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }
        ));

        let pk = encode_pk(&Value::Integer(3));
        let loc = engine.pk_index().get(&pk).expect("pk exists");
        assert_eq!(loc.segment_id, 1);
        assert_eq!(loc.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();
        for i in 0..10 {
            insert_row(&mut engine, Value::Integer(i), &format!("u{i}"), Value::Null);
        }

        // Remove the odd ids 1, 3, 5, 7, 9.
        for i in (1..10).step_by(2) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();
        for i in 0..10 {
            insert_row(&mut engine, Value::Integer(i), &format!("u{i}"), Value::Null);
        }
        engine.on_memtable_flushed(1);

        // Three of ten rows deleted from segment 1.
        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}