Skip to main content

nodedb_columnar/mutation/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Write-path mutations: insert, insert_if_absent, delete, update.
4
5use nodedb_types::surrogate::Surrogate;
6use nodedb_types::value::Value;
7
8use crate::error::ColumnarError;
9use crate::pk_index::{RowLocation, encode_pk};
10use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
11
12use super::engine::{MutationEngine, MutationResult};
13
14impl MutationEngine {
15    /// Insert a row with upsert-on-duplicate semantics. Returns WAL
16    /// records to persist.
17    ///
18    /// Validates schema. If the PK already exists, the prior row is
19    /// tombstoned via the segment's delete bitmap (a single positional
20    /// delete) before the new row is appended to the memtable. The PK
21    /// index is rebound to the new row location. This matches the
22    /// ClickHouse / Iceberg "sparse PK + positional delete" model and
23    /// keeps `SELECT WHERE pk = X` linearizable on one row without a
24    /// read-time merge pass.
25    ///
26    /// Callers that want strict INSERT (error on duplicate) should check
27    /// `pk_index().contains()` themselves before calling; callers that
28    /// want `ON CONFLICT DO NOTHING` semantics should use
29    /// [`Self::insert_if_absent`].
30    pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
31        let pk_bytes = self.extract_pk_bytes(values)?;
32        // no-governor: fixed-tiny 2-element WAL record vec per insert
33        let mut wal_records = Vec::with_capacity(2);
34
35        // Bitemporal collections preserve every version of a PK: each
36        // write appends a new row with a distinct `_ts_system` stamp and
37        // the prior row stays visible to `AS OF` queries. Skipping the
38        // upsert-tombstone here keeps compaction lossless without
39        // needing a separate "version-aware" delete bitmap. The PK
40        // index is still rebound below so current-state reads see the
41        // latest version.
42        let bitemporal = self.schema.is_bitemporal();
43
44        // If a prior row exists for this PK, tombstone it in place so
45        // subsequent scans skip the stale row. The PK index is rebound
46        // below to the freshly-appended row.
47        if !bitemporal && let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
48            let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
49            bitmap.mark_deleted(prior.row_index);
50            wal_records.push(ColumnarWalRecord::DeleteRows {
51                collection: self.collection.clone(),
52                segment_id: prior.segment_id,
53                row_indices: vec![prior.row_index],
54            });
55        }
56
57        let row_data = encode_row_for_wal(values)?;
58        wal_records.push(ColumnarWalRecord::InsertRow {
59            collection: self.collection.clone(),
60            row_data,
61        });
62
63        self.memtable.append_row(values)?;
64        let location = RowLocation {
65            segment_id: self.memtable_segment_id,
66            row_index: self.memtable_row_counter,
67        };
68        self.pk_index.upsert(pk_bytes, location);
69        self.memtable_surrogates.push(None);
70        self.memtable_row_counter += 1;
71
72        Ok(MutationResult { wal_records })
73    }
74
75    /// Insert with a stable cross-engine surrogate identity.
76    ///
77    /// Identical to [`Self::insert`] but also records `surrogate` in the
78    /// per-row side-table so scan prefilters can perform bitmap membership
79    /// checks without a separate lookup pass.
80    pub fn insert_with_surrogate(
81        &mut self,
82        values: &[Value],
83        surrogate: Surrogate,
84    ) -> Result<MutationResult, ColumnarError> {
85        let pk_bytes = self.extract_pk_bytes(values)?;
86        // no-governor: fixed-tiny 2-element WAL record vec per insert
87        let mut wal_records = Vec::with_capacity(2);
88
89        let bitemporal = self.schema.is_bitemporal();
90        if !bitemporal && let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
91            let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
92            bitmap.mark_deleted(prior.row_index);
93            wal_records.push(ColumnarWalRecord::DeleteRows {
94                collection: self.collection.clone(),
95                segment_id: prior.segment_id,
96                row_indices: vec![prior.row_index],
97            });
98        }
99
100        let row_data = encode_row_for_wal(values)?;
101        wal_records.push(ColumnarWalRecord::InsertRow {
102            collection: self.collection.clone(),
103            row_data,
104        });
105
106        self.memtable.append_row(values)?;
107        let location = RowLocation {
108            segment_id: self.memtable_segment_id,
109            row_index: self.memtable_row_counter,
110        };
111        self.pk_index.upsert(pk_bytes, location);
112        self.memtable_surrogates.push(Some(surrogate));
113        self.memtable_row_counter += 1;
114
115        Ok(MutationResult { wal_records })
116    }
117
118    /// `INSERT ... ON CONFLICT DO NOTHING` semantics: append only if the
119    /// PK is absent; silently skip on duplicate.
120    ///
121    /// Returns `Ok(MutationResult { wal_records })` with an empty vector
122    /// when the row was skipped, so callers that batch WAL appends can
123    /// detect no-ops by checking `wal_records.is_empty()`.
124    pub fn insert_if_absent(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
125        let pk_bytes = self.extract_pk_bytes(values)?;
126        if self.pk_index.contains(&pk_bytes) {
127            return Ok(MutationResult {
128                wal_records: Vec::new(),
129            });
130        }
131
132        let row_data = encode_row_for_wal(values)?;
133        let wal = ColumnarWalRecord::InsertRow {
134            collection: self.collection.clone(),
135            row_data,
136        };
137        self.memtable.append_row(values)?;
138        let location = RowLocation {
139            segment_id: self.memtable_segment_id,
140            row_index: self.memtable_row_counter,
141        };
142        self.pk_index.upsert(pk_bytes, location);
143        self.memtable_surrogates.push(None);
144        self.memtable_row_counter += 1;
145
146        Ok(MutationResult {
147            wal_records: vec![wal],
148        })
149    }
150
151    /// Look up the current row for a PK in the memtable, if present.
152    ///
153    /// Returns `None` if the PK is not in the index, or if the PK points
154    /// to a flushed segment (callers needing cross-segment lookup must
155    /// go through a segment reader separately). This is the fast path
156    /// used by `ON CONFLICT DO UPDATE` to read the would-be-merged row
157    /// when the duplicate hits the memtable — the common case under
158    /// back-to-back inserts.
159    pub fn lookup_memtable_row_by_pk(&self, pk_bytes: &[u8]) -> Option<Vec<Value>> {
160        let loc = self.pk_index.get(pk_bytes).copied()?;
161        if loc.segment_id != self.memtable_segment_id {
162            return None;
163        }
164        self.memtable.get_row(loc.row_index as usize)
165    }
166
167    /// Delete a row by PK value. Returns WAL record to persist.
168    ///
169    /// Looks up PK in the index to find the segment + row, then marks
170    /// the row in the segment's delete bitmap.
171    pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
172        let pk_bytes = encode_pk(pk_value);
173
174        let location = self
175            .pk_index
176            .get(&pk_bytes)
177            .copied()
178            .ok_or(ColumnarError::PrimaryKeyNotFound)?;
179
180        // Generate WAL record BEFORE applying.
181        let wal = ColumnarWalRecord::DeleteRows {
182            collection: self.collection.clone(),
183            segment_id: location.segment_id,
184            row_indices: vec![location.row_index],
185        };
186
187        // Mark in delete bitmap.
188        let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
189        bitmap.mark_deleted(location.row_index);
190
191        // Remove from PK index.
192        self.pk_index.remove(&pk_bytes);
193
194        Ok(MutationResult {
195            wal_records: vec![wal],
196        })
197    }
198
199    /// Update a row by PK: DELETE old + INSERT new.
200    ///
201    /// `updates` maps column names to new values. Columns not in the map
202    /// retain their existing values from the old row.
203    ///
204    /// Returns WAL records for both the delete and the insert.
205    ///
206    /// NOTE: The caller must provide the full old row values for the re-insert.
207    /// This method takes the complete new row (already merged with old values).
208    pub fn update(
209        &mut self,
210        old_pk: &Value,
211        new_values: &[Value],
212    ) -> Result<MutationResult, ColumnarError> {
213        // Delete the old row.
214        let delete_result = self.delete(old_pk)?;
215
216        // Insert the new row.
217        let insert_result = self.insert(new_values)?;
218
219        // Combine WAL records.
220        let mut wal_records = delete_result.wal_records;
221        wal_records.extend(insert_result.wal_records);
222
223        Ok(MutationResult { wal_records })
224    }
225}