Skip to main content

nodedb_columnar/memtable/
mod.rs

1//! Columnar memtable: in-memory row buffer with typed column vectors.
2//!
3//! Each column is stored as a typed vector (Vec<i64>, Vec<f64>, etc.) rather
4//! than Vec<Value> to avoid enum overhead and enable SIMD-friendly memory layout.
5//! The memtable accumulates INSERTs and flushes to a segment when the row count
6//! reaches the configured threshold.
7//!
8//! NOT thread-safe — lives on a single Data Plane core (!Send by design in Origin,
9//! Mutex-wrapped in Lite).
10
11mod column_data;
12
13pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
14
15use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
16use nodedb_types::value::Value;
17
18use crate::error::ColumnarError;
19
/// Row count at which a memtable built with the default settings reports
/// that it should be flushed: 64K (65,536) rows.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024;
22
23/// In-memory columnar buffer that accumulates INSERTs.
24///
25/// Each column is stored as a typed vector. The memtable flushes to a
26/// compressed segment when the row count reaches the threshold.
27pub struct ColumnarMemtable {
28    schema: ColumnarSchema,
29    columns: Vec<ColumnData>,
30    row_count: usize,
31    flush_threshold: usize,
32}
33
34impl ColumnarMemtable {
35    /// Create a new empty memtable for the given schema.
36    pub fn new(schema: &ColumnarSchema) -> Self {
37        Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
38    }
39
40    /// Create with a custom flush threshold.
41    pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
42        let columns = schema
43            .columns
44            .iter()
45            .map(|col| ColumnData::new(&col.column_type, col.nullable))
46            .collect();
47        Self {
48            schema: schema.clone(),
49            columns,
50            row_count: 0,
51            flush_threshold,
52        }
53    }
54
55    /// Append a row of values. Validates types and nullability.
56    pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
57        if values.len() != self.schema.columns.len() {
58            return Err(ColumnarError::SchemaMismatch {
59                expected: self.schema.columns.len(),
60                got: values.len(),
61            });
62        }
63
64        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
65            if matches!(value, Value::Null) && !col_def.nullable {
66                return Err(ColumnarError::NullViolation(col_def.name.clone()));
67            }
68            self.columns[i].push(value, &col_def.name)?;
69        }
70
71        self.row_count += 1;
72        debug_assert!(
73            self.columns.iter().all(|c| c.len() == self.row_count),
74            "column lengths must stay aligned with row_count"
75        );
76        Ok(())
77    }
78
79    /// Number of rows currently buffered.
80    pub fn row_count(&self) -> usize {
81        self.row_count
82    }
83
84    /// Whether the memtable has reached its flush threshold.
85    pub fn should_flush(&self) -> bool {
86        self.row_count >= self.flush_threshold
87    }
88
89    /// Whether the memtable is empty.
90    pub fn is_empty(&self) -> bool {
91        self.row_count == 0
92    }
93
94    /// Access the schema.
95    pub fn schema(&self) -> &ColumnarSchema {
96        &self.schema
97    }
98
99    /// Access the raw column data (for the segment writer).
100    pub fn columns(&self) -> &[ColumnData] {
101        &self.columns
102    }
103
104    /// Convert low-cardinality `String` columns to `DictEncoded` in-place.
105    pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
106        for col in &mut self.columns {
107            if let ColumnData::String { .. } = col
108                && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
109            {
110                *col = encoded;
111            }
112        }
113    }
114
115    /// Iterate rows as `Vec<Value>`. For scan/read operations.
116    pub fn iter_rows(&self) -> MemtableRowIter<'_> {
117        MemtableRowIter {
118            columns: &self.columns,
119            row_count: self.row_count,
120            current: 0,
121        }
122    }
123
124    /// Get a single row by index as `Vec<Value>`.
125    pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
126        if row_idx >= self.row_count {
127            return None;
128        }
129        let mut row = Vec::with_capacity(self.columns.len());
130        for col in &self.columns {
131            row.push(col.get_value(row_idx));
132        }
133        Some(row)
134    }
135
136    /// Drain the memtable: return all column data and reset to empty.
137    pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
138        let columns = std::mem::replace(
139            &mut self.columns,
140            self.schema
141                .columns
142                .iter()
143                .map(|col| ColumnData::new(&col.column_type, col.nullable))
144                .collect(),
145        );
146        let row_count = self.row_count;
147        self.row_count = 0;
148        (self.schema.clone(), columns, row_count)
149    }
150
151    /// Drain with automatic dictionary encoding for low-cardinality String columns.
152    pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
153        self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
154        self.drain()
155    }
156
157    /// Zero-copy row ingest for timeseries and high-throughput paths.
158    ///
159    /// Accepts borrowed values via `IngestValue<'_>`, avoiding string cloning
160    /// for tag columns that are already interned in the `DictEncoded` dictionary.
161    pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
162        if values.len() != self.schema.columns.len() {
163            return Err(ColumnarError::SchemaMismatch {
164                expected: self.schema.columns.len(),
165                got: values.len(),
166            });
167        }
168
169        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
170            if matches!(value, IngestValue::Null) && !col_def.nullable {
171                return Err(ColumnarError::NullViolation(col_def.name.clone()));
172            }
173            self.columns[i].push_ref(value, &col_def.name)?;
174        }
175
176        self.row_count += 1;
177        Ok(())
178    }
179
180    /// Add a new column to the schema, backfilling existing rows with nulls/defaults.
181    pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
182        if self.schema.columns.iter().any(|c| c.name == name) {
183            return;
184        }
185
186        let existing_rows = self.row_count;
187        let mut col = ColumnData::new(&column_type, nullable);
188        if existing_rows > 0 {
189            col.backfill_nulls(existing_rows);
190        }
191
192        self.columns.push(col);
193        self.schema.columns.push(ColumnDef {
194            name,
195            column_type,
196            nullable,
197            default: None,
198            primary_key: false,
199            modifiers: Vec::new(),
200            generated_expr: None,
201            generated_deps: Vec::new(),
202            added_at_version: 1,
203        });
204    }
205}
206
/// Borrowed value for zero-copy ingest into the columnar memtable.
///
/// The type is `Copy`, so rows can be assembled and passed around without
/// allocating. Equality is derived; `Float64` follows IEEE-754 rules, so a
/// NaN value never compares equal to itself.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum IngestValue<'a> {
    /// Missing value; rejected for non-nullable columns.
    Null,
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// Timestamp carried as a raw `i64`.
    Timestamp(i64),
    /// Borrowed string — for `String` or `DictEncoded` columns.
    Str(&'a str),
}
218
219/// Row iterator over a columnar memtable.
220pub struct MemtableRowIter<'a> {
221    columns: &'a [ColumnData],
222    row_count: usize,
223    current: usize,
224}
225
226impl Iterator for MemtableRowIter<'_> {
227    type Item = Vec<Value>;
228
229    fn next(&mut self) -> Option<Self::Item> {
230        if self.current >= self.row_count {
231            return None;
232        }
233        let mut row = Vec::with_capacity(self.columns.len());
234        for col in self.columns {
235            row.push(col.get_value(self.current));
236        }
237        self.current += 1;
238        Some(row)
239    }
240
241    fn size_hint(&self) -> (usize, Option<usize>) {
242        let remaining = self.row_count - self.current;
243        (remaining, Some(remaining))
244    }
245}
246
247impl ExactSizeIterator for MemtableRowIter<'_> {}
248
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three-column fixture: required Int64 primary key `id`, required
    /// String `name`, nullable Float64 `score`.
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid schema")
    }

    /// Schema with a single String column, either required or nullable.
    fn single_string_schema(name: &str, nullable: bool) -> ColumnarSchema {
        let column = if nullable {
            ColumnDef::nullable(name, ColumnType::String)
        } else {
            ColumnDef::required(name, ColumnType::String)
        };
        ColumnarSchema::new(vec![column]).expect("valid")
    }

    #[test]
    fn append_and_count() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        let rows = [
            [
                Value::Integer(1),
                Value::String("Alice".into()),
                Value::Float(0.75),
            ],
            [Value::Integer(2), Value::String("Bob".into()), Value::Null],
        ];
        for row in &rows {
            mt.append_row(row).expect("append");
        }

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        // `id` is the first column and is not nullable.
        let row = [Value::Null, Value::String("x".into()), Value::Null];
        let failure = mt.append_row(&row).unwrap_err();
        assert!(matches!(failure, ColumnarError::NullViolation(ref col) if col == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        // One value for a three-column schema.
        let failure = mt.append_row(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(failure, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let mut mt = ColumnarMemtable::with_threshold(&test_schema(), 3);

        for i in 0..3 {
            if i == 2 {
                // Two rows buffered: still below the threshold of three.
                assert!(!mt.should_flush());
            }
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        let row = [
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ];
        mt.append_row(&row).expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        let ColumnData::Int64 { values, valid } = &columns[0] else {
            panic!("expected Int64");
        };
        assert_eq!(values, &[1]);
        assert!(valid.is_none());

        let ColumnData::String {
            data,
            offsets,
            valid,
        } = &columns[1]
        else {
            panic!("expected String");
        };
        assert_eq!(std::str::from_utf8(data).unwrap(), "x");
        assert_eq!(offsets, &[0, 1]);
        assert!(valid.is_none());
    }

    #[test]
    fn all_types() {
        let column_defs = vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ];
        let schema = ColumnarSchema::new(column_defs).expect("valid");

        let vector = Value::Array(vec![
            Value::Float(1.0),
            Value::Float(2.0),
            Value::Float(3.0),
        ]);
        let row = [
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            vector,
        ];

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&row).expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn dict_encode_low_cardinality() {
        let schema = single_string_schema("qtype", false);
        let mut mt = ColumnarMemtable::new(&schema);

        // Ten passes over the same eight values: 80 rows, 8 distinct strings.
        let qtypes = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];
        for q in qtypes.iter().cycle().take(80) {
            mt.append_row(&[Value::String((*q).into())]).expect("append");
        }
        assert_eq!(mt.row_count(), 80);

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        let ColumnData::DictEncoded {
            ids,
            dictionary,
            valid,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded after try_dict_encode_columns");
        };

        assert_eq!(ids.len(), 80);
        assert!(valid.is_none());
        assert_eq!(dictionary.len(), 8);
        for &id in ids {
            assert!((id as usize) < dictionary.len());
        }
        // The first eight rows must map back to the values in insertion order.
        for (row, &q) in qtypes.iter().enumerate().take(8) {
            let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
            assert_eq!(ids[row], expected_id as u32);
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = single_string_schema("name", false);
        let mut mt = ColumnarMemtable::new(&schema);

        // `max + 1` distinct values: one more than the encoder may accept.
        let max: u32 = 4;
        for i in 0..=max {
            let row = [Value::String(format!("val_{i}"))];
            mt.append_row(&row).expect("append");
        }

        mt.try_dict_encode_columns(max);

        let (_schema, columns, _row_count) = mt.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = single_string_schema("tag", true);
        let mut mt = ColumnarMemtable::new(&schema);

        let inputs = [
            (Value::String("foo".into()), "append"),
            (Value::Null, "append null"),
            (Value::String("bar".into()), "append"),
            (Value::Null, "append null"),
        ];
        for (value, context) in inputs {
            mt.append_row(&[value]).expect(context);
        }

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        let ColumnData::DictEncoded {
            ids,
            valid,
            dictionary,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded");
        };

        assert_eq!(ids.len(), 4);
        let v = valid.as_ref().expect("nullable column has validity bitmap");
        assert_eq!(v.len(), 4);
        assert!(v[0]);
        assert!(!v[1]);
        assert!(v[2]);
        assert!(!v[3]);
        assert_eq!(dictionary.len(), 2);
    }
}
486}