Skip to main content

nodedb_columnar/
mutation.rs

1//! Columnar mutation engine: coordinates PK index, delete bitmaps,
2//! memtable, and WAL records for full INSERT/UPDATE/DELETE.
3//!
4//! The MutationEngine is the single point of coordination for all
5//! columnar write operations. It produces WAL records that must be
6//! persisted before the mutation is considered durable.
7
8use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19/// Coordinates all columnar mutations for a single collection.
20///
21/// Owns the memtable, PK index, and per-segment delete bitmaps.
22/// Produces WAL records for each mutation that the caller must persist.
23pub struct MutationEngine {
24    collection: String,
25    schema: ColumnarSchema,
26    memtable: ColumnarMemtable,
27    pk_index: PkIndex,
28    /// Per-segment delete bitmaps. Key = segment_id.
29    delete_bitmaps: HashMap<u32, DeleteBitmap>,
30    /// PK column indices in the schema.
31    pk_col_indices: Vec<usize>,
32    /// Counter for assigning segment IDs.
33    next_segment_id: u32,
34    /// Current "memtable segment ID" — a virtual segment ID for rows
35    /// that are still in the memtable (not yet flushed).
36    memtable_segment_id: u32,
37    /// Row counter within the current memtable (resets on flush).
38    memtable_row_counter: u32,
39}
40
41/// Result of a mutation operation, including the WAL record to persist.
42pub struct MutationResult {
43    /// WAL record(s) to persist before the mutation is considered durable.
44    pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48    /// Create a new mutation engine for a collection.
49    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50        let pk_col_indices: Vec<usize> = schema
51            .columns
52            .iter()
53            .enumerate()
54            .filter(|(_, c)| c.primary_key)
55            .map(|(i, _)| i)
56            .collect();
57
58        let memtable = ColumnarMemtable::new(&schema);
59        // Reserve segment_id 0 for the first memtable. Real segments start at 1.
60        let memtable_segment_id = 0;
61
62        Self {
63            collection,
64            schema,
65            memtable,
66            pk_index: PkIndex::new(),
67            delete_bitmaps: HashMap::new(),
68            pk_col_indices,
69            next_segment_id: 1,
70            memtable_segment_id,
71            memtable_row_counter: 0,
72        }
73    }
74
75    /// Insert a row. Returns WAL record to persist.
76    ///
77    /// Validates schema, checks PK uniqueness, adds to memtable and PK index.
78    pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
79        // Extract PK bytes for uniqueness check.
80        let pk_bytes = self.extract_pk_bytes(values)?;
81
82        // Check for duplicate PK.
83        if self.pk_index.contains(&pk_bytes) {
84            return Err(ColumnarError::DuplicatePrimaryKey);
85        }
86
87        // Generate WAL record BEFORE applying the mutation.
88        let row_data = encode_row_for_wal(values)?;
89        let wal = ColumnarWalRecord::InsertRow {
90            collection: self.collection.clone(),
91            row_data,
92        };
93
94        // Add to memtable.
95        self.memtable.append_row(values)?;
96
97        // Add to PK index (pointing to memtable virtual segment).
98        let location = RowLocation {
99            segment_id: self.memtable_segment_id,
100            row_index: self.memtable_row_counter,
101        };
102        self.pk_index.upsert(pk_bytes, location);
103        self.memtable_row_counter += 1;
104
105        Ok(MutationResult {
106            wal_records: vec![wal],
107        })
108    }
109
110    /// Delete a row by PK value. Returns WAL record to persist.
111    ///
112    /// Looks up PK in the index to find the segment + row, then marks
113    /// the row in the segment's delete bitmap.
114    pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
115        let pk_bytes = encode_pk(pk_value);
116
117        let location = self
118            .pk_index
119            .get(&pk_bytes)
120            .copied()
121            .ok_or(ColumnarError::PrimaryKeyNotFound)?;
122
123        // Generate WAL record BEFORE applying.
124        let wal = ColumnarWalRecord::DeleteRows {
125            collection: self.collection.clone(),
126            segment_id: location.segment_id,
127            row_indices: vec![location.row_index],
128        };
129
130        // Mark in delete bitmap.
131        let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
132        bitmap.mark_deleted(location.row_index);
133
134        // Remove from PK index.
135        self.pk_index.remove(&pk_bytes);
136
137        Ok(MutationResult {
138            wal_records: vec![wal],
139        })
140    }
141
142    /// Update a row by PK: DELETE old + INSERT new.
143    ///
144    /// `updates` maps column names to new values. Columns not in the map
145    /// retain their existing values from the old row.
146    ///
147    /// Returns WAL records for both the delete and the insert.
148    ///
149    /// NOTE: The caller must provide the full old row values for the re-insert.
150    /// This method takes the complete new row (already merged with old values).
151    pub fn update(
152        &mut self,
153        old_pk: &Value,
154        new_values: &[Value],
155    ) -> Result<MutationResult, ColumnarError> {
156        // Delete the old row.
157        let delete_result = self.delete(old_pk)?;
158
159        // Insert the new row.
160        let insert_result = self.insert(new_values)?;
161
162        // Combine WAL records.
163        let mut wal_records = delete_result.wal_records;
164        wal_records.extend(insert_result.wal_records);
165
166        Ok(MutationResult { wal_records })
167    }
168
169    /// Notify the engine that the memtable was flushed to a new segment.
170    ///
171    /// Updates the PK index to remap memtable entries to the new segment.
172    /// Returns the WAL record for the flush event.
173    pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
174        let row_count = self.memtable_row_counter;
175
176        // Remap PK index entries from virtual memtable segment to real segment.
177        self.pk_index
178            .remap_segment(self.memtable_segment_id, |old_row| {
179                Some(RowLocation {
180                    segment_id: new_segment_id,
181                    row_index: old_row,
182                })
183            });
184
185        // Reset memtable tracking.
186        self.memtable_segment_id = self.next_segment_id;
187        self.next_segment_id += 1;
188        self.memtable_row_counter = 0;
189
190        let wal = ColumnarWalRecord::MemtableFlushed {
191            collection: self.collection.clone(),
192            segment_id: new_segment_id,
193            row_count: row_count as u64,
194        };
195
196        MutationResult {
197            wal_records: vec![wal],
198        }
199    }
200
201    /// Notify the engine that compaction completed.
202    ///
203    /// Remaps PK index entries and removes old delete bitmaps.
204    pub fn on_compaction_complete(
205        &mut self,
206        old_segment_ids: &[u32],
207        new_segment_id: u32,
208        row_mapping: &HashMap<(u32, u32), u32>,
209    ) -> MutationResult {
210        // Remap PK index for each old segment.
211        for &old_seg in old_segment_ids {
212            self.pk_index.remap_segment(old_seg, |old_row| {
213                row_mapping
214                    .get(&(old_seg, old_row))
215                    .map(|&new_row| RowLocation {
216                        segment_id: new_segment_id,
217                        row_index: new_row,
218                    })
219            });
220
221            // Remove old delete bitmap.
222            self.delete_bitmaps.remove(&old_seg);
223        }
224
225        let wal = ColumnarWalRecord::CompactionCommit {
226            collection: self.collection.clone(),
227            old_segment_ids: old_segment_ids.to_vec(),
228            new_segment_ids: vec![new_segment_id],
229        };
230
231        MutationResult {
232            wal_records: vec![wal],
233        }
234    }
235
236    // -- Accessors --
237
238    /// Access the memtable.
239    pub fn memtable(&self) -> &ColumnarMemtable {
240        &self.memtable
241    }
242
243    /// Mutable access to the memtable (for drain on flush).
244    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
245        &mut self.memtable
246    }
247
248    /// Access the PK index.
249    pub fn pk_index(&self) -> &PkIndex {
250        &self.pk_index
251    }
252
253    /// Mutable access to the PK index (for cold-start rebuild).
254    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
255        &mut self.pk_index
256    }
257
258    /// Access a segment's delete bitmap.
259    pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
260        self.delete_bitmaps.get(&segment_id)
261    }
262
263    /// Access all delete bitmaps.
264    pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
265        &self.delete_bitmaps
266    }
267
268    /// The collection name.
269    pub fn collection(&self) -> &str {
270        &self.collection
271    }
272
273    /// The schema.
274    pub fn schema(&self) -> &ColumnarSchema {
275        &self.schema
276    }
277
278    /// Whether the memtable should be flushed.
279    pub fn should_flush(&self) -> bool {
280        self.memtable.should_flush()
281    }
282
283    /// Iterate non-deleted rows in the memtable as `Vec<Value>`.
284    ///
285    /// Skips rows marked as deleted in the memtable's virtual segment
286    /// delete bitmap. For rows in flushed segments, use `SegmentReader`.
287    pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
288        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
289        self.memtable
290            .iter_rows()
291            .enumerate()
292            .filter_map(move |(row_idx, row)| {
293                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
294                    None
295                } else {
296                    Some(row)
297                }
298            })
299    }
300
301    /// Get a single row from the memtable by index (None if deleted).
302    pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
303        if self
304            .delete_bitmaps
305            .get(&self.memtable_segment_id)
306            .is_some_and(|bm| bm.is_deleted(row_idx as u32))
307        {
308            return None;
309        }
310        self.memtable.get_row(row_idx)
311    }
312
313    /// The segment ID that will be assigned to the next flushed segment.
314    ///
315    /// Use this to obtain the ID to pass to `on_memtable_flushed`.
316    pub fn next_segment_id(&self) -> u32 {
317        self.next_segment_id
318    }
319
320    /// Whether a segment should be compacted based on its delete ratio.
321    pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
322        self.delete_bitmaps
323            .get(&segment_id)
324            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
325    }
326
327    // -- Internal helpers --
328
329    /// Extract PK bytes from a row of values.
330    fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
331        if values.len() != self.schema.columns.len() {
332            return Err(ColumnarError::SchemaMismatch {
333                expected: self.schema.columns.len(),
334                got: values.len(),
335            });
336        }
337
338        if self.pk_col_indices.len() == 1 {
339            Ok(encode_pk(&values[self.pk_col_indices[0]]))
340        } else {
341            let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
342            Ok(crate::pk_index::encode_composite_pk(&pk_values))
343        }
344    }
345}
346
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Schema used by every test: `id` (Int64, primary key), `name`
    /// (String, required) and `score` (Float64, nullable).
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Empty engine over `test_schema()` for the collection "test".
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();

        let alice = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ];
        let outcome = engine.insert(&alice).expect("insert");

        // Exactly one InsertRow record comes back.
        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));

        // The row is both indexed and buffered.
        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();

        let alice = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Null,
        ];
        engine.insert(&alice).expect("insert");

        let outcome = engine.delete(&Value::Integer(1)).expect("delete");
        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));

        // The key must be gone from the index.
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();

        let outcome = engine.delete(&Value::Integer(999));
        assert!(matches!(outcome, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();

        let original = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.5),
        ];
        engine.insert(&original).expect("insert");

        // Same PK, new name and score.
        let replacement = [
            Value::Integer(1),
            Value::String("Alice Updated".into()),
            Value::Float(0.75),
        ];
        let outcome = engine
            .update(&Value::Integer(1), &replacement)
            .expect("update");

        // Two WAL records, delete first and insert second.
        assert_eq!(outcome.wal_records.len(), 2);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(matches!(
            &outcome.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));

        // Still a single indexed key ...
        assert_eq!(engine.pk_index().len(), 1);
        // ... but the memtable holds both the original and the updated row.
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();

        for id in 0..5 {
            let row = [
                Value::Integer(id),
                Value::String(format!("u{id}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }

        // Simulate flush: memtable becomes segment 1.
        let outcome = engine.on_memtable_flushed(1);
        assert_eq!(outcome.wal_records.len(), 1);
        assert!(matches!(
            &outcome.wal_records[0],
            ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }
        ));

        // Index entries now point into segment 1 at their old row position.
        let key = encode_pk(&Value::Integer(3));
        let location = engine.pk_index().get(&key).expect("pk exists");
        assert_eq!(location.segment_id, 1);
        assert_eq!(location.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();

        for id in 0..10 {
            let row = [
                Value::Integer(id),
                Value::String(format!("u{id}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }

        // Remove the odd ids.
        for id in (1..10).step_by(2) {
            engine.delete(&Value::Integer(id)).expect("delete");
        }

        // Survivors: 0, 2, 4, 6, 8.
        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();

        // Insert and flush so the rows belong to a real segment.
        for id in 0..10 {
            let row = [
                Value::Integer(id),
                Value::String(format!("u{id}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }
        engine.on_memtable_flushed(1);

        // 3 of 10 rows deleted = 30%, above the 20% threshold.
        for id in 0..3 {
            engine.delete(&Value::Integer(id)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}
535}