1use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19pub struct MutationEngine {
24 collection: String,
25 schema: ColumnarSchema,
26 memtable: ColumnarMemtable,
27 pk_index: PkIndex,
28 delete_bitmaps: HashMap<u32, DeleteBitmap>,
30 pk_col_indices: Vec<usize>,
32 next_segment_id: u32,
34 memtable_segment_id: u32,
37 memtable_row_counter: u32,
39}
40
41pub struct MutationResult {
43 pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48 pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50 let pk_col_indices: Vec<usize> = schema
51 .columns
52 .iter()
53 .enumerate()
54 .filter(|(_, c)| c.primary_key)
55 .map(|(i, _)| i)
56 .collect();
57
58 let memtable = ColumnarMemtable::new(&schema);
59 let memtable_segment_id = 0;
61
62 Self {
63 collection,
64 schema,
65 memtable,
66 pk_index: PkIndex::new(),
67 delete_bitmaps: HashMap::new(),
68 pk_col_indices,
69 next_segment_id: 1,
70 memtable_segment_id,
71 memtable_row_counter: 0,
72 }
73 }
74
75 pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
91 let pk_bytes = self.extract_pk_bytes(values)?;
92 let mut wal_records = Vec::with_capacity(2);
93
94 if let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
98 let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
99 bitmap.mark_deleted(prior.row_index);
100 wal_records.push(ColumnarWalRecord::DeleteRows {
101 collection: self.collection.clone(),
102 segment_id: prior.segment_id,
103 row_indices: vec![prior.row_index],
104 });
105 }
106
107 let row_data = encode_row_for_wal(values)?;
108 wal_records.push(ColumnarWalRecord::InsertRow {
109 collection: self.collection.clone(),
110 row_data,
111 });
112
113 self.memtable.append_row(values)?;
114 let location = RowLocation {
115 segment_id: self.memtable_segment_id,
116 row_index: self.memtable_row_counter,
117 };
118 self.pk_index.upsert(pk_bytes, location);
119 self.memtable_row_counter += 1;
120
121 Ok(MutationResult { wal_records })
122 }
123
124 pub fn insert_if_absent(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
131 let pk_bytes = self.extract_pk_bytes(values)?;
132 if self.pk_index.contains(&pk_bytes) {
133 return Ok(MutationResult {
134 wal_records: Vec::new(),
135 });
136 }
137
138 let row_data = encode_row_for_wal(values)?;
139 let wal = ColumnarWalRecord::InsertRow {
140 collection: self.collection.clone(),
141 row_data,
142 };
143 self.memtable.append_row(values)?;
144 let location = RowLocation {
145 segment_id: self.memtable_segment_id,
146 row_index: self.memtable_row_counter,
147 };
148 self.pk_index.upsert(pk_bytes, location);
149 self.memtable_row_counter += 1;
150
151 Ok(MutationResult {
152 wal_records: vec![wal],
153 })
154 }
155
156 pub fn lookup_memtable_row_by_pk(&self, pk_bytes: &[u8]) -> Option<Vec<Value>> {
165 let loc = self.pk_index.get(pk_bytes).copied()?;
166 if loc.segment_id != self.memtable_segment_id {
167 return None;
168 }
169 self.memtable.get_row(loc.row_index as usize)
170 }
171
172 pub fn encode_pk_from_row(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
175 self.extract_pk_bytes(values)
176 }
177
178 pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
183 let pk_bytes = encode_pk(pk_value);
184
185 let location = self
186 .pk_index
187 .get(&pk_bytes)
188 .copied()
189 .ok_or(ColumnarError::PrimaryKeyNotFound)?;
190
191 let wal = ColumnarWalRecord::DeleteRows {
193 collection: self.collection.clone(),
194 segment_id: location.segment_id,
195 row_indices: vec![location.row_index],
196 };
197
198 let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
200 bitmap.mark_deleted(location.row_index);
201
202 self.pk_index.remove(&pk_bytes);
204
205 Ok(MutationResult {
206 wal_records: vec![wal],
207 })
208 }
209
210 pub fn update(
220 &mut self,
221 old_pk: &Value,
222 new_values: &[Value],
223 ) -> Result<MutationResult, ColumnarError> {
224 let delete_result = self.delete(old_pk)?;
226
227 let insert_result = self.insert(new_values)?;
229
230 let mut wal_records = delete_result.wal_records;
232 wal_records.extend(insert_result.wal_records);
233
234 Ok(MutationResult { wal_records })
235 }
236
237 pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
242 let row_count = self.memtable_row_counter;
243
244 self.pk_index
246 .remap_segment(self.memtable_segment_id, |old_row| {
247 Some(RowLocation {
248 segment_id: new_segment_id,
249 row_index: old_row,
250 })
251 });
252
253 self.memtable_segment_id = self.next_segment_id;
255 self.next_segment_id += 1;
256 self.memtable_row_counter = 0;
257
258 let wal = ColumnarWalRecord::MemtableFlushed {
259 collection: self.collection.clone(),
260 segment_id: new_segment_id,
261 row_count: row_count as u64,
262 };
263
264 MutationResult {
265 wal_records: vec![wal],
266 }
267 }
268
269 pub fn on_compaction_complete(
273 &mut self,
274 old_segment_ids: &[u32],
275 new_segment_id: u32,
276 row_mapping: &HashMap<(u32, u32), u32>,
277 ) -> MutationResult {
278 for &old_seg in old_segment_ids {
280 self.pk_index.remap_segment(old_seg, |old_row| {
281 row_mapping
282 .get(&(old_seg, old_row))
283 .map(|&new_row| RowLocation {
284 segment_id: new_segment_id,
285 row_index: new_row,
286 })
287 });
288
289 self.delete_bitmaps.remove(&old_seg);
291 }
292
293 let wal = ColumnarWalRecord::CompactionCommit {
294 collection: self.collection.clone(),
295 old_segment_ids: old_segment_ids.to_vec(),
296 new_segment_ids: vec![new_segment_id],
297 };
298
299 MutationResult {
300 wal_records: vec![wal],
301 }
302 }
303
304 pub fn memtable(&self) -> &ColumnarMemtable {
308 &self.memtable
309 }
310
311 pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
313 &mut self.memtable
314 }
315
316 pub fn pk_index(&self) -> &PkIndex {
318 &self.pk_index
319 }
320
321 pub fn pk_index_mut(&mut self) -> &mut PkIndex {
323 &mut self.pk_index
324 }
325
326 pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
328 self.delete_bitmaps.get(&segment_id)
329 }
330
331 pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
333 &self.delete_bitmaps
334 }
335
336 pub fn collection(&self) -> &str {
338 &self.collection
339 }
340
341 pub fn schema(&self) -> &ColumnarSchema {
343 &self.schema
344 }
345
346 pub fn should_flush(&self) -> bool {
348 self.memtable.should_flush()
349 }
350
351 pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
356 let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
357 self.memtable
358 .iter_rows()
359 .enumerate()
360 .filter_map(move |(row_idx, row)| {
361 if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
362 None
363 } else {
364 Some(row)
365 }
366 })
367 }
368
369 pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
371 if self
372 .delete_bitmaps
373 .get(&self.memtable_segment_id)
374 .is_some_and(|bm| bm.is_deleted(row_idx as u32))
375 {
376 return None;
377 }
378 self.memtable.get_row(row_idx)
379 }
380
381 pub fn next_segment_id(&self) -> u32 {
385 self.next_segment_id
386 }
387
388 pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
390 self.delete_bitmaps
391 .get(&segment_id)
392 .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
393 }
394
395 fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
399 if values.len() != self.schema.columns.len() {
400 return Err(ColumnarError::SchemaMismatch {
401 expected: self.schema.columns.len(),
402 got: values.len(),
403 });
404 }
405
406 if self.pk_col_indices.len() == 1 {
407 Ok(encode_pk(&values[self.pk_col_indices[0]]))
408 } else {
409 let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
410 Ok(crate::pk_index::encode_composite_pk(&pk_values))
411 }
412 }
413}
414
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema with a single Int64 primary key.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid")
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        let result = engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Float(0.75),
            ])
            .expect("insert");

        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn insert_existing_pk_supersedes_prior_row() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        engine
            .insert(&[Value::Integer(1), Value::String("Alice".into()), Value::Null])
            .expect("insert");
        let result = engine
            .insert(&[Value::Integer(1), Value::String("Alicia".into()), Value::Null])
            .expect("upsert");

        assert_eq!(result.wal_records.len(), 2);
        match &result.wal_records[0] {
            ColumnarWalRecord::DeleteRows {
                segment_id,
                row_indices,
                ..
            } => {
                assert_eq!(*segment_id, 0);
                assert_eq!(row_indices.as_slice(), &[0u32]);
            }
            _ => panic!("expected DeleteRows before InsertRow"),
        }
        assert!(matches!(
            &result.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 2);
        assert!(engine.get_memtable_row(0).is_none());
        assert!(engine.get_memtable_row(1).is_some());
        assert_eq!(engine.scan_memtable_rows().count(), 1);
    }

    #[test]
    fn insert_if_absent_skips_existing_pk() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        engine
            .insert(&[Value::Integer(1), Value::String("Alice".into()), Value::Null])
            .expect("insert");

        let skipped = engine
            .insert_if_absent(&[Value::Integer(1), Value::String("Other".into()), Value::Null])
            .expect("insert_if_absent existing");
        assert!(skipped.wal_records.is_empty());
        assert_eq!(engine.memtable().row_count(), 1);

        let inserted = engine
            .insert_if_absent(&[Value::Integer(2), Value::String("Bob".into()), Value::Null])
            .expect("insert_if_absent new");
        assert_eq!(inserted.wal_records.len(), 1);
        assert!(matches!(
            &inserted.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));
        assert_eq!(engine.pk_index().len(), 2);
    }

    #[test]
    fn insert_rejects_wrong_arity() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        let err = engine.insert(&[Value::Integer(1)]);
        assert!(matches!(
            err,
            Err(ColumnarError::SchemaMismatch {
                expected: 3,
                got: 1
            })
        ));
        assert!(engine.pk_index().is_empty());
        assert_eq!(engine.memtable().row_count(), 0);
    }

    #[test]
    fn lookup_memtable_row_by_pk_finds_live_row() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        let row = [Value::Integer(7), Value::String("Grace".into()), Value::Null];
        engine.insert(&row).expect("insert");

        let pk = engine.encode_pk_from_row(&row).expect("pk");
        assert_eq!(pk, encode_pk(&Value::Integer(7)));
        assert!(engine.lookup_memtable_row_by_pk(&pk).is_some());
        assert!(engine
            .lookup_memtable_row_by_pk(&encode_pk(&Value::Integer(8)))
            .is_none());
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Null,
            ])
            .expect("insert");

        let result = engine.delete(&Value::Integer(1)).expect("delete");
        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));

        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        let err = engine.delete(&Value::Integer(999));
        assert!(matches!(err, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Float(0.5),
            ])
            .expect("insert");

        let result = engine
            .update(
                &Value::Integer(1),
                &[
                    Value::Integer(1),
                    Value::String("Alice Updated".into()),
                    Value::Float(0.75),
                ],
            )
            .expect("update");

        assert_eq!(result.wal_records.len(), 2);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(matches!(
            &result.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        for i in 0..5 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }

        let result = engine.on_memtable_flushed(1);
        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }
        ));

        let pk = encode_pk(&Value::Integer(3));
        let loc = engine.pk_index().get(&pk).expect("pk exists");
        assert_eq!(loc.segment_id, 1);
        assert_eq!(loc.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        for i in 0..10 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }

        for i in (1..10).step_by(2) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = MutationEngine::new("test".into(), test_schema());

        for i in 0..10 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }
        engine.on_memtable_flushed(1);

        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}