Skip to main content

nodedb_columnar/
mutation.rs

1//! Columnar mutation engine: coordinates PK index, delete bitmaps,
2//! memtable, and WAL records for full INSERT/UPDATE/DELETE.
3//!
4//! The MutationEngine is the single point of coordination for all
5//! columnar write operations. It produces WAL records that must be
6//! persisted before the mutation is considered durable.
7
8use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19/// Coordinates all columnar mutations for a single collection.
20///
21/// Owns the memtable, PK index, and per-segment delete bitmaps.
22/// Produces WAL records for each mutation that the caller must persist.
23pub struct MutationEngine {
24    collection: String,
25    schema: ColumnarSchema,
26    memtable: ColumnarMemtable,
27    pk_index: PkIndex,
28    /// Per-segment delete bitmaps. Key = segment_id.
29    delete_bitmaps: HashMap<u32, DeleteBitmap>,
30    /// PK column indices in the schema.
31    pk_col_indices: Vec<usize>,
32    /// Counter for assigning segment IDs.
33    next_segment_id: u32,
34    /// Current "memtable segment ID" — a virtual segment ID for rows
35    /// that are still in the memtable (not yet flushed).
36    memtable_segment_id: u32,
37    /// Row counter within the current memtable (resets on flush).
38    memtable_row_counter: u32,
39}
40
41/// Result of a mutation operation, including the WAL record to persist.
42pub struct MutationResult {
43    /// WAL record(s) to persist before the mutation is considered durable.
44    pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48    /// Create a new mutation engine for a collection.
49    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50        let pk_col_indices: Vec<usize> = schema
51            .columns
52            .iter()
53            .enumerate()
54            .filter(|(_, c)| c.primary_key)
55            .map(|(i, _)| i)
56            .collect();
57
58        let memtable = ColumnarMemtable::new(&schema);
59        // Reserve segment_id 0 for the first memtable. Real segments start at 1.
60        let memtable_segment_id = 0;
61
62        Self {
63            collection,
64            schema,
65            memtable,
66            pk_index: PkIndex::new(),
67            delete_bitmaps: HashMap::new(),
68            pk_col_indices,
69            next_segment_id: 1,
70            memtable_segment_id,
71            memtable_row_counter: 0,
72        }
73    }
74
75    /// Insert a row. Returns WAL record to persist.
76    ///
77    /// Validates schema, checks PK uniqueness, adds to memtable and PK index.
78    pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
79        // Extract PK bytes for uniqueness check.
80        let pk_bytes = self.extract_pk_bytes(values)?;
81
82        // Check for duplicate PK.
83        if self.pk_index.contains(&pk_bytes) {
84            return Err(ColumnarError::DuplicatePrimaryKey);
85        }
86
87        // Generate WAL record BEFORE applying the mutation.
88        let row_data = encode_row_for_wal(values);
89        let wal = ColumnarWalRecord::InsertRow {
90            collection: self.collection.clone(),
91            row_data,
92        };
93
94        // Add to memtable.
95        self.memtable.append_row(values)?;
96
97        // Add to PK index (pointing to memtable virtual segment).
98        let location = RowLocation {
99            segment_id: self.memtable_segment_id,
100            row_index: self.memtable_row_counter,
101        };
102        self.pk_index.upsert(pk_bytes, location);
103        self.memtable_row_counter += 1;
104
105        Ok(MutationResult {
106            wal_records: vec![wal],
107        })
108    }
109
110    /// Delete a row by PK value. Returns WAL record to persist.
111    ///
112    /// Looks up PK in the index to find the segment + row, then marks
113    /// the row in the segment's delete bitmap.
114    pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
115        let pk_bytes = encode_pk(pk_value);
116
117        let location = self
118            .pk_index
119            .get(&pk_bytes)
120            .copied()
121            .ok_or(ColumnarError::PrimaryKeyNotFound)?;
122
123        // Generate WAL record BEFORE applying.
124        let wal = ColumnarWalRecord::DeleteRows {
125            collection: self.collection.clone(),
126            segment_id: location.segment_id,
127            row_indices: vec![location.row_index],
128        };
129
130        // Mark in delete bitmap.
131        let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
132        bitmap.mark_deleted(location.row_index);
133
134        // Remove from PK index.
135        self.pk_index.remove(&pk_bytes);
136
137        Ok(MutationResult {
138            wal_records: vec![wal],
139        })
140    }
141
142    /// Update a row by PK: DELETE old + INSERT new.
143    ///
144    /// `updates` maps column names to new values. Columns not in the map
145    /// retain their existing values from the old row.
146    ///
147    /// Returns WAL records for both the delete and the insert.
148    ///
149    /// NOTE: The caller must provide the full old row values for the re-insert.
150    /// This method takes the complete new row (already merged with old values).
151    pub fn update(
152        &mut self,
153        old_pk: &Value,
154        new_values: &[Value],
155    ) -> Result<MutationResult, ColumnarError> {
156        // Delete the old row.
157        let delete_result = self.delete(old_pk)?;
158
159        // Insert the new row.
160        let insert_result = self.insert(new_values)?;
161
162        // Combine WAL records.
163        let mut wal_records = delete_result.wal_records;
164        wal_records.extend(insert_result.wal_records);
165
166        Ok(MutationResult { wal_records })
167    }
168
169    /// Notify the engine that the memtable was flushed to a new segment.
170    ///
171    /// Updates the PK index to remap memtable entries to the new segment.
172    /// Returns the WAL record for the flush event.
173    pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
174        let row_count = self.memtable_row_counter;
175
176        // Remap PK index entries from virtual memtable segment to real segment.
177        self.pk_index
178            .remap_segment(self.memtable_segment_id, |old_row| {
179                Some(RowLocation {
180                    segment_id: new_segment_id,
181                    row_index: old_row,
182                })
183            });
184
185        // Reset memtable tracking.
186        self.memtable_segment_id = self.next_segment_id;
187        self.next_segment_id += 1;
188        self.memtable_row_counter = 0;
189
190        let wal = ColumnarWalRecord::MemtableFlushed {
191            collection: self.collection.clone(),
192            segment_id: new_segment_id,
193            row_count: row_count as u64,
194        };
195
196        MutationResult {
197            wal_records: vec![wal],
198        }
199    }
200
201    /// Notify the engine that compaction completed.
202    ///
203    /// Remaps PK index entries and removes old delete bitmaps.
204    pub fn on_compaction_complete(
205        &mut self,
206        old_segment_ids: &[u32],
207        new_segment_id: u32,
208        row_mapping: &HashMap<(u32, u32), u32>,
209    ) -> MutationResult {
210        // Remap PK index for each old segment.
211        for &old_seg in old_segment_ids {
212            self.pk_index.remap_segment(old_seg, |old_row| {
213                row_mapping
214                    .get(&(old_seg, old_row))
215                    .map(|&new_row| RowLocation {
216                        segment_id: new_segment_id,
217                        row_index: new_row,
218                    })
219            });
220
221            // Remove old delete bitmap.
222            self.delete_bitmaps.remove(&old_seg);
223        }
224
225        let wal = ColumnarWalRecord::CompactionCommit {
226            collection: self.collection.clone(),
227            old_segment_ids: old_segment_ids.to_vec(),
228            new_segment_ids: vec![new_segment_id],
229        };
230
231        MutationResult {
232            wal_records: vec![wal],
233        }
234    }
235
236    // -- Accessors --
237
238    /// Access the memtable.
239    pub fn memtable(&self) -> &ColumnarMemtable {
240        &self.memtable
241    }
242
243    /// Mutable access to the memtable (for drain on flush).
244    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
245        &mut self.memtable
246    }
247
248    /// Access the PK index.
249    pub fn pk_index(&self) -> &PkIndex {
250        &self.pk_index
251    }
252
253    /// Mutable access to the PK index (for cold-start rebuild).
254    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
255        &mut self.pk_index
256    }
257
258    /// Access a segment's delete bitmap.
259    pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
260        self.delete_bitmaps.get(&segment_id)
261    }
262
263    /// Access all delete bitmaps.
264    pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
265        &self.delete_bitmaps
266    }
267
268    /// The collection name.
269    pub fn collection(&self) -> &str {
270        &self.collection
271    }
272
273    /// The schema.
274    pub fn schema(&self) -> &ColumnarSchema {
275        &self.schema
276    }
277
278    /// Whether the memtable should be flushed.
279    pub fn should_flush(&self) -> bool {
280        self.memtable.should_flush()
281    }
282
283    /// Iterate non-deleted rows in the memtable as `Vec<Value>`.
284    ///
285    /// Skips rows marked as deleted in the memtable's virtual segment
286    /// delete bitmap. For rows in flushed segments, use `SegmentReader`.
287    pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
288        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
289        self.memtable
290            .iter_rows()
291            .enumerate()
292            .filter_map(move |(row_idx, row)| {
293                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
294                    None
295                } else {
296                    Some(row)
297                }
298            })
299    }
300
301    /// Get a single row from the memtable by index (None if deleted).
302    pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
303        if self
304            .delete_bitmaps
305            .get(&self.memtable_segment_id)
306            .is_some_and(|bm| bm.is_deleted(row_idx as u32))
307        {
308            return None;
309        }
310        self.memtable.get_row(row_idx)
311    }
312
313    /// Whether a segment should be compacted based on its delete ratio.
314    pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
315        self.delete_bitmaps
316            .get(&segment_id)
317            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
318    }
319
320    // -- Internal helpers --
321
322    /// Extract PK bytes from a row of values.
323    fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
324        if values.len() != self.schema.columns.len() {
325            return Err(ColumnarError::SchemaMismatch {
326                expected: self.schema.columns.len(),
327                got: values.len(),
328            });
329        }
330
331        if self.pk_col_indices.len() == 1 {
332            Ok(encode_pk(&values[self.pk_col_indices[0]]))
333        } else {
334            let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
335            Ok(crate::pk_index::encode_composite_pk(&pk_values))
336        }
337    }
338}
339
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema: `id` (Int64, primary key), `name`, nullable `score`.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid")
    }

    /// Fresh engine over `test_schema()` for the collection "test".
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();

        let result = engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Float(0.75),
            ])
            .expect("insert");

        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::InsertRow { .. }
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn insert_duplicate_pk_rejected() {
        let mut engine = new_engine();

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Null,
            ])
            .expect("insert");

        // Same PK, different payload: must be refused without side effects.
        let err = engine.insert(&[
            Value::Integer(1),
            Value::String("Bob".into()),
            Value::Null,
        ]);
        assert!(matches!(err, Err(ColumnarError::DuplicatePrimaryKey)));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn insert_schema_mismatch_rejected() {
        let mut engine = new_engine();

        // Two values for a three-column schema.
        let err = engine.insert(&[Value::Integer(1), Value::String("Alice".into())]);
        assert!(matches!(
            err,
            Err(ColumnarError::SchemaMismatch {
                expected: 3,
                got: 2
            })
        ));

        assert!(engine.pk_index().is_empty());
        assert_eq!(engine.memtable().row_count(), 0);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Null,
            ])
            .expect("insert");

        let result = engine.delete(&Value::Integer(1)).expect("delete");
        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));

        // PK should be removed from index.
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();

        let err = engine.delete(&Value::Integer(999));
        assert!(matches!(err, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn deleted_memtable_row_hidden_from_scan() {
        let mut engine = new_engine();

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Null,
            ])
            .expect("insert");
        engine
            .insert(&[
                Value::Integer(2),
                Value::String("Bob".into()),
                Value::Null,
            ])
            .expect("insert");

        engine.delete(&Value::Integer(1)).expect("delete");

        // The row stays physically in the memtable but is filtered out.
        assert_eq!(engine.memtable().row_count(), 2);
        assert_eq!(engine.scan_memtable_rows().count(), 1);
        assert!(engine.get_memtable_row(0).is_none());
        assert!(engine.get_memtable_row(1).is_some());
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();

        engine
            .insert(&[
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Float(0.5),
            ])
            .expect("insert");

        // Update: change name and score, keep same PK.
        let result = engine
            .update(
                &Value::Integer(1),
                &[
                    Value::Integer(1),
                    Value::String("Alice Updated".into()),
                    Value::Float(0.75),
                ],
            )
            .expect("update");

        // Should produce 2 WAL records: delete + insert.
        assert_eq!(result.wal_records.len(), 2);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::DeleteRows { .. }
        ));
        assert!(matches!(
            &result.wal_records[1],
            ColumnarWalRecord::InsertRow { .. }
        ));

        // PK index should still have 1 entry.
        assert_eq!(engine.pk_index().len(), 1);
        // Memtable should have 2 rows (original + updated).
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();

        for i in 0..5 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }

        // Simulate flush: memtable becomes segment 1.
        let result = engine.on_memtable_flushed(1);
        assert_eq!(result.wal_records.len(), 1);
        assert!(matches!(
            &result.wal_records[0],
            ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }
        ));

        // PK index entries should now point to segment 1.
        let pk = encode_pk(&Value::Integer(3));
        let loc = engine.pk_index().get(&pk).expect("pk exists");
        assert_eq!(loc.segment_id, 1);
        assert_eq!(loc.row_index, 3);
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();

        for i in 0..10 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }

        // Delete odd-numbered rows.
        for i in (1..10).step_by(2) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert_eq!(engine.pk_index().len(), 5); // 0, 2, 4, 6, 8.
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();

        // Insert and flush to create a real segment.
        for i in 0..10 {
            engine
                .insert(&[
                    Value::Integer(i),
                    Value::String(format!("u{i}")),
                    Value::Null,
                ])
                .expect("insert");
        }
        engine.on_memtable_flushed(1);

        // Delete 3 out of 10 rows = 30% > 20% threshold.
        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}