1use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
/// Coordinates row-level mutations (insert / delete / update) for a single
/// columnar collection.
///
/// The engine owns the in-memory write buffer, the primary-key index and the
/// per-segment delete bitmaps, and hands back the WAL records describing each
/// mutation it applies.
pub struct MutationEngine {
    /// Collection name stamped onto every WAL record this engine produces.
    collection: String,
    /// Column definitions; used to check row width and to locate the
    /// primary-key columns.
    schema: ColumnarSchema,
    /// In-memory buffer that receives newly inserted rows.
    memtable: ColumnarMemtable,
    /// Maps encoded primary-key bytes to the row's current location.
    pk_index: PkIndex,
    /// Delete bitmaps keyed by segment id (the memtable's rows are tracked
    /// under `memtable_segment_id`).
    delete_bitmaps: HashMap<u32, DeleteBitmap>,
    /// Positions of the primary-key columns within `schema.columns`.
    pk_col_indices: Vec<usize>,
    /// Counter from which the memtable's segment id is drawn after each
    /// flush; starts at 1.
    next_segment_id: u32,
    /// Segment id under which rows still living in the memtable are recorded
    /// in the PK index and delete bitmaps; starts at 0.
    memtable_segment_id: u32,
    /// Number of rows inserted into the current memtable through this engine;
    /// doubles as the row index of the next inserted row and is reset to 0 on
    /// flush.
    memtable_row_counter: u32,
}
40
/// Outcome of a mutation or lifecycle notification applied by
/// `MutationEngine`.
pub struct MutationResult {
    /// WAL records describing the change, in the order they were generated
    /// (an update yields its delete record followed by its insert record).
    pub wal_records: Vec<ColumnarWalRecord>,
}
46
47impl MutationEngine {
48 pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50 let pk_col_indices: Vec<usize> = schema
51 .columns
52 .iter()
53 .enumerate()
54 .filter(|(_, c)| c.primary_key)
55 .map(|(i, _)| i)
56 .collect();
57
58 let memtable = ColumnarMemtable::new(&schema);
59 let memtable_segment_id = 0;
61
62 Self {
63 collection,
64 schema,
65 memtable,
66 pk_index: PkIndex::new(),
67 delete_bitmaps: HashMap::new(),
68 pk_col_indices,
69 next_segment_id: 1,
70 memtable_segment_id,
71 memtable_row_counter: 0,
72 }
73 }
74
75 pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
79 let pk_bytes = self.extract_pk_bytes(values)?;
81
82 if self.pk_index.contains(&pk_bytes) {
84 return Err(ColumnarError::DuplicatePrimaryKey);
85 }
86
87 let row_data = encode_row_for_wal(values)?;
89 let wal = ColumnarWalRecord::InsertRow {
90 collection: self.collection.clone(),
91 row_data,
92 };
93
94 self.memtable.append_row(values)?;
96
97 let location = RowLocation {
99 segment_id: self.memtable_segment_id,
100 row_index: self.memtable_row_counter,
101 };
102 self.pk_index.upsert(pk_bytes, location);
103 self.memtable_row_counter += 1;
104
105 Ok(MutationResult {
106 wal_records: vec![wal],
107 })
108 }
109
110 pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
115 let pk_bytes = encode_pk(pk_value);
116
117 let location = self
118 .pk_index
119 .get(&pk_bytes)
120 .copied()
121 .ok_or(ColumnarError::PrimaryKeyNotFound)?;
122
123 let wal = ColumnarWalRecord::DeleteRows {
125 collection: self.collection.clone(),
126 segment_id: location.segment_id,
127 row_indices: vec![location.row_index],
128 };
129
130 let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
132 bitmap.mark_deleted(location.row_index);
133
134 self.pk_index.remove(&pk_bytes);
136
137 Ok(MutationResult {
138 wal_records: vec![wal],
139 })
140 }
141
142 pub fn update(
152 &mut self,
153 old_pk: &Value,
154 new_values: &[Value],
155 ) -> Result<MutationResult, ColumnarError> {
156 let delete_result = self.delete(old_pk)?;
158
159 let insert_result = self.insert(new_values)?;
161
162 let mut wal_records = delete_result.wal_records;
164 wal_records.extend(insert_result.wal_records);
165
166 Ok(MutationResult { wal_records })
167 }
168
169 pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
174 let row_count = self.memtable_row_counter;
175
176 self.pk_index
178 .remap_segment(self.memtable_segment_id, |old_row| {
179 Some(RowLocation {
180 segment_id: new_segment_id,
181 row_index: old_row,
182 })
183 });
184
185 self.memtable_segment_id = self.next_segment_id;
187 self.next_segment_id += 1;
188 self.memtable_row_counter = 0;
189
190 let wal = ColumnarWalRecord::MemtableFlushed {
191 collection: self.collection.clone(),
192 segment_id: new_segment_id,
193 row_count: row_count as u64,
194 };
195
196 MutationResult {
197 wal_records: vec![wal],
198 }
199 }
200
201 pub fn on_compaction_complete(
205 &mut self,
206 old_segment_ids: &[u32],
207 new_segment_id: u32,
208 row_mapping: &HashMap<(u32, u32), u32>,
209 ) -> MutationResult {
210 for &old_seg in old_segment_ids {
212 self.pk_index.remap_segment(old_seg, |old_row| {
213 row_mapping
214 .get(&(old_seg, old_row))
215 .map(|&new_row| RowLocation {
216 segment_id: new_segment_id,
217 row_index: new_row,
218 })
219 });
220
221 self.delete_bitmaps.remove(&old_seg);
223 }
224
225 let wal = ColumnarWalRecord::CompactionCommit {
226 collection: self.collection.clone(),
227 old_segment_ids: old_segment_ids.to_vec(),
228 new_segment_ids: vec![new_segment_id],
229 };
230
231 MutationResult {
232 wal_records: vec![wal],
233 }
234 }
235
236 pub fn memtable(&self) -> &ColumnarMemtable {
240 &self.memtable
241 }
242
243 pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
245 &mut self.memtable
246 }
247
248 pub fn pk_index(&self) -> &PkIndex {
250 &self.pk_index
251 }
252
253 pub fn pk_index_mut(&mut self) -> &mut PkIndex {
255 &mut self.pk_index
256 }
257
258 pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
260 self.delete_bitmaps.get(&segment_id)
261 }
262
263 pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
265 &self.delete_bitmaps
266 }
267
268 pub fn collection(&self) -> &str {
270 &self.collection
271 }
272
273 pub fn schema(&self) -> &ColumnarSchema {
275 &self.schema
276 }
277
278 pub fn should_flush(&self) -> bool {
280 self.memtable.should_flush()
281 }
282
283 pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
288 let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
289 self.memtable
290 .iter_rows()
291 .enumerate()
292 .filter_map(move |(row_idx, row)| {
293 if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
294 None
295 } else {
296 Some(row)
297 }
298 })
299 }
300
301 pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
303 if self
304 .delete_bitmaps
305 .get(&self.memtable_segment_id)
306 .is_some_and(|bm| bm.is_deleted(row_idx as u32))
307 {
308 return None;
309 }
310 self.memtable.get_row(row_idx)
311 }
312
313 pub fn next_segment_id(&self) -> u32 {
317 self.next_segment_id
318 }
319
320 pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
322 self.delete_bitmaps
323 .get(&segment_id)
324 .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
325 }
326
327 fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
331 if values.len() != self.schema.columns.len() {
332 return Err(ColumnarError::SchemaMismatch {
333 expected: self.schema.columns.len(),
334 got: values.len(),
335 });
336 }
337
338 if self.pk_col_indices.len() == 1 {
339 Ok(encode_pk(&values[self.pk_col_indices[0]]))
340 } else {
341 let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
342 Ok(crate::pk_index::encode_composite_pk(&pk_values))
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema: required `id` (Int64, primary key), required
    /// `name` (String) and nullable `score` (Float64).
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Fresh engine over `test_schema()` for collection `"test"`.
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    /// Builds one row matching `test_schema()`.
    fn row(id: Value, name: &str, score: Value) -> [Value; 3] {
        [id, Value::String(name.into()), score]
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();

        let outcome = engine
            .insert(&row(Value::Integer(1), "Alice", Value::Float(0.75)))
            .expect("insert");

        // Exactly one WAL record, and it is an insert.
        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));

        // The row is both indexed and buffered.
        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();
        engine
            .insert(&row(Value::Integer(1), "Alice", Value::Null))
            .expect("insert");

        let outcome = engine.delete(&Value::Integer(1)).expect("delete");

        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        // The key no longer resolves to any row.
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();

        let outcome = engine.delete(&Value::Integer(999));

        assert!(matches!(outcome, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();
        engine
            .insert(&row(Value::Integer(1), "Alice", Value::Float(0.5)))
            .expect("insert");

        let replacement = row(Value::Integer(1), "Alice Updated", Value::Float(0.75));
        let outcome = engine
            .update(&Value::Integer(1), &replacement)
            .expect("update");

        // An update is logged as a delete followed by an insert.
        assert_eq!(outcome.wal_records.len(), 2);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(matches!(
            &outcome.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));

        // One live key, but both row versions occupy the memtable.
        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();
        for i in 0..5 {
            engine
                .insert(&row(Value::Integer(i), &format!("u{i}"), Value::Null))
                .expect("insert");
        }

        let outcome = engine.on_memtable_flushed(1);

        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }
        ));

        // Key 3 was the fourth row inserted, so it sits at row 3 of segment 1.
        let key = encode_pk(&Value::Integer(3));
        let location = engine.pk_index().get(&key).expect("pk exists");
        assert_eq!(location.segment_id, 1);
        assert_eq!(location.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();
        for i in 0..10 {
            engine
                .insert(&row(Value::Integer(i), &format!("u{i}"), Value::Null))
                .expect("insert");
        }

        // Remove the odd ids: 1, 3, 5, 7, 9.
        for i in (1..10).step_by(2) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();
        for i in 0..10 {
            engine
                .insert(&row(Value::Integer(i), &format!("u{i}"), Value::Null))
                .expect("insert");
        }
        engine.on_memtable_flushed(1);

        // Delete 3 of the 10 flushed rows (30%).
        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}
535}