nodedb_columnar/mutation/write.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Write-path mutations: insert, insert_if_absent, delete, update.
4
5use nodedb_types::surrogate::Surrogate;
6use nodedb_types::value::Value;
7
8use crate::error::ColumnarError;
9use crate::pk_index::{RowLocation, encode_pk};
10use crate::wal_record::{ColumnarWalRecord, encode_row_for_wal};
11
12use super::engine::{MutationEngine, MutationResult};
13
14impl MutationEngine {
15 /// Insert a row with upsert-on-duplicate semantics. Returns WAL
16 /// records to persist.
17 ///
18 /// Validates schema. If the PK already exists, the prior row is
19 /// tombstoned via the segment's delete bitmap (a single positional
20 /// delete) before the new row is appended to the memtable. The PK
21 /// index is rebound to the new row location. This matches the
22 /// ClickHouse / Iceberg "sparse PK + positional delete" model and
23 /// keeps `SELECT WHERE pk = X` linearizable on one row without a
24 /// read-time merge pass.
25 ///
26 /// Callers that want strict INSERT (error on duplicate) should check
27 /// `pk_index().contains()` themselves before calling; callers that
28 /// want `ON CONFLICT DO NOTHING` semantics should use
29 /// [`Self::insert_if_absent`].
30 pub fn insert(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
31 let pk_bytes = self.extract_pk_bytes(values)?;
32 // no-governor: fixed-tiny 2-element WAL record vec per insert
33 let mut wal_records = Vec::with_capacity(2);
34
35 // Bitemporal collections preserve every version of a PK: each
36 // write appends a new row with a distinct `_ts_system` stamp and
37 // the prior row stays visible to `AS OF` queries. Skipping the
38 // upsert-tombstone here keeps compaction lossless without
39 // needing a separate "version-aware" delete bitmap. The PK
40 // index is still rebound below so current-state reads see the
41 // latest version.
42 let bitemporal = self.schema.is_bitemporal();
43
44 // If a prior row exists for this PK, tombstone it in place so
45 // subsequent scans skip the stale row. The PK index is rebound
46 // below to the freshly-appended row.
47 if !bitemporal && let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
48 let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
49 bitmap.mark_deleted(prior.row_index);
50 wal_records.push(ColumnarWalRecord::DeleteRows {
51 collection: self.collection.clone(),
52 segment_id: prior.segment_id,
53 row_indices: vec![prior.row_index],
54 });
55 }
56
57 let row_data = encode_row_for_wal(values)?;
58 wal_records.push(ColumnarWalRecord::InsertRow {
59 collection: self.collection.clone(),
60 row_data,
61 });
62
63 self.memtable.append_row(values)?;
64 let location = RowLocation {
65 segment_id: self.memtable_segment_id,
66 row_index: self.memtable_row_counter,
67 };
68 self.pk_index.upsert(pk_bytes, location);
69 self.memtable_surrogates.push(None);
70 self.memtable_row_counter += 1;
71
72 Ok(MutationResult { wal_records })
73 }
74
75 /// Insert with a stable cross-engine surrogate identity.
76 ///
77 /// Identical to [`Self::insert`] but also records `surrogate` in the
78 /// per-row side-table so scan prefilters can perform bitmap membership
79 /// checks without a separate lookup pass.
80 pub fn insert_with_surrogate(
81 &mut self,
82 values: &[Value],
83 surrogate: Surrogate,
84 ) -> Result<MutationResult, ColumnarError> {
85 let pk_bytes = self.extract_pk_bytes(values)?;
86 // no-governor: fixed-tiny 2-element WAL record vec per insert
87 let mut wal_records = Vec::with_capacity(2);
88
89 let bitemporal = self.schema.is_bitemporal();
90 if !bitemporal && let Some(prior) = self.pk_index.get(&pk_bytes).copied() {
91 let bitmap = self.delete_bitmaps.entry(prior.segment_id).or_default();
92 bitmap.mark_deleted(prior.row_index);
93 wal_records.push(ColumnarWalRecord::DeleteRows {
94 collection: self.collection.clone(),
95 segment_id: prior.segment_id,
96 row_indices: vec![prior.row_index],
97 });
98 }
99
100 let row_data = encode_row_for_wal(values)?;
101 wal_records.push(ColumnarWalRecord::InsertRow {
102 collection: self.collection.clone(),
103 row_data,
104 });
105
106 self.memtable.append_row(values)?;
107 let location = RowLocation {
108 segment_id: self.memtable_segment_id,
109 row_index: self.memtable_row_counter,
110 };
111 self.pk_index.upsert(pk_bytes, location);
112 self.memtable_surrogates.push(Some(surrogate));
113 self.memtable_row_counter += 1;
114
115 Ok(MutationResult { wal_records })
116 }
117
118 /// `INSERT ... ON CONFLICT DO NOTHING` semantics: append only if the
119 /// PK is absent; silently skip on duplicate.
120 ///
121 /// Returns `Ok(MutationResult { wal_records })` with an empty vector
122 /// when the row was skipped, so callers that batch WAL appends can
123 /// detect no-ops by checking `wal_records.is_empty()`.
124 pub fn insert_if_absent(&mut self, values: &[Value]) -> Result<MutationResult, ColumnarError> {
125 let pk_bytes = self.extract_pk_bytes(values)?;
126 if self.pk_index.contains(&pk_bytes) {
127 return Ok(MutationResult {
128 wal_records: Vec::new(),
129 });
130 }
131
132 let row_data = encode_row_for_wal(values)?;
133 let wal = ColumnarWalRecord::InsertRow {
134 collection: self.collection.clone(),
135 row_data,
136 };
137 self.memtable.append_row(values)?;
138 let location = RowLocation {
139 segment_id: self.memtable_segment_id,
140 row_index: self.memtable_row_counter,
141 };
142 self.pk_index.upsert(pk_bytes, location);
143 self.memtable_surrogates.push(None);
144 self.memtable_row_counter += 1;
145
146 Ok(MutationResult {
147 wal_records: vec![wal],
148 })
149 }
150
151 /// Look up the current row for a PK in the memtable, if present.
152 ///
153 /// Returns `None` if the PK is not in the index, or if the PK points
154 /// to a flushed segment (callers needing cross-segment lookup must
155 /// go through a segment reader separately). This is the fast path
156 /// used by `ON CONFLICT DO UPDATE` to read the would-be-merged row
157 /// when the duplicate hits the memtable — the common case under
158 /// back-to-back inserts.
159 pub fn lookup_memtable_row_by_pk(&self, pk_bytes: &[u8]) -> Option<Vec<Value>> {
160 let loc = self.pk_index.get(pk_bytes).copied()?;
161 if loc.segment_id != self.memtable_segment_id {
162 return None;
163 }
164 self.memtable.get_row(loc.row_index as usize)
165 }
166
167 /// Delete a row by PK value. Returns WAL record to persist.
168 ///
169 /// Looks up PK in the index to find the segment + row, then marks
170 /// the row in the segment's delete bitmap.
171 pub fn delete(&mut self, pk_value: &Value) -> Result<MutationResult, ColumnarError> {
172 let pk_bytes = encode_pk(pk_value);
173
174 let location = self
175 .pk_index
176 .get(&pk_bytes)
177 .copied()
178 .ok_or(ColumnarError::PrimaryKeyNotFound)?;
179
180 // Generate WAL record BEFORE applying.
181 let wal = ColumnarWalRecord::DeleteRows {
182 collection: self.collection.clone(),
183 segment_id: location.segment_id,
184 row_indices: vec![location.row_index],
185 };
186
187 // Mark in delete bitmap.
188 let bitmap = self.delete_bitmaps.entry(location.segment_id).or_default();
189 bitmap.mark_deleted(location.row_index);
190
191 // Remove from PK index.
192 self.pk_index.remove(&pk_bytes);
193
194 Ok(MutationResult {
195 wal_records: vec![wal],
196 })
197 }
198
199 /// Update a row by PK: DELETE old + INSERT new.
200 ///
201 /// `updates` maps column names to new values. Columns not in the map
202 /// retain their existing values from the old row.
203 ///
204 /// Returns WAL records for both the delete and the insert.
205 ///
206 /// NOTE: The caller must provide the full old row values for the re-insert.
207 /// This method takes the complete new row (already merged with old values).
208 pub fn update(
209 &mut self,
210 old_pk: &Value,
211 new_values: &[Value],
212 ) -> Result<MutationResult, ColumnarError> {
213 // Delete the old row.
214 let delete_result = self.delete(old_pk)?;
215
216 // Insert the new row.
217 let insert_result = self.insert(new_values)?;
218
219 // Combine WAL records.
220 let mut wal_records = delete_result.wal_records;
221 wal_records.extend(insert_result.wal_records);
222
223 Ok(MutationResult { wal_records })
224 }
225}