Skip to main content

nodedb_columnar/
mutation.rs

1//! Columnar mutation engine: coordinates PK index, delete bitmaps,
2//! memtable, and WAL records for full INSERT/UPDATE/DELETE.
3//!
4//! The MutationEngine is the single point of coordination for all
5//! columnar write operations. It produces WAL records that must be
6//! persisted before the mutation is considered durable.
7
8use std::collections::HashMap;
9
10use nodedb_types::columnar::ColumnarSchema;
11use nodedb_types::value::Value;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::pk_index::{PkIndex, RowLocation, encode_pk};
17use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
18
19/// Coordinates all columnar mutations for a single collection.
20///
21/// Owns the memtable, PK index, and per-segment delete bitmaps.
22/// Produces WAL records for each mutation that the caller must persist.
23pub struct MutationEngine {
24    collection: String,
25    schema: ColumnarSchema,
26    memtable: ColumnarMemtable,
27    pk_index: PkIndex,
28    /// Per-segment delete bitmaps. Key = segment_id.
29    delete_bitmaps: HashMap<u32, DeleteBitmap>,
30    /// PK column indices in the schema.
31    pk_col_indices: Vec<usize>,
32    /// Counter for assigning segment IDs.
33    next_segment_id: u32,
34    /// Current "memtable segment ID" — a virtual segment ID for rows
35    /// that are still in the memtable (not yet flushed).
36    memtable_segment_id: u32,
37    /// Row counter within the current memtable (resets on flush).
38    memtable_row_counter: u32,
39}
40
41/// Result of a mutation operation, including the WAL record to persist.
42pub struct MutationResult {
43    /// WAL record(s) to persist before the mutation is considered durable.
44    pub wal_records: Vec<ColumnarWalRecord>,
45}
46
47impl MutationEngine {
48    /// Create a new mutation engine for a collection.
49    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
50        let pk_col_indices: Vec<usize> = schema
51            .columns
52            .iter()
53            .enumerate()
54            .filter(|(_, c)| c.primary_key)
55            .map(|(i, _)| i)
56            .collect();
57
58        let memtable = ColumnarMemtable::new(&schema);
59        // Reserve segment_id 0 for the first memtable. Real segments start at 1.
60        let memtable_segment_id = 0;
61
62        Self {
63            collection,
64            schema,
65            memtable,
66            pk_index: PkIndex::new(),
67            delete_bitmaps: HashMap::new(),
68            pk_col_indices,
69            next_segment_id: 1,
70            memtable_segment_id,
71            memtable_row_counter: 0,
72        }
73    }
74
75    /// Insert a row with upsert-on-duplicate semantics. Returns WAL
76    /// records to persist.
77    ///
78    /// Validates schema. If the PK already exists, the prior row is
79    /// tombstoned via the segment's delete bitmap (a single positional
80    /// delete) before the new row is appended to the memtable. The PK
81    /// index is rebound to the new row location. This matches the
82    /// ClickHouse / Iceberg "sparse PK + positional delete" model and
83    /// keeps `SELECT WHERE pk = X` linearizable on one row without a
84    /// read-time merge pass.
85    ///
86    /// Callers that want strict INSERT (error on duplicate) should check
87    /// `pk_index().contains()` themselves before calling; callers that
88    /// want `ON CONFLICT DO NOTHING` semantics should use
89    /// [`Self::insert_if_absent`].
90    pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
91        let pk_bytes = self.extract_pk_bytes(values)?;
92        let mut wal_records = Vec::with_capacity(2);
93
94        // If a prior row exists for this PK, tombstone it in place so
95        // subsequent scans skip the stale row. The PK index is rebound
96        // below to the freshly-appended row.
97        if let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
98            let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
99            bitmap.mark_deleted(prior.row_index);
100            wal_records.push(ColumnarWalRecord::DeleteRows {
101                collection: self.collection.clone(),
102                segment_id: prior.segment_id,
103                row_indices: vec![prior.row_index],
104            });
105        }
106
107        let row_data = encode_row_for_wal(values)?;
108        wal_records.push(ColumnarWalRecord::InsertRow {
109            collection: self.collection.clone(),
110            row_data,
111        });
112
113        self.memtable.append_row(values)?;
114        let location = RowLocation {
115            segment_id: self.memtable_segment_id,
116            row_index: self.memtable_row_counter,
117        };
118        self.pk_index.upsert(pk_bytes, location);
119        self.memtable_row_counter += 1;
120
121        Ok(MutationResult { wal_records })
122    }
123
124    /// `INSERT ... ON CONFLICT DO NOTHING` semantics: append only if the
125    /// PK is absent; silently skip on duplicate.
126    ///
127    /// Returns `Ok(MutationResult { wal_records })` with an empty vector
128    /// when the row was skipped, so callers that batch WAL appends can
129    /// detect no-ops by checking `wal_records.is_empty()`.
130    pub fn insert_if_absent(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
131        let pk_bytes = self.extract_pk_bytes(values)?;
132        if self.pk_index.contains(&pk_bytes) {
133            return Ok(MutationResult {
134                wal_records: Vec::new(),
135            });
136        }
137
138        let row_data = encode_row_for_wal(values)?;
139        let wal = ColumnarWalRecord::InsertRow {
140            collection: self.collection.clone(),
141            row_data,
142        };
143        self.memtable.append_row(values)?;
144        let location = RowLocation {
145            segment_id: self.memtable_segment_id,
146            row_index: self.memtable_row_counter,
147        };
148        self.pk_index.upsert(pk_bytes, location);
149        self.memtable_row_counter += 1;
150
151        Ok(MutationResult {
152            wal_records: vec![wal],
153        })
154    }
155
156    /// Look up the current row for a PK in the memtable, if present.
157    ///
158    /// Returns `None` if the PK is not in the index, or if the PK points
159    /// to a flushed segment (callers needing cross-segment lookup must
160    /// go through a segment reader separately). This is the fast path
161    /// used by `ON CONFLICT DO UPDATE` to read the would-be-merged row
162    /// when the duplicate hits the memtable — the common case under
163    /// back-to-back inserts.
164    pub fn lookup_memtable_row_by_pk(&self, pk_bytes: &[u8]) -> Option<Vec<Value>> {
165        let loc = self.pk_index.get(pk_bytes).copied()?;
166        if loc.segment_id != self.memtable_segment_id {
167            return None;
168        }
169        self.memtable.get_row(loc.row_index as usize)
170    }
171
172    /// Encode a PK value as index bytes. Exposed for callers that need
173    /// to probe the PK index (e.g. `ON CONFLICT DO UPDATE` routing).
174    pub fn encode_pk_from_row(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
175        self.extract_pk_bytes(values)
176    }
177
178    /// Delete a row by PK value. Returns WAL record to persist.
179    ///
180    /// Looks up PK in the index to find the segment + row, then marks
181    /// the row in the segment's delete bitmap.
182    pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
183        let pk_bytes = encode_pk(pk_value);
184
185        let location = self
186            .pk_index
187            .get(&pk_bytes)
188            .copied()
189            .ok_or(ColumnarError::PrimaryKeyNotFound)?;
190
191        // Generate WAL record BEFORE applying.
192        let wal = ColumnarWalRecord::DeleteRows {
193            collection: self.collection.clone(),
194            segment_id: location.segment_id,
195            row_indices: vec![location.row_index],
196        };
197
198        // Mark in delete bitmap.
199        let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
200        bitmap.mark_deleted(location.row_index);
201
202        // Remove from PK index.
203        self.pk_index.remove(&pk_bytes);
204
205        Ok(MutationResult {
206            wal_records: vec![wal],
207        })
208    }
209
210    /// Update a row by PK: DELETE old + INSERT new.
211    ///
212    /// `updates` maps column names to new values. Columns not in the map
213    /// retain their existing values from the old row.
214    ///
215    /// Returns WAL records for both the delete and the insert.
216    ///
217    /// NOTE: The caller must provide the full old row values for the re-insert.
218    /// This method takes the complete new row (already merged with old values).
219    pub fn update(
220        &mut self,
221        old_pk: &Value,
222        new_values: &[Value],
223    ) -> Result<MutationResult, ColumnarError> {
224        // Delete the old row.
225        let delete_result = self.delete(old_pk)?;
226
227        // Insert the new row.
228        let insert_result = self.insert(new_values)?;
229
230        // Combine WAL records.
231        let mut wal_records = delete_result.wal_records;
232        wal_records.extend(insert_result.wal_records);
233
234        Ok(MutationResult { wal_records })
235    }
236
237    /// Notify the engine that the memtable was flushed to a new segment.
238    ///
239    /// Updates the PK index to remap memtable entries to the new segment.
240    /// Returns the WAL record for the flush event.
241    pub fn on_memtable_flushed(&mut self, new_segment_id: u32) -> MutationResult {
242        let row_count = self.memtable_row_counter;
243
244        // Remap PK index entries from virtual memtable segment to real segment.
245        self.pk_index
246            .remap_segment(self.memtable_segment_id, |old_row| {
247                Some(RowLocation {
248                    segment_id: new_segment_id,
249                    row_index: old_row,
250                })
251            });
252
253        // Reset memtable tracking.
254        self.memtable_segment_id = self.next_segment_id;
255        self.next_segment_id += 1;
256        self.memtable_row_counter = 0;
257
258        let wal = ColumnarWalRecord::MemtableFlushed {
259            collection: self.collection.clone(),
260            segment_id: new_segment_id,
261            row_count: row_count as u64,
262        };
263
264        MutationResult {
265            wal_records: vec![wal],
266        }
267    }
268
269    /// Notify the engine that compaction completed.
270    ///
271    /// Remaps PK index entries and removes old delete bitmaps.
272    pub fn on_compaction_complete(
273        &mut self,
274        old_segment_ids: &[u32],
275        new_segment_id: u32,
276        row_mapping: &HashMap<(u32, u32), u32>,
277    ) -> MutationResult {
278        // Remap PK index for each old segment.
279        for &old_seg in old_segment_ids {
280            self.pk_index.remap_segment(old_seg, |old_row| {
281                row_mapping
282                    .get(&(old_seg, old_row))
283                    .map(|&new_row| RowLocation {
284                        segment_id: new_segment_id,
285                        row_index: new_row,
286                    })
287            });
288
289            // Remove old delete bitmap.
290            self.delete_bitmaps.remove(&old_seg);
291        }
292
293        let wal = ColumnarWalRecord::CompactionCommit {
294            collection: self.collection.clone(),
295            old_segment_ids: old_segment_ids.to_vec(),
296            new_segment_ids: vec![new_segment_id],
297        };
298
299        MutationResult {
300            wal_records: vec![wal],
301        }
302    }
303
304    // -- Accessors --
305
306    /// Access the memtable.
307    pub fn memtable(&self) -> &ColumnarMemtable {
308        &self.memtable
309    }
310
311    /// Mutable access to the memtable (for drain on flush).
312    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
313        &mut self.memtable
314    }
315
316    /// Access the PK index.
317    pub fn pk_index(&self) -> &PkIndex {
318        &self.pk_index
319    }
320
321    /// Mutable access to the PK index (for cold-start rebuild).
322    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
323        &mut self.pk_index
324    }
325
326    /// Access a segment's delete bitmap.
327    pub fn delete_bitmap(&self, segment_id: u32) -> Option<&DeleteBitmap> {
328        self.delete_bitmaps.get(&segment_id)
329    }
330
331    /// Access all delete bitmaps.
332    pub fn delete_bitmaps(&self) -> &HashMap<u32, DeleteBitmap> {
333        &self.delete_bitmaps
334    }
335
336    /// The collection name.
337    pub fn collection(&self) -> &str {
338        &self.collection
339    }
340
341    /// The schema.
342    pub fn schema(&self) -> &ColumnarSchema {
343        &self.schema
344    }
345
346    /// Whether the memtable should be flushed.
347    pub fn should_flush(&self) -> bool {
348        self.memtable.should_flush()
349    }
350
351    /// Iterate non-deleted rows in the memtable as `Vec<Value>`.
352    ///
353    /// Skips rows marked as deleted in the memtable's virtual segment
354    /// delete bitmap. For rows in flushed segments, use `SegmentReader`.
355    pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
356        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
357        self.memtable
358            .iter_rows()
359            .enumerate()
360            .filter_map(move |(row_idx, row)| {
361                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
362                    None
363                } else {
364                    Some(row)
365                }
366            })
367    }
368
369    /// Get a single row from the memtable by index (None if deleted).
370    pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
371        if self
372            .delete_bitmaps
373            .get(&self.memtable_segment_id)
374            .is_some_and(|bm| bm.is_deleted(row_idx as u32))
375        {
376            return None;
377        }
378        self.memtable.get_row(row_idx)
379    }
380
381    /// The segment ID that will be assigned to the next flushed segment.
382    ///
383    /// Use this to obtain the ID to pass to `on_memtable_flushed`.
384    pub fn next_segment_id(&self) -> u32 {
385        self.next_segment_id
386    }
387
388    /// Whether a segment should be compacted based on its delete ratio.
389    pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
390        self.delete_bitmaps
391            .get(&segment_id)
392            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
393    }
394
395    // -- Internal helpers --
396
397    /// Extract PK bytes from a row of values.
398    fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
399        if values.len() != self.schema.columns.len() {
400            return Err(ColumnarError::SchemaMismatch {
401                expected: self.schema.columns.len(),
402                got: values.len(),
403            });
404        }
405
406        if self.pk_col_indices.len() == 1 {
407            Ok(encode_pk(&values[self.pk_col_indices[0]]))
408        } else {
409            let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
410            Ok(crate::pk_index::encode_composite_pk(&pk_values))
411        }
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
418    use nodedb_types::value::Value;
419
420    use super::*;
421
422    fn test_schema() -> ColumnarSchema {
423        ColumnarSchema::new(vec![
424            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
425            ColumnDef::required("name", ColumnType::String),
426            ColumnDef::nullable("score", ColumnType::Float64),
427        ])
428        .expect("valid")
429    }
430
431    #[test]
432    fn insert_and_pk_check() {
433        let mut engine = MutationEngine::new("test".into(), test_schema());
434
435        let result = engine
436            .insert(&[
437                Value::Integer(1),
438                Value::String("Alice".into()),
439                Value::Float(0.75),
440            ])
441            .expect("insert");
442
443        assert_eq!(result.wal_records.len(), 1);
444        assert!(matches!(
445            &result.wal_records[0],
446            ColumnarWalRecord::InsertRow { .. }
447        ));
448
449        assert_eq!(engine.pk_index().len(), 1);
450        assert_eq!(engine.memtable().row_count(), 1);
451    }
452
453    #[test]
454    fn delete_by_pk() {
455        let mut engine = MutationEngine::new("test".into(), test_schema());
456
457        engine
458            .insert(&[
459                Value::Integer(1),
460                Value::String("Alice".into()),
461                Value::Null,
462            ])
463            .expect("insert");
464
465        let result = engine.delete(&Value::Integer(1)).expect("delete");
466        assert_eq!(result.wal_records.len(), 1);
467        assert!(matches!(
468            &result.wal_records[0],
469            ColumnarWalRecord::DeleteRows { .. }
470        ));
471
472        // PK should be removed from index.
473        assert!(engine.pk_index().is_empty());
474    }
475
476    #[test]
477    fn delete_nonexistent_pk() {
478        let mut engine = MutationEngine::new("test".into(), test_schema());
479
480        let err = engine.delete(&Value::Integer(999));
481        assert!(matches!(err, Err(ColumnarError::PrimaryKeyNotFound)));
482    }
483
484    #[test]
485    fn update_row() {
486        let mut engine = MutationEngine::new("test".into(), test_schema());
487
488        engine
489            .insert(&[
490                Value::Integer(1),
491                Value::String("Alice".into()),
492                Value::Float(0.5),
493            ])
494            .expect("insert");
495
496        // Update: change name and score, keep same PK.
497        let result = engine
498            .update(
499                &Value::Integer(1),
500                &[
501                    Value::Integer(1),
502                    Value::String("Alice Updated".into()),
503                    Value::Float(0.75),
504                ],
505            )
506            .expect("update");
507
508        // Should produce 2 WAL records: delete + insert.
509        assert_eq!(result.wal_records.len(), 2);
510        assert!(matches!(
511            &result.wal_records[0],
512            ColumnarWalRecord::DeleteRows { .. }
513        ));
514        assert!(matches!(
515            &result.wal_records[1],
516            ColumnarWalRecord::InsertRow { .. }
517        ));
518
519        // PK index should still have 1 entry.
520        assert_eq!(engine.pk_index().len(), 1);
521        // Memtable should have 2 rows (original + updated).
522        assert_eq!(engine.memtable().row_count(), 2);
523    }
524
525    #[test]
526    fn memtable_flush_remaps_pk() {
527        let mut engine = MutationEngine::new("test".into(), test_schema());
528
529        for i in 0..5 {
530            engine
531                .insert(&[
532                    Value::Integer(i),
533                    Value::String(format!("u{i}")),
534                    Value::Null,
535                ])
536                .expect("insert");
537        }
538
539        // Simulate flush: memtable becomes segment 1.
540        let result = engine.on_memtable_flushed(1);
541        assert_eq!(result.wal_records.len(), 1);
542        assert!(matches!(
543            &result.wal_records[0],
544            ColumnarWalRecord::MemtableFlushed {
545                segment_id: 1,
546                row_count: 5,
547                ..
548            }
549        ));
550
551        // PK index entries should now point to segment 1.
552        let pk = encode_pk(&Value::Integer(3));
553        let loc = engine.pk_index().get(&pk).expect("pk exists");
554        assert_eq!(loc.segment_id, 1);
555        assert_eq!(loc.row_index, 3);
556    }
557
558    #[test]
559    fn multiple_inserts_and_deletes() {
560        let mut engine = MutationEngine::new("test".into(), test_schema());
561
562        for i in 0..10 {
563            engine
564                .insert(&[
565                    Value::Integer(i),
566                    Value::String(format!("u{i}")),
567                    Value::Null,
568                ])
569                .expect("insert");
570        }
571
572        // Delete odd-numbered rows.
573        for i in (1..10).step_by(2) {
574            engine.delete(&Value::Integer(i)).expect("delete");
575        }
576
577        assert_eq!(engine.pk_index().len(), 5); // 0, 2, 4, 6, 8.
578    }
579
580    #[test]
581    fn should_compact_threshold() {
582        let mut engine = MutationEngine::new("test".into(), test_schema());
583
584        // Insert and flush to create a real segment.
585        for i in 0..10 {
586            engine
587                .insert(&[
588                    Value::Integer(i),
589                    Value::String(format!("u{i}")),
590                    Value::Null,
591                ])
592                .expect("insert");
593        }
594        engine.on_memtable_flushed(1);
595
596        // Delete 3 out of 10 rows = 30% > 20% threshold.
597        for i in 0..3 {
598            engine.delete(&Value::Integer(i)).expect("delete");
599        }
600
601        assert!(engine.should_compact(1, 10));
602    }
603}