use std::collections::HashMap;
use nodedb_types::columnar::ColumnarSchema;
use nodedb_types::surrogate::Surrogate;
use nodedb_types::value::Value;
use crate::delete_bitmap::DeleteBitmap;
use crate::error::ColumnarError;
use crate::memtable::ColumnarMemtable;
use crate::pk_index::{PkIndex, encode_pk};
use crate::wal_record::ColumnarWalRecord;
/// Per-collection mutation state for a columnar collection: the in-memory
/// memtable, the primary-key index, and per-segment delete bitmaps.
pub struct MutationEngine {
/// Collection name, exposed via `collection()`.
pub(super) collection: String,
/// Column schema; used to build the memtable and to validate row width in
/// `extract_pk_bytes`.
pub(super) schema: ColumnarSchema,
/// In-memory, not-yet-flushed rows.
pub(super) memtable: ColumnarMemtable,
/// Maps encoded primary-key bytes to a `RowLocation` (segment id + row index).
pub(super) pk_index: PkIndex,
/// Delete bitmaps keyed by segment id; a bitmap is created lazily by
/// `delete_bitmap_mut`.
pub(super) delete_bitmaps: HashMap<u64, DeleteBitmap>,
/// Indices (into `schema.columns`) of the primary-key columns, in schema order.
pub(super) pk_col_indices: Vec<usize>,
/// Segment id counter; initialised to 1 in `new`. Allocation presumably
/// happens in a sibling module — NOTE(review): confirm.
pub(super) next_segment_id: u64,
/// Segment id under which memtable rows are tracked in `delete_bitmaps`.
pub(super) memtable_segment_id: u64,
/// Row counter for the memtable; reset to the pre-batch row count on rollback.
/// Presumably tracks the number of memtable rows — TODO confirm.
pub(super) memtable_row_counter: u32,
/// Surrogate per memtable row, indexed like the memtable rows (`None` when a
/// row has no surrogate).
pub(super) memtable_surrogates: Vec<Option<Surrogate>>,
}
/// Outcome of a mutation: the WAL records it produced.
/// NOTE(review): presumably the caller persists these before acknowledging — confirm.
#[derive(Debug)]
pub struct MutationResult {
/// WAL records generated by the mutation, in emission order.
pub wal_records: Vec<ColumnarWalRecord>,
}
impl MutationEngine {
    /// Creates an engine for `collection` with an empty memtable built from `schema`.
    ///
    /// The indices of every column flagged `primary_key` are cached in schema
    /// order. Memtable rows are tracked under segment id 0, and the segment id
    /// counter starts at 1.
    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
        let pk_col_indices: Vec<usize> = schema
            .columns
            .iter()
            .enumerate()
            .filter(|(_, c)| c.primary_key)
            .map(|(i, _)| i)
            .collect();
        let memtable = ColumnarMemtable::new(&schema);
        Self {
            collection,
            schema,
            memtable,
            pk_index: PkIndex::new(),
            delete_bitmaps: HashMap::new(),
            pk_col_indices,
            next_segment_id: 1,
            memtable_segment_id: 0,
            memtable_row_counter: 0,
            memtable_surrogates: Vec::new(),
        }
    }

    /// Shared access to the memtable.
    pub fn memtable(&self) -> &ColumnarMemtable {
        &self.memtable
    }

    /// Mutable access to the memtable.
    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
        &mut self.memtable
    }

    /// Shared access to the primary-key index.
    pub fn pk_index(&self) -> &PkIndex {
        &self.pk_index
    }

    /// Mutable access to the primary-key index.
    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
        &mut self.pk_index
    }

    /// Delete bitmap for `segment_id`, or `None` if none has been created.
    pub fn delete_bitmap(&self, segment_id: u64) -> Option<&DeleteBitmap> {
        self.delete_bitmaps.get(&segment_id)
    }

    /// Delete bitmap for `segment_id`, creating an empty (default) one if absent.
    pub fn delete_bitmap_mut(&mut self, segment_id: u64) -> &mut DeleteBitmap {
        self.delete_bitmaps.entry(segment_id).or_default()
    }

    /// Segment id under which memtable rows are tracked in the delete bitmaps.
    pub fn memtable_segment_id(&self) -> u64 {
        self.memtable_segment_id
    }

    /// Indices of the primary-key columns within the schema, in schema order.
    pub fn pk_col_indices(&self) -> &[usize] {
        &self.pk_col_indices
    }

    /// All delete bitmaps, keyed by segment id.
    pub fn delete_bitmaps(&self) -> &HashMap<u64, DeleteBitmap> {
        &self.delete_bitmaps
    }

    /// Name of the collection this engine serves.
    pub fn collection(&self) -> &str {
        &self.collection
    }

    /// Column schema of the collection.
    pub fn schema(&self) -> &ColumnarSchema {
        &self.schema
    }

    /// Delegates to the memtable's own flush heuristic.
    pub fn should_flush(&self) -> bool {
        self.memtable.should_flush()
    }

    /// Surrogates recorded for memtable rows, indexed like the rows.
    pub fn memtable_surrogates(&self) -> &[Option<Surrogate>] {
        &self.memtable_surrogates
    }

    /// Iterates memtable rows in row order, skipping rows marked deleted in the
    /// memtable segment's delete bitmap.
    pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
        // Look the bitmap up once rather than per row.
        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
        self.memtable
            .iter_rows()
            .enumerate()
            .filter_map(move |(row_idx, row)| {
                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
                    None
                } else {
                    Some(row)
                }
            })
    }

    /// Like [`Self::scan_memtable_rows`], but pairs each live row with its
    /// surrogate (`None` when no surrogate is recorded at that row index).
    pub fn scan_memtable_rows_with_surrogates(
        &self,
    ) -> impl Iterator<Item = (Option<Surrogate>, Vec<Value>)> + '_ {
        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
        let surrogates = &self.memtable_surrogates;
        self.memtable
            .iter_rows()
            .enumerate()
            .filter_map(move |(row_idx, row)| {
                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
                    return None;
                }
                let surrogate = surrogates.get(row_idx).copied().flatten();
                Some((surrogate, row))
            })
    }

    /// Memtable row at `row_idx`. Returns `None` if the row is marked deleted
    /// or if the memtable itself returns `None` for that index.
    pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
        if self
            .delete_bitmaps
            .get(&self.memtable_segment_id)
            .is_some_and(|bm| bm.is_deleted(row_idx as u32))
        {
            return None;
        }
        self.memtable.get_row(row_idx)
    }

    /// Undoes a batch of memtable inserts that did not commit.
    ///
    /// * `row_count_before` — memtable row count before the batch. The
    ///   memtable, surrogate list, and row counter are truncated back to it.
    /// * `inserted_pks` — primary keys the batch wrote. Each is removed from
    ///   the PK index.
    /// * `displaced` — `(pk, prior_location)` pairs for index entries the batch
    ///   overwrote. These are assumed to be recorded in the order the
    ///   overwrites happened. Each prior location has its delete mark cleared,
    ///   and the PK index entry is restored to it.
    ///
    /// `displaced` is replayed in reverse (undo-log order). This way, a key
    /// overwritten several times within the batch ends up at its location from
    /// before the batch, not at an intermediate in-batch row.
    ///
    /// A prior location that is itself a memtable row being rolled back
    /// (`row_index >= row_count_before`) is never restored into the index,
    /// because that row is truncated away below. Its delete mark is still
    /// cleared so the row index is clean for reuse.
    pub fn rollback_memtable_inserts(
        &mut self,
        row_count_before: usize,
        inserted_pks: &[Vec<u8>],
        displaced: &[(Vec<u8>, crate::pk_index::RowLocation)],
    ) {
        for pk in inserted_pks {
            self.pk_index.remove(pk);
        }
        for (pk, prior_location) in displaced.iter().rev() {
            if let Some(bm) = self.delete_bitmaps.get_mut(&prior_location.segment_id) {
                bm.unmark_deleted(prior_location.row_index);
            }
            let row_is_rolled_back = prior_location.segment_id == self.memtable_segment_id
                && (prior_location.row_index as usize) >= row_count_before;
            if !row_is_rolled_back {
                self.pk_index.upsert(pk.clone(), *prior_location);
            }
        }
        self.memtable.truncate_to(row_count_before);
        self.memtable_surrogates.truncate(row_count_before);
        self.memtable_row_counter = row_count_before as u32;
    }

    /// Current value of the segment id counter.
    pub fn next_segment_id(&self) -> u64 {
        self.next_segment_id
    }

    /// Whether `segment_id` should be compacted.
    ///
    /// Returns `false` when the segment has no delete bitmap. Otherwise it
    /// defers to `DeleteBitmap::should_compact` with `total_rows` and a fixed
    /// `0.2` argument (presumably a deleted-row ratio threshold — confirm).
    pub fn should_compact(&self, segment_id: u64, total_rows: u64) -> bool {
        self.delete_bitmaps
            .get(&segment_id)
            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
    }

    /// Encodes the primary key of a full row. Public wrapper around
    /// `extract_pk_bytes`.
    ///
    /// # Errors
    /// `ColumnarError::SchemaMismatch` if `values` does not have one entry per
    /// schema column.
    pub fn encode_pk_from_row(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
        self.extract_pk_bytes(values)
    }

    /// Encodes the primary key of a full row.
    ///
    /// A single primary-key column is encoded with `encode_pk`. Any other
    /// number of primary-key columns goes through `encode_composite_pk` in
    /// schema order.
    /// NOTE(review): a schema with zero PK columns reaches `encode_composite_pk`
    /// with an empty slice — confirm that is intended.
    ///
    /// # Errors
    /// `ColumnarError::SchemaMismatch` if `values.len()` differs from the
    /// schema's column count.
    pub(super) fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
        let expected = self.schema.columns.len();
        if values.len() != expected {
            return Err(ColumnarError::SchemaMismatch {
                expected,
                got: values.len(),
            });
        }
        // Indexing is in bounds: the PK indices come from the schema and the
        // row width was validated above.
        match self.pk_col_indices.as_slice() {
            [single] => Ok(encode_pk(&values[*single])),
            cols => {
                let pk_values: Vec<&Value> = cols.iter().map(|&i| &values[i]).collect();
                Ok(crate::pk_index::encode_composite_pk(&pk_values))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    /// Schema with one required `Int64` column named `id`, flagged as the primary key.
    fn minimal_schema() -> ColumnarSchema {
        let id_column = ColumnDef::required("id", ColumnType::Int64).with_primary_key();
        ColumnarSchema {
            columns: vec![id_column],
            version: 1,
        }
    }

    /// With the segment id counter at `u64::MAX`, `on_memtable_flushed` must
    /// return `Err(ColumnarError::SegmentIdExhausted)`.
    #[test]
    fn segment_id_allocator_returns_err_at_u64_max() {
        let mut engine = MutationEngine::new(String::from("test"), minimal_schema());
        // Pin the counter at the exhaustion boundary named by this test.
        engine.next_segment_id = u64::MAX;
        // NOTE(review): meaning of this argument is defined by
        // `on_memtable_flushed` (not visible here) — confirm.
        let result = engine.on_memtable_flushed(u64::MAX - 1);
        assert!(
            matches!(result, Err(ColumnarError::SegmentIdExhausted)),
            "expected SegmentIdExhausted, got: {result:?}"
        );
    }
}