// nodedb_columnar/mutation/engine.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! MutationEngine struct, constructor, accessors, and shared helpers.
4
5use std::collections::HashMap;
6
7use nodedb_types::columnar::ColumnarSchema;
8use nodedb_types::surrogate::Surrogate;
9use nodedb_types::value::Value;
10
11use crate::delete_bitmap::DeleteBitmap;
12use crate::error::ColumnarError;
13use crate::memtable::ColumnarMemtable;
14use crate::pk_index::{PkIndex, encode_pk};
15use crate::wal_record::ColumnarWalRecord;
16
17/// Coordinates all columnar mutations for a single collection.
18///
19/// Owns the memtable, PK index, and per-segment delete bitmaps.
20/// Produces WAL records for each mutation that the caller must persist.
/// Coordinates all columnar mutations for a single collection.
///
/// Owns the memtable, PK index, and per-segment delete bitmaps.
/// Produces WAL records for each mutation that the caller must persist.
pub struct MutationEngine {
    /// Name of the collection this engine serves.
    pub(super) collection: String,
    /// Columnar schema for the collection; fixed at construction.
    pub(super) schema: ColumnarSchema,
    /// In-memory write buffer for rows not yet flushed to a segment.
    pub(super) memtable: ColumnarMemtable,
    /// Maps encoded primary-key bytes to a row's current location.
    pub(super) pk_index: PkIndex,
    /// Per-segment delete bitmaps. Key = segment_id.
    pub(super) delete_bitmaps: HashMap<u64, DeleteBitmap>,
    /// PK column indices in the schema.
    pub(super) pk_col_indices: Vec<usize>,
    /// Counter for assigning segment IDs.
    pub(super) next_segment_id: u64,
    /// Current "memtable segment ID" — a virtual segment ID for rows
    /// that are still in the memtable (not yet flushed).
    pub(super) memtable_segment_id: u64,
    /// Row counter within the current memtable (resets on flush).
    pub(super) memtable_row_counter: u32,
    /// Per-row surrogate identities, parallel to the memtable rows.
    ///
    /// Entry `i` is the surrogate assigned to memtable row `i` at insert
    /// time, or `None` if no surrogate was supplied (test fixtures, legacy
    /// inserts). The vec is drained alongside the memtable on flush.
    pub(super) memtable_surrogates: Vec<Option<Surrogate>>,
}
44
/// Result of a mutation operation, including the WAL record to persist.
///
/// Returned by the mutation entry points on `MutationEngine`.
#[derive(Debug)]
pub struct MutationResult {
    /// WAL record(s) to persist before the mutation is considered durable.
    pub wal_records: Vec<ColumnarWalRecord>,
}
51
52impl MutationEngine {
53    /// Create a new mutation engine for a collection.
54    pub fn new(collection: String, schema: ColumnarSchema) -> Self {
55        let pk_col_indices: Vec<usize> = schema
56            .columns
57            .iter()
58            .enumerate()
59            .filter(|(_, c)| c.primary_key)
60            .map(|(i, _)| i)
61            .collect();
62
63        let memtable = ColumnarMemtable::new(&schema);
64        // Reserve segment_id 0 for the first memtable. Real segments start at 1.
65        let memtable_segment_id = 0;
66
67        Self {
68            collection,
69            schema,
70            memtable,
71            pk_index: PkIndex::new(),
72            delete_bitmaps: HashMap::new(),
73            pk_col_indices,
74            next_segment_id: 1,
75            memtable_segment_id,
76            memtable_row_counter: 0,
77            memtable_surrogates: Vec::new(),
78        }
79    }
80
81    // -- Accessors --
82
83    /// Access the memtable.
84    pub fn memtable(&self) -> &ColumnarMemtable {
85        &self.memtable
86    }
87
88    /// Mutable access to the memtable (for drain on flush).
89    pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
90        &mut self.memtable
91    }
92
93    /// Access the PK index.
94    pub fn pk_index(&self) -> &PkIndex {
95        &self.pk_index
96    }
97
98    /// Mutable access to the PK index (for cold-start rebuild).
99    pub fn pk_index_mut(&mut self) -> &mut PkIndex {
100        &mut self.pk_index
101    }
102
103    /// Access a segment's delete bitmap.
104    pub fn delete_bitmap(&self, segment_id: u64) -> Option<&DeleteBitmap> {
105        self.delete_bitmaps.get(&segment_id)
106    }
107
108    /// Mutable access to a segment's delete bitmap. Creates an empty one
109    /// on first access so callers can `mark_deleted_batch` unconditionally.
110    /// Used by temporal-purge paths that tombstone superseded row positions
111    /// without going through the single-row `insert` / `delete` paths.
112    pub fn delete_bitmap_mut(&mut self, segment_id: u64) -> &mut DeleteBitmap {
113        self.delete_bitmaps.entry(segment_id).or_default()
114    }
115
116    /// The virtual segment id used for rows still in the memtable.
117    pub fn memtable_segment_id(&self) -> u64 {
118        self.memtable_segment_id
119    }
120
121    /// The schema's primary-key column indices, in schema order.
122    pub fn pk_col_indices(&self) -> &[usize] {
123        &self.pk_col_indices
124    }
125
126    /// Access all delete bitmaps.
127    pub fn delete_bitmaps(&self) -> &HashMap<u64, DeleteBitmap> {
128        &self.delete_bitmaps
129    }
130
131    /// The collection name.
132    pub fn collection(&self) -> &str {
133        &self.collection
134    }
135
136    /// The schema.
137    pub fn schema(&self) -> &ColumnarSchema {
138        &self.schema
139    }
140
141    /// Whether the memtable should be flushed.
142    pub fn should_flush(&self) -> bool {
143        self.memtable.should_flush()
144    }
145
146    /// Access the per-row surrogate table for the memtable.
147    ///
148    /// Index matches memtable row order; `None` entries indicate rows
149    /// inserted without a surrogate (test fixtures, legacy paths).
150    pub fn memtable_surrogates(&self) -> &[Option<Surrogate>] {
151        &self.memtable_surrogates
152    }
153
154    /// Iterate non-deleted rows in the memtable as `Vec<Value>`.
155    ///
156    /// Skips rows marked as deleted in the memtable's virtual segment
157    /// delete bitmap. For rows in flushed segments, use `SegmentReader`.
158    pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
159        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
160        self.memtable
161            .iter_rows()
162            .enumerate()
163            .filter_map(move |(row_idx, row)| {
164                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
165                    None
166                } else {
167                    Some(row)
168                }
169            })
170    }
171
172    /// Iterate non-deleted rows paired with their surrogate identity.
173    ///
174    /// Yields `(Option<Surrogate>, Vec<Value>)`. The surrogate is `None`
175    /// for rows inserted without one (test fixtures, legacy paths). Deleted
176    /// rows are filtered out exactly as in [`Self::scan_memtable_rows`].
177    pub fn scan_memtable_rows_with_surrogates(
178        &self,
179    ) -> impl Iterator<Item = (Option<Surrogate>, Vec<Value>)> + '_ {
180        let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
181        let surrogates = &self.memtable_surrogates;
182        self.memtable
183            .iter_rows()
184            .enumerate()
185            .filter_map(move |(row_idx, row)| {
186                if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
187                    return None;
188                }
189                let surrogate = surrogates.get(row_idx).copied().flatten();
190                Some((surrogate, row))
191            })
192    }
193
194    /// Get a single row from the memtable by index (None if deleted).
195    pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
196        if self
197            .delete_bitmaps
198            .get(&self.memtable_segment_id)
199            .is_some_and(|bm| bm.is_deleted(row_idx as u32))
200        {
201            return None;
202        }
203        self.memtable.get_row(row_idx)
204    }
205
206    /// Roll back in-memory inserts to `row_count_before`.
207    ///
208    /// Undoes the effect of one or more inserts that appended rows starting
209    /// at `row_count_before`. For each inserted row:
210    /// - The corresponding PK entry is removed from the PK index.
211    /// - If the insert displaced a prior row (upsert tombstone), that prior
212    ///   row's PK index entry is restored and its tombstone bit cleared.
213    ///
214    /// The memtable is then truncated to `row_count_before`. Used exclusively
215    /// by the transaction undo log; never called on the normal write path.
216    pub fn rollback_memtable_inserts(
217        &mut self,
218        row_count_before: usize,
219        inserted_pks: &[Vec<u8>],
220        displaced: &[(Vec<u8>, crate::pk_index::RowLocation)],
221    ) {
222        // 1. Remove newly inserted PK entries.
223        for pk in inserted_pks {
224            self.pk_index.remove(pk);
225        }
226        // 2. Restore displaced prior-row PK entries and clear tombstones.
227        for (pk, prior_location) in displaced {
228            self.pk_index.upsert(pk.clone(), *prior_location);
229            // Clear the tombstone bit that the insert set on the prior row.
230            if let Some(bm) = self.delete_bitmaps.get_mut(&prior_location.segment_id) {
231                bm.unmark_deleted(prior_location.row_index);
232            }
233        }
234        // 3. Truncate memtable and surrogate list.
235        self.memtable.truncate_to(row_count_before);
236        self.memtable_surrogates.truncate(row_count_before);
237        self.memtable_row_counter = row_count_before as u32;
238    }
239
240    /// The segment ID that will be assigned to the next flushed segment.
241    ///
242    /// Use this to obtain the ID to pass to `on_memtable_flushed`.
243    pub fn next_segment_id(&self) -> u64 {
244        self.next_segment_id
245    }
246
247    /// Whether a segment should be compacted based on its delete ratio.
248    pub fn should_compact(&self, segment_id: u64, total_rows: u64) -> bool {
249        self.delete_bitmaps
250            .get(&segment_id)
251            .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
252    }
253
254    /// Encode a PK value as index bytes. Exposed for callers that need
255    /// to probe the PK index (e.g. `ON CONFLICT DO UPDATE` routing).
256    pub fn encode_pk_from_row(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
257        self.extract_pk_bytes(values)
258    }
259
260    // -- Internal helpers --
261
262    /// Extract PK bytes from a row of values.
263    pub(super) fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
264        if values.len() != self.schema.columns.len() {
265            return Err(ColumnarError::SchemaMismatch {
266                expected: self.schema.columns.len(),
267                got: values.len(),
268            });
269        }
270
271        if self.pk_col_indices.len() == 1 {
272            Ok(encode_pk(&values[self.pk_col_indices[0]]))
273        } else {
274            let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
275            Ok(crate::pk_index::encode_composite_pk(&pk_values))
276        }
277    }
278}
279
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Smallest schema the engine accepts: one required Int64 PK column.
    fn minimal_schema() -> ColumnarSchema {
        let id_column = ColumnDef::required("id", ColumnType::Int64).with_primary_key();
        ColumnarSchema {
            columns: vec![id_column],
            version: 1,
        }
    }

    #[test]
    fn segment_id_allocator_returns_err_at_u64_max() {
        let mut engine = MutationEngine::new("test".to_string(), minimal_schema());

        // Pin the allocator at its final value so the flush that follows
        // has no segment ID left to hand out.
        engine.next_segment_id = u64::MAX;

        let result = engine.on_memtable_flushed(u64::MAX - 1);
        assert!(
            matches!(result, Err(ColumnarError::SegmentIdExhausted)),
            "expected SegmentIdExhausted, got: {result:?}"
        );
    }
}
303}