Skip to main content

nodedb_columnar/
mutation.rs

1//! Columnar mutation engine: coordinates PK index, delete bitmaps,
2//! memtable, and WAL records for full INSERT/UPDATE/DELETE.
3//!
4//! The MutationEngine is the single point of coordination for all
5//! columnar write operations. It produces WAL records that must be
6//! persisted before the mutation is considered durable.
7
8use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19/// Coordinates all columnar mutations for a single collection.
20///
21/// Owns the memtable, PK index, and per-segment delete bitmaps.
22/// Produces WAL records for each mutation that the caller must persist.
23pub struct MutationEngine {
24    collection: String,
25    schema: ColumnarSchema,
26    memtable: ColumnarMemtable,
27    pk_index: PkIndex,
28    /// Per-segment delete bitmaps. Key = segment_id.
29    delete_bitmaps: HashMap<u32, DeleteBitmap>,
30    /// PK column indices in the schema.
31    pk_col_indices: Vec<usize>,
32    /// Counter for assigning segment IDs.
33    next_segment_id: u32,
34    /// Current "memtable segment ID" — a virtual segment ID for rows
35    /// that are still in the memtable (not yet flushed).
36    memtable_segment_id: u32,
37    /// Row counter within the current memtable (resets on flush).
38    memtable_row_counter: u32,
39}
40
41/// Result of a mutation operation, including the WAL record to persist.
42pub struct MutationResult {
43    /// WAL record(s) to persist before the mutation is considered durable.
44    pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48    /// Create a new mutation engine for a collection.
49    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50        let pk_col_indices: Vec<usize> = schema
51            .columns
52            .iter()
53            .enumerate()
54            .filter(|(_, c)| c.primary_key)
55            .map(|(i, _)| i)
56            .collect();
57
58        let memtable = ColumnarMemtable::new(&schema);
59        // Reserve segment_id 0 for the first memtable. Real segments start at 1.
60        let memtable_segment_id = 0;
61
62        Self {
63            collection,
64            schema,
65            memtable,
66            pk_index: PkIndex::new(),
67            delete_bitmaps: HashMap::new(),
68            pk_col_indices,
69            next_segment_id: 1,
70            memtable_segment_id,
71            memtable_row_counter: 0,
72        }
73    }
74
75    /// Insert a row. Returns WAL record to persist.
76    ///
77    /// Validates schema, checks PK uniqueness, adds to memtable and PK index.
78    pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
79        // Extract PK bytes for uniqueness check.
80        let pk_bytes = self.extract_pk_bytes(values)?;
81
82        // Check for duplicate PK.
83        if self.pk_index.contains(&pk_bytes) {
84            return Err(ColumnarError::DuplicatePrimaryKey);
85        }
86
87        // Generate WAL record BEFORE applying the mutation.
88        let row_data = encode_row_for_wal(values);
89        let wal = ColumnarWalRecord::InsertRow {
90            collection: self.collection.clone(),
91            row_data,
92        };
93
94        // Add to memtable.
95        self.memtable.append_row(values)?;
96
97        // Add to PK index (pointing to memtable virtual segment).
98        let location = RowLocation {
99            segment_id: self.memtable_segment_id,
100            row_index: self.memtable_row_counter,
101        };
102        self.pk_index.upsert(pk_bytes, location);
103        self.memtable_row_counter += 1;
104
105        Ok(MutationResult {
106            wal_records: vec![wal],
107        })
108    }
109
110    /// Delete a row by PK value. Returns WAL record to persist.
111    ///
112    /// Looks up PK in the index to find the segment + row, then marks
113    /// the row in the segment's delete bitmap.
114    pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
115        let pk_bytes = encode_pk(pk_value);
116
117        let location = self
118            .pk_index
119            .get(&pk_bytes)
120            .copied()
121            .ok_or(ColumnarError::PrimaryKeyNotFound)?;
122
123        // Generate WAL record BEFORE applying.
124        let wal = ColumnarWalRecord::DeleteRows {
125            collection: self.collection.clone(),
126            segment_id: location.segment_id,
127            row_indices: vec![location.row_index],
128        };
129
130        // Mark in delete bitmap.
131        let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
132        bitmap.mark_deleted(location.row_index);
133
134        // Remove from PK index.
135        self.pk_index.remove(&pk_bytes);
136
137        Ok(MutationResult {
138            wal_records: vec![wal],
139        })
140    }
141
142    /// Update a row by PK: DELETE old + INSERT new.
143    ///
144    /// `updates` maps column names to new values. Columns not in the map
145    /// retain their existing values from the old row.
146    ///
147    /// Returns WAL records for both the delete and the insert.
148    ///
149    /// NOTE: The caller must provide the full old row values for the re-insert.
150    /// This method takes the complete new row (already merged with old values).
151    pub fn update(
152        &mut self,
153        old_pk: &Value,
154        new_values: &[Value],
155    ) -> Result<MutationResult, ColumnarError> {
156        // Delete the old row.
157        let delete_result = self.delete(old_pk)?;
158
159        // Insert the new row.
160        let insert_result = self.insert(new_values)?;
161
162        // Combine WAL records.
163        let mut wal_records = delete_result.wal_records;
164        wal_records.extend(insert_result.wal_records);
165
166        Ok(MutationResult { wal_records })
167    }
168
169    /// Notify the engine that the memtable was flushed to a new segment.
170    ///
171    /// Updates the PK index to remap memtable entries to the new segment.
172    /// Returns the WAL record for the flush event.
173    pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
174        let row_count = self.memtable_row_counter;
175
176        // Remap PK index entries from virtual memtable segment to real segment.
177        self.pk_index
178            .remap_segment(self.memtable_segment_id, |old_row| {
179                Some(RowLocation {
180                    segment_id: new_segment_id,
181                    row_index: old_row,
182                })
183            });
184
185        // Reset memtable tracking.
186        self.memtable_segment_id = self.next_segment_id;
187        self.next_segment_id += 1;
188        self.memtable_row_counter = 0;
189
190        let wal = ColumnarWalRecord::MemtableFlushed {
191            collection: self.collection.clone(),
192            segment_id: new_segment_id,
193            row_count: row_count as u64,
194        };
195
196        MutationResult {
197            wal_records: vec![wal],
198        }
199    }
200
201    /// Notify the engine that compaction completed.
202    ///
203    /// Remaps PK index entries and removes old delete bitmaps.
204    pub fn on_compaction_complete(
205        &mut self,
206        old_segment_ids: &[u32],
207        new_segment_id: u32,
208        row_mapping: &HashMap<(u32, u32), u32>,
209    ) -> MutationResult {
210        // Remap PK index for each old segment.
211        for &old_seg in old_segment_ids {
212            self.pk_index.remap_segment(old_seg, |old_row| {
213                row_mapping
214                    .get(&(old_seg, old_row))
215                    .map(|&new_row| RowLocation {
216                        segment_id: new_segment_id,
217                        row_index: new_row,
218                    })
219            });
220
221            // Remove old delete bitmap.
222            self.delete_bitmaps.remove(&old_seg);
223        }
224
225        let wal = ColumnarWalRecord::CompactionCommit {
226            collection: self.collection.clone(),
227            old_segment_ids: old_segment_ids.to_vec(),
228            new_segment_ids: vec![new_segment_id],
229        };
230
231        MutationResult {
232            wal_records: vec![wal],
233        }
234    }
235
236    // -- Accessors --
237
238    /// Access the memtable.
239    pub fn memtable(&self) -> &ColumnarMemtable {
240        &self.memtable
241    }
242
243    /// Mutable access to the memtable (for drain on flush).
244    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
245        &mut self.memtable
246    }
247
248    /// Access the PK index.
249    pub fn pk_index(&self) -> &PkIndex {
250        &self.pk_index
251    }
252
253    /// Mutable access to the PK index (for cold-start rebuild).
254    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
255        &mut self.pk_index
256    }
257
258    /// Access a segment's delete bitmap.
259    pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
260        self.delete_bitmaps.get(&segment_id)
261    }
262
263    /// Access all delete bitmaps.
264    pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
265        &self.delete_bitmaps
266    }
267
268    /// The collection name.
269    pub fn collection(&self) -> &str {
270        &self.collection
271    }
272
273    /// The schema.
274    pub fn schema(&self) -> &ColumnarSchema {
275        &self.schema
276    }
277
278    /// Whether the memtable should be flushed.
279    pub fn should_flush(&self) -> bool {
280        self.memtable.should_flush()
281    }
282
283    /// Whether a segment should be compacted based on its delete ratio.
284    pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
285        self.delete_bitmaps
286            .get(&segment_id)
287            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
288    }
289
290    // -- Internal helpers --
291
292    /// Extract PK bytes from a row of values.
293    fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
294        if values.len() != self.schema.columns.len() {
295            return Err(ColumnarError::SchemaMismatch {
296                expected: self.schema.columns.len(),
297                got: values.len(),
298            });
299        }
300
301        if self.pk_col_indices.len() == 1 {
302            Ok(encode_pk(&values[self.pk_col_indices[0]]))
303        } else {
304            let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
305            Ok(crate::pk_index::encode_composite_pk(&pk_values))
306        }
307    }
308}
309
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;

    /// Three-column schema: `id` (Int64, primary key), `name` (required
    /// String) and `score` (nullable Float64).
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Fresh engine for the collection "test" over [`test_schema`].
    fn new_engine() -> MutationEngine {
        MutationEngine::new("test".into(), test_schema())
    }

    #[test]
    fn insert_and_pk_check() {
        let mut engine = new_engine();

        let row = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ];
        let outcome = engine.insert(&row).expect("insert");

        // Exactly one WAL record, and it is an insert.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::InsertRow { .. }]
        ));

        assert_eq!(engine.pk_index().len(), 1);
        assert_eq!(engine.memtable().row_count(), 1);
    }

    #[test]
    fn delete_by_pk() {
        let mut engine = new_engine();

        let row = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Null,
        ];
        engine.insert(&row).expect("insert");

        let outcome = engine.delete(&Value::Integer(1)).expect("delete");

        // Exactly one WAL record, and it is a delete.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::DeleteRows { .. }]
        ));

        // The key must be gone from the index.
        assert!(engine.pk_index().is_empty());
    }

    #[test]
    fn delete_nonexistent_pk() {
        let mut engine = new_engine();

        let outcome = engine.delete(&Value::Integer(999));

        assert!(matches!(outcome, Err(ColumnarError::PrimaryKeyNotFound)));
    }

    #[test]
    fn update_row() {
        let mut engine = new_engine();

        let original = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.5),
        ];
        engine.insert(&original).expect("insert");

        // Same PK, new name and score.
        let replacement = [
            Value::Integer(1),
            Value::String("Alice Updated".into()),
            Value::Float(0.75),
        ];
        let outcome = engine
            .update(&Value::Integer(1), &replacement)
            .expect("update");

        // Two WAL records, in order: delete, then insert.
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [
                ColumnarWalRecord::DeleteRows { .. },
                ColumnarWalRecord::InsertRow { .. }
            ]
        ));

        // Still a single live key...
        assert_eq!(engine.pk_index().len(), 1);
        // ...but the memtable holds both the original and the updated row.
        assert_eq!(engine.memtable().row_count(), 2);
    }

    #[test]
    fn memtable_flush_remaps_pk() {
        let mut engine = new_engine();

        for i in 0..5 {
            let row = [
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }

        // Simulate a flush in which the memtable becomes segment 1.
        let outcome = engine.on_memtable_flushed(1);
        assert!(matches!(
            outcome.wal_records.as_slice(),
            [ColumnarWalRecord::MemtableFlushed {
                segment_id: 1,
                row_count: 5,
                ..
            }]
        ));

        // Index entries must now point into segment 1, same row index.
        let key = encode_pk(&Value::Integer(3));
        let location = engine.pk_index().get(&key).expect("pk exists");
        assert_eq!((location.segment_id, location.row_index), (1, 3));
    }

    #[test]
    fn multiple_inserts_and_deletes() {
        let mut engine = new_engine();

        for i in 0..10 {
            let row = [
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }

        // Remove the odd-numbered keys.
        for i in (1..10).step_by(2) {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        // 0, 2, 4, 6 and 8 remain.
        assert_eq!(engine.pk_index().len(), 5);
    }

    #[test]
    fn should_compact_threshold() {
        let mut engine = new_engine();

        // Insert ten rows and flush them so they live in a real segment.
        for i in 0..10 {
            let row = [
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ];
            engine.insert(&row).expect("insert");
        }
        engine.on_memtable_flushed(1);

        // Deleting 3 of 10 rows is 30%, above the 20% threshold.
        for i in 0..3 {
            engine.delete(&Value::Integer(i)).expect("delete");
        }

        assert!(engine.should_compact(1, 10));
    }
}