nodedb_columnar/mutation/
engine.rs1use std::collections::HashMap;
6
7use nodedb_types::columnar::ColumnarSchema;
8use nodedb_types::surrogate::Surrogate;
9use nodedb_types::value::Value;
10
11use crate::delete_bitmap::DeleteBitmap;
12use crate::error::ColumnarError;
13use crate::memtable::ColumnarMemtable;
14use crate::pk_index::{PkIndex, encode_pk};
15use crate::wal_record::ColumnarWalRecord;
16
17pub struct MutationEngine {
22 pub(super) collection: String,
23 pub(super) schema: ColumnarSchema,
24 pub(super) memtable: ColumnarMemtable,
25 pub(super) pk_index: PkIndex,
26 pub(super) delete_bitmaps: HashMap<u64, DeleteBitmap>,
28 pub(super) pk_col_indices: Vec<usize>,
30 pub(super) next_segment_id: u64,
32 pub(super) memtable_segment_id: u64,
35 pub(super) memtable_row_counter: u32,
37 pub(super) memtable_surrogates: Vec<Option<Surrogate>>,
43}
44
45#[derive(Debug)]
47pub struct MutationResult {
48 pub wal_records: Vec<ColumnarWalRecord>,
50}
51
52impl MutationEngine {
53 pub fn new(collection: String, schema: ColumnarSchema) -> Self {
55 let pk_col_indices: Vec<usize> = schema
56 .columns
57 .iter()
58 .enumerate()
59 .filter(|(_, c)| c.primary_key)
60 .map(|(i, _)| i)
61 .collect();
62
63 let memtable = ColumnarMemtable::new(&schema);
64 let memtable_segment_id = 0;
66
67 Self {
68 collection,
69 schema,
70 memtable,
71 pk_index: PkIndex::new(),
72 delete_bitmaps: HashMap::new(),
73 pk_col_indices,
74 next_segment_id: 1,
75 memtable_segment_id,
76 memtable_row_counter: 0,
77 memtable_surrogates: Vec::new(),
78 }
79 }
80
81 pub fn memtable(&self) -> &ColumnarMemtable {
85 &self.memtable
86 }
87
88 pub fn memtable_mut(&mut self) -> &mut ColumnarMemtable {
90 &mut self.memtable
91 }
92
93 pub fn pk_index(&self) -> &PkIndex {
95 &self.pk_index
96 }
97
98 pub fn pk_index_mut(&mut self) -> &mut PkIndex {
100 &mut self.pk_index
101 }
102
103 pub fn delete_bitmap(&self, segment_id: u64) -> Option<&DeleteBitmap> {
105 self.delete_bitmaps.get(&segment_id)
106 }
107
108 pub fn delete_bitmap_mut(&mut self, segment_id: u64) -> &mut DeleteBitmap {
113 self.delete_bitmaps.entry(segment_id).or_default()
114 }
115
116 pub fn memtable_segment_id(&self) -> u64 {
118 self.memtable_segment_id
119 }
120
121 pub fn pk_col_indices(&self) -> &[usize] {
123 &self.pk_col_indices
124 }
125
126 pub fn delete_bitmaps(&self) -> &HashMap<u64, DeleteBitmap> {
128 &self.delete_bitmaps
129 }
130
131 pub fn collection(&self) -> &str {
133 &self.collection
134 }
135
136 pub fn schema(&self) -> &ColumnarSchema {
138 &self.schema
139 }
140
141 pub fn should_flush(&self) -> bool {
143 self.memtable.should_flush()
144 }
145
146 pub fn memtable_surrogates(&self) -> &[Option<Surrogate>] {
151 &self.memtable_surrogates
152 }
153
154 pub fn scan_memtable_rows(&self) -> impl Iterator<Item = Vec<Value>> + '_ {
159 let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
160 self.memtable
161 .iter_rows()
162 .enumerate()
163 .filter_map(move |(row_idx, row)| {
164 if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
165 None
166 } else {
167 Some(row)
168 }
169 })
170 }
171
172 pub fn scan_memtable_rows_with_surrogates(
178 &self,
179 ) -> impl Iterator<Item = (Option<Surrogate>, Vec<Value>)> + '_ {
180 let deletes = self.delete_bitmaps.get(&self.memtable_segment_id);
181 let surrogates = &self.memtable_surrogates;
182 self.memtable
183 .iter_rows()
184 .enumerate()
185 .filter_map(move |(row_idx, row)| {
186 if deletes.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
187 return None;
188 }
189 let surrogate = surrogates.get(row_idx).copied().flatten();
190 Some((surrogate, row))
191 })
192 }
193
194 pub fn get_memtable_row(&self, row_idx: usize) -> Option<Vec<Value>> {
196 if self
197 .delete_bitmaps
198 .get(&self.memtable_segment_id)
199 .is_some_and(|bm| bm.is_deleted(row_idx as u32))
200 {
201 return None;
202 }
203 self.memtable.get_row(row_idx)
204 }
205
206 pub fn rollback_memtable_inserts(
217 &mut self,
218 row_count_before: usize,
219 inserted_pks: &[Vec<u8>],
220 displaced: &[(Vec<u8>, crate::pk_index::RowLocation)],
221 ) {
222 for pk in inserted_pks {
224 self.pk_index.remove(pk);
225 }
226 for (pk, prior_location) in displaced {
228 self.pk_index.upsert(pk.clone(), *prior_location);
229 if let Some(bm) = self.delete_bitmaps.get_mut(&prior_location.segment_id) {
231 bm.unmark_deleted(prior_location.row_index);
232 }
233 }
234 self.memtable.truncate_to(row_count_before);
236 self.memtable_surrogates.truncate(row_count_before);
237 self.memtable_row_counter = row_count_before as u32;
238 }
239
240 pub fn next_segment_id(&self) -> u64 {
244 self.next_segment_id
245 }
246
247 pub fn should_compact(&self, segment_id: u64, total_rows: u64) -> bool {
249 self.delete_bitmaps
250 .get(&segment_id)
251 .is_some_and(|bm| bm.should_compact(total_rows, 0.2))
252 }
253
254 pub fn encode_pk_from_row(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
257 self.extract_pk_bytes(values)
258 }
259
260 pub(super) fn extract_pk_bytes(&self, values: &[Value]) -> Result<Vec<u8>, ColumnarError> {
264 if values.len() != self.schema.columns.len() {
265 return Err(ColumnarError::SchemaMismatch {
266 expected: self.schema.columns.len(),
267 got: values.len(),
268 });
269 }
270
271 if self.pk_col_indices.len() == 1 {
272 Ok(encode_pk(&values[self.pk_col_indices[0]]))
273 } else {
274 let pk_values: Vec<&Value> = self.pk_col_indices.iter().map(|&i| &values[i]).collect();
275 Ok(crate::pk_index::encode_composite_pk(&pk_values))
276 }
277 }
278}
279
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Smallest schema the engine accepts here: a single required `Int64`
    /// column named `id`, flagged as the primary key.
    fn minimal_schema() -> ColumnarSchema {
        let id_column = ColumnDef::required("id", ColumnType::Int64).with_primary_key();
        ColumnarSchema {
            columns: vec![id_column],
            version: 1,
        }
    }

    /// With the segment-id counter pinned at `u64::MAX`, reporting a flushed
    /// memtable must fail with `SegmentIdExhausted`.
    #[test]
    fn segment_id_allocator_returns_err_at_u64_max() {
        let mut engine = MutationEngine::new(String::from("test"), minimal_schema());
        engine.next_segment_id = u64::MAX;

        let result = engine.on_memtable_flushed(u64::MAX - 1);

        let exhausted = matches!(result, Err(ColumnarError::SegmentIdExhausted));
        assert!(exhausted, "expected SegmentIdExhausted, got: {result:?}");
    }
}