1use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19pub struct MutationEngine {
24 collection: String,
25 schema: ColumnarSchema,
26 memtable: ColumnarMemtable,
27 pk_index: PkIndex,
28 delete_bitmaps: HashMap<u32, DeleteBitmap>,
30 pk_col_indices: Vec<usize>,
32 next_segment_id: u32,
34 memtable_segment_id: u32,
37 memtable_row_counter: u32,
39}
40
41pub struct MutationResult {
43 pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48 pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50 let pk_col_indices: Vec<usize> = schema
51 .columns
52 .iter()
53 .enumerate()
54 .filter(|(_, c)| c.primary_key)
55 .map(|(i, _)| i)
56 .collect();
57
58 let memtable = ColumnarMemtable::new(&schema);
59 let memtable_segment_id = 0;
61
62 Self {
63 collection,
64 schema,
65 memtable,
66 pk_index: PkIndex::new(),
67 delete_bitmaps: HashMap::new(),
68 pk_col_indices,
69 next_segment_id: 1,
70 memtable_segment_id,
71 memtable_row_counter: 0,
72 }
73 }
74
75 pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
79 let pk_bytes = self.extract_pk_bytes(values)?;
81
82 if self.pk_index.contains(&pk_bytes) {
84 return Err(ColumnarError::DuplicatePrimaryKey);
85 }
86
87 let row_data = encode_row_for_wal(values);
89 let wal = ColumnarWalRecord::InsertRow {
90 collection: self.collection.clone(),
91 row_data,
92 };
93
94 self.memtable.append_row(values)?;
96
97 let location = RowLocation {
99 segment_id: self.memtable_segment_id,
100 row_index: self.memtable_row_counter,
101 };
102 self.pk_index.upsert(pk_bytes, location);
103 self.memtable_row_counter += 1;
104
105 Ok(MutationResult {
106 wal_records: vec![wal],
107 })
108 }
109
110 pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
115 let pk_bytes = encode_pk(pk_value);
116
117 let location = self
118 .pk_index
119 .get(&pk_bytes)
120 .copied()
121 .ok_or(ColumnarError::PrimaryKeyNotFound)?;
122
123 let wal = ColumnarWalRecord::DeleteRows {
125 collection: self.collection.clone(),
126 segment_id: location.segment_id,
127 row_indices: vec![location.row_index],
128 };
129
130 let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
132 bitmap.mark_deleted(location.row_index);
133
134 self.pk_index.remove(&pk_bytes);
136
137 Ok(MutationResult {
138 wal_records: vec![wal],
139 })
140 }
141
142 pub fn update(
152 &mut self,
153 old_pk: &Value,
154 new_values: &[Value],
155 ) -> Result<MutationResult, ColumnarError> {
156 let delete_result = self.delete(old_pk)?;
158
159 let insert_result = self.insert(new_values)?;
161
162 let mut wal_records = delete_result.wal_records;
164 wal_records.extend(insert_result.wal_records);
165
166 Ok(MutationResult { wal_records })
167 }
168
169 pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
174 let row_count = self.memtable_row_counter;
175
176 self.pk_index
178 .remap_segment(self.memtable_segment_id, |old_row| {
179 Some(RowLocation {
180 segment_id: new_segment_id,
181 row_index: old_row,
182 })
183 });
184
185 self.memtable_segment_id = self.next_segment_id;
187 self.next_segment_id += 1;
188 self.memtable_row_counter = 0;
189
190 let wal = ColumnarWalRecord::MemtableFlushed {
191 collection: self.collection.clone(),
192 segment_id: new_segment_id,
193 row_count: row_count as u64,
194 };
195
196 MutationResult {
197 wal_records: vec![wal],
198 }
199 }
200
201 pub fn on_compaction_complete(
205 &mut self,
206 old_segment_ids: &[u32],
207 new_segment_id: u32,
208 row_mapping: &HashMap<(u32, u32), u32>,
209 ) -> MutationResult {
210 for &old_seg in old_segment_ids {
212 self.pk_index.remap_segment(old_seg, |old_row| {
213 row_mapping
214 .get(&(old_seg, old_row))
215 .map(|&new_row| RowLocation {
216 segment_id: new_segment_id,
217 row_index: new_row,
218 })
219 });
220
221 self.delete_bitmaps.remove(&old_seg);
223 }
224
225 let wal = ColumnarWalRecord::CompactionCommit {
226 collection: self.collection.clone(),
227 old_segment_ids: old_segment_ids.to_vec(),
228 new_segment_ids: vec![new_segment_id],
229 };
230
231 MutationResult {
232 wal_records: vec![wal],
233 }
234 }
235
236 pub fn memtable(&self) -> &ColumnarMemtable {
240 &self.memtable
241 }
242
243 pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
245 &mut self.memtable
246 }
247
248 pub fn pk_index(&self) -> &PkIndex {
250 &self.pk_index
251 }
252
253 pub fn pk_index_mut(&mut self) -> &mut PkIndex {
255 &mut self.pk_index
256 }
257
258 pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
260 self.delete_bitmaps.get(&segment_id)
261 }
262
263 pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
265 &self.delete_bitmaps
266 }
267
268 pub fn collection(&self) -> &str {
270 &self.collection
271 }
272
273 pub fn schema(&self) -> &ColumnarSchema {
275 &self.schema
276 }
277
278 pub fn should_flush(&self) -> bool {
280 self.memtable.should_flush()
281 }
282
283 pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
288 let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
289 self.memtable
290 .iter_rows()
291 .enumerate()
292 .filter_map(move |(row_idx, row)| {
293 if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
294 None
295 } else {
296 Some(row)
297 }
298 })
299 }
300
301 pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
303 if self
304 .delete_bitmaps
305 .get(&self.memtable_segment_id)
306 .is_some_and(|bm| bm.is_deleted(row_idx as u32))
307 {
308 return None;
309 }
310 self.memtable.get_row(row_idx)
311 }
312
313 pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
315 self.delete_bitmaps
316 .get(&segment_id)
317 .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
318 }
319
320 fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
324 if values.len() != self.schema.columns.len() {
325 return Err(ColumnarError::SchemaMismatch {
326 expected: self.schema.columns.len(),
327 got: values.len(),
328 });
329 }
330
331 if self.pk_col_indices.len() == 1 {
332 Ok(encode_pk(&values[self.pk_col_indices[0]]))
333 } else {
334 let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
335 Ok(crate::pk_index::encode_composite_pk(&pk_values))
336 }
337 }
338}
339
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema: `id` (Int64, primary key), `name` (String) and a
    /// nullable `score` (Float64).
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Fresh engine over [`test_schema`] for the collection `"test"`.
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    /// Builds one row in schema order.
    fn row(id: Value, name: impl Into<String>, score: Value) -> [Value; 3] {
        [id, Value::String(name.into()), score]
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();

        let outcome = engine
            .insert(&row(Value::Integer(1), "Alice", Value::Float(0.75)))
            .expect("insert");

        // Exactly one WAL record, and it is an insert.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::InsertRow { .. }]
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();
        engine
            .insert(&row(Value::Integer(1), "Alice", Value::Null))
            .expect("insert");

        let outcome = engine.delete(&Value::Integer(1)).expect("delete");

        // Exactly one WAL record, and it is a delete.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::DeleteRows { .. }]
        ));
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();

        let outcome = engine.delete(&Value::Integer(999));

        assert!(matches!(outcome, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();
        engine
            .insert(&row(Value::Integer(1), "Alice", Value::Float(0.5)))
            .expect("insert");

        let replacement = row(Value::Integer(1), "Alice Updated", Value::Float(0.75));
        let outcome = engine
            .update(&Value::Integer(1), &replacement)
            .expect("update");

        // An update is logged as a delete followed by an insert.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [
                ColumnarWalRecord::DeleteRows { .. },
                ColumnarWalRecord::InsertRow { .. }
            ]
        ));

        // One live key, but both row versions occupy the memtable.
        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();
        for i in 0..5 {
            engine
                .insert(&row(Value::Integer(i), format!("u{i}"), Value::Null))
                .expect("insert");
        }

        let outcome = engine.on_memtable_flushed(1);

        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }]
        ));

        // The key keeps its row index but now lives in segment 1.
        let key = encode_pk(&Value::Integer(3));
        let location = engine.pk_index().get(&key).expect("pk exists");
        assert_eq!(location.segment_id, 1);
        assert_eq!(location.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();
        for i in 0..10 {
            engine
                .insert(&row(Value::Integer(i), format!("u{i}"), Value::Null))
                .expect("insert");
        }

        // Remove the odd ids: 1, 3, 5, 7, 9.
        for i in (0..10).filter(|i| i % 2 == 1) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();
        for i in 0..10 {
            engine
                .insert(&row(Value::Integer(i), format!("u{i}"), Value::Null))
                .expect("insert");
        }
        engine.on_memtable_flushed(1);

        // 3 of 10 rows deleted in segment 1.
        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}