// nodedb_columnar/memtable/mod.rs

//! Columnar memtable: in-memory row buffer with typed column vectors.
//!
//! Each column is stored as a typed vector (`Vec<i64>`, `Vec<f64>`, etc.) rather
//! than `Vec<Value>` to avoid enum overhead and enable a SIMD-friendly memory layout.
//! The memtable accumulates INSERTs and flushes to a segment when the row count
//! reaches the configured threshold.
//!
//! NOT thread-safe — lives on a single Data Plane core (`!Send` by design in Origin,
//! `Mutex`-wrapped in Lite).
10
11mod column_data;
12
13pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
14
15use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
16use nodedb_types::value::Value;
17
18use crate::error::ColumnarError;
19
/// Row count at which a memtable created with [`ColumnarMemtable::new`] reports
/// that it should be flushed: 2^16 = 65,536 rows.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 1 << 16;
22
23/// In-memory columnar buffer that accumulates INSERTs.
24///
25/// Each column is stored as a typed vector. The memtable flushes to a
26/// compressed segment when the row count reaches the threshold.
27pub struct ColumnarMemtable {
28    schema: ColumnarSchema,
29    columns: Vec<ColumnData>,
30    row_count: usize,
31    flush_threshold: usize,
32}
33
34impl ColumnarMemtable {
35    /// Create a new empty memtable for the given schema.
36    pub fn new(schema: &ColumnarSchema) -> Self {
37        Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
38    }
39
40    /// Create with a custom flush threshold.
41    pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
42        let columns = schema
43            .columns
44            .iter()
45            .map(|col| ColumnData::new(&col.column_type, col.nullable))
46            .collect();
47        Self {
48            schema: schema.clone(),
49            columns,
50            row_count: 0,
51            flush_threshold,
52        }
53    }
54
55    /// Append a row of values. Validates types and nullability.
56    pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
57        if values.len() != self.schema.columns.len() {
58            return Err(ColumnarError::SchemaMismatch {
59                expected: self.schema.columns.len(),
60                got: values.len(),
61            });
62        }
63
64        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
65            if matches!(value, Value::Null) && !col_def.nullable {
66                return Err(ColumnarError::NullViolation(col_def.name.clone()));
67            }
68            self.columns[i].push(value, &col_def.name)?;
69        }
70
71        self.row_count += 1;
72        debug_assert!(
73            self.columns.iter().all(|c| c.len() == self.row_count),
74            "column lengths must stay aligned with row_count"
75        );
76        Ok(())
77    }
78
79    /// Number of rows currently buffered.
80    pub fn row_count(&self) -> usize {
81        self.row_count
82    }
83
84    /// Whether the memtable has reached its flush threshold.
85    pub fn should_flush(&self) -> bool {
86        self.row_count >= self.flush_threshold
87    }
88
89    /// Whether the memtable is empty.
90    pub fn is_empty(&self) -> bool {
91        self.row_count == 0
92    }
93
94    /// Access the schema.
95    pub fn schema(&self) -> &ColumnarSchema {
96        &self.schema
97    }
98
99    /// Access the raw column data (for the segment writer).
100    pub fn columns(&self) -> &[ColumnData] {
101        &self.columns
102    }
103
104    /// Convert low-cardinality `String` columns to `DictEncoded` in-place.
105    pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
106        for col in &mut self.columns {
107            if let ColumnData::String { .. } = col
108                && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
109            {
110                *col = encoded;
111            }
112        }
113    }
114
115    /// Iterate rows as `Vec<Value>`. For scan/read operations.
116    pub fn iter_rows(&self) -> MemtableRowIter<'_> {
117        MemtableRowIter {
118            columns: &self.columns,
119            row_count: self.row_count,
120            current: 0,
121        }
122    }
123
124    /// Get a single row by index as `Vec<Value>`.
125    pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
126        if row_idx >= self.row_count {
127            return None;
128        }
129        let mut row = Vec::with_capacity(self.columns.len());
130        for col in &self.columns {
131            row.push(col.get_value(row_idx));
132        }
133        Some(row)
134    }
135
136    /// Drain the memtable: return all column data and reset to empty.
137    pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
138        let columns = std::mem::replace(
139            &mut self.columns,
140            self.schema
141                .columns
142                .iter()
143                .map(|col| ColumnData::new(&col.column_type, col.nullable))
144                .collect(),
145        );
146        let row_count = self.row_count;
147        self.row_count = 0;
148        (self.schema.clone(), columns, row_count)
149    }
150
151    /// Drain with automatic dictionary encoding for low-cardinality String columns.
152    pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
153        self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
154        self.drain()
155    }
156
157    /// Zero-copy row ingest for timeseries and high-throughput paths.
158    ///
159    /// Accepts borrowed values via `IngestValue<'_>`, avoiding string cloning
160    /// for tag columns that are already interned in the `DictEncoded` dictionary.
161    pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
162        if values.len() != self.schema.columns.len() {
163            return Err(ColumnarError::SchemaMismatch {
164                expected: self.schema.columns.len(),
165                got: values.len(),
166            });
167        }
168
169        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
170            if matches!(value, IngestValue::Null) && !col_def.nullable {
171                return Err(ColumnarError::NullViolation(col_def.name.clone()));
172            }
173            self.columns[i].push_ref(value, &col_def.name)?;
174        }
175
176        self.row_count += 1;
177        Ok(())
178    }
179
180    /// Add a new column to the schema, backfilling existing rows with nulls/defaults.
181    pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
182        if self.schema.columns.iter().any(|c| c.name == name) {
183            return;
184        }
185
186        let existing_rows = self.row_count;
187        let mut col = ColumnData::new(&column_type, nullable);
188        if existing_rows > 0 {
189            col.backfill_nulls(existing_rows);
190        }
191
192        self.columns.push(col);
193        self.schema.columns.push(ColumnDef {
194            name,
195            column_type,
196            nullable,
197            default: None,
198            primary_key: false,
199            modifiers: Vec::new(),
200            generated_expr: None,
201            generated_deps: Vec::new(),
202        });
203    }
204}
205
/// Value borrowed from the caller for zero-copy ingest into a `ColumnarMemtable`.
///
/// Unlike the owned `Value`, string payloads are carried as `&str`, which lets the
/// memtable skip cloning strings that a `DictEncoded` column has already interned.
#[derive(Debug, Clone, Copy)]
pub enum IngestValue<'s> {
    /// Missing value; rejected for non-nullable columns.
    Null,
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit floating-point number.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// Timestamp carried as a raw `i64` (NOTE(review): unit defined by the column type — confirm).
    Timestamp(i64),
    /// Borrowed string — for `String` or `DictEncoded` columns.
    Str(&'s str),
}
217
218/// Row iterator over a columnar memtable.
219pub struct MemtableRowIter<'a> {
220    columns: &'a [ColumnData],
221    row_count: usize,
222    current: usize,
223}
224
225impl Iterator for MemtableRowIter<'_> {
226    type Item = Vec<Value>;
227
228    fn next(&mut self) -> Option<Self::Item> {
229        if self.current >= self.row_count {
230            return None;
231        }
232        let mut row = Vec::with_capacity(self.columns.len());
233        for col in self.columns {
234            row.push(col.get_value(self.current));
235        }
236        self.current += 1;
237        Some(row)
238    }
239
240    fn size_hint(&self) -> (usize, Option<usize>) {
241        let remaining = self.row_count - self.current;
242        (remaining, Some(remaining))
243    }
244}
245
246impl ExactSizeIterator for MemtableRowIter<'_> {}
247
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three-column schema shared by most tests: required `id` (primary key),
    /// required `name`, nullable `score`.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid schema")
    }

    #[test]
    fn append_and_count() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ])
        .expect("append");

        mt.append_row(&[Value::Integer(2), Value::String("Bob".into()), Value::Null])
            .expect("append");

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt
            .append_row(&[Value::Null, Value::String("x".into()), Value::Null])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt.append_row(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::with_threshold(&schema, 3);

        for i in 0..2 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }
        assert!(!mt.should_flush());

        mt.append_row(&[Value::Integer(2), Value::String("u2".into()), Value::Null])
            .expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ])
        .expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        match &columns[0] {
            ColumnData::Int64 { values, valid } => {
                assert_eq!(values, &[1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected Int64"),
        }
        match &columns[1] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "x");
                assert_eq!(offsets, &[0, 1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected String"),
        }
    }

    #[test]
    fn all_types() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::Array(vec![
                Value::Float(1.0),
                Value::Float(2.0),
                Value::Float(3.0),
            ]),
        ])
        .expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn dict_encode_low_cardinality() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("qtype", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let qtypes = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];
        for _ in 0..10 {
            for &q in &qtypes {
                mt.append_row(&[Value::String(q.into())]).expect("append");
            }
        }
        assert_eq!(mt.row_count(), 80);

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                dictionary,
                valid,
                ..
            } => {
                assert_eq!(ids.len(), 80);
                assert!(valid.is_none());
                assert_eq!(dictionary.len(), 8);
                for &id in ids {
                    assert!((id as usize) < dictionary.len());
                }
                for (i, &q) in qtypes.iter().enumerate().take(8) {
                    let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
                    assert_eq!(ids[i], expected_id as u32);
                }
            }
            _ => panic!("expected DictEncoded after try_dict_encode_columns"),
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("name", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let max: u32 = 4;
        for i in 0..=max {
            mt.append_row(&[Value::String(format!("val_{i}"))])
                .expect("append");
        }

        mt.try_dict_encode_columns(max);

        let (_schema, columns, _row_count) = mt.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = ColumnarSchema::new(vec![ColumnDef::nullable("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::String("foo".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");
        mt.append_row(&[Value::String("bar".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                valid,
                dictionary,
                ..
            } => {
                assert_eq!(ids.len(), 4);
                let v = valid.as_ref().expect("nullable column has validity bitmap");
                assert_eq!(v.len(), 4);
                assert!(v[0]);
                assert!(!v[1]);
                assert!(v[2]);
                assert!(!v[3]);
                assert_eq!(dictionary.len(), 2);
            }
            _ => panic!("expected DictEncoded"),
        }
    }

    #[test]
    fn ingest_row_refs_validates_shape_and_nulls() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt.ingest_row_refs(&[IngestValue::Int64(1)]).unwrap_err();
        assert!(matches!(
            err,
            ColumnarError::SchemaMismatch {
                expected: 3,
                got: 1
            }
        ));

        let err = mt
            .ingest_row_refs(&[IngestValue::Null, IngestValue::Str("x"), IngestValue::Null])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
        assert!(mt.is_empty());

        mt.ingest_row_refs(&[
            IngestValue::Int64(1),
            IngestValue::Str("x"),
            IngestValue::Float64(0.5),
        ])
        .expect("ingest");
        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn get_row_bounds() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        assert!(mt.get_row(0).is_none());

        mt.append_row(&[Value::Integer(7), Value::String("g".into()), Value::Null])
            .expect("append");
        let row = mt.get_row(0).expect("row 0 exists");
        assert_eq!(row.len(), 3);
        assert!(mt.get_row(1).is_none());
    }

    #[test]
    fn iter_rows_is_exact_size() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..3 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("r{i}")),
                Value::Null,
            ])
            .expect("append");
        }

        let mut rows = mt.iter_rows();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows.next().map(|r| r.len()), Some(3));
        assert_eq!(rows.len(), 2);
        assert_eq!(rows.count(), 2);
    }

    #[test]
    fn add_column_backfills_and_ignores_duplicates() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::Integer(1), Value::String("a".into()), Value::Null])
            .expect("append");
        mt.append_row(&[Value::Integer(2), Value::String("b".into()), Value::Null])
            .expect("append");

        // Re-adding an existing column name is a no-op.
        mt.add_column("name".into(), ColumnType::String, true);
        assert_eq!(mt.schema().columns.len(), 3);
        assert_eq!(mt.columns().len(), 3);

        mt.add_column("extra".into(), ColumnType::Float64, true);
        assert_eq!(mt.schema().columns.len(), 4);
        assert_eq!(mt.columns().len(), 4);
        assert_eq!(mt.columns()[3].len(), 2);

        mt.append_row(&[
            Value::Integer(3),
            Value::String("c".into()),
            Value::Null,
            Value::Float(1.5),
        ])
        .expect("append after add_column");
        assert_eq!(mt.row_count(), 3);
    }

    #[test]
    fn drain_optimized_dict_encodes_and_memtable_is_reusable() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);
        for q in ["x", "y", "x", "y"] {
            mt.append_row(&[Value::String(q.into())]).expect("append");
        }

        let (_schema, columns, row_count) = mt.drain_optimized();
        assert_eq!(row_count, 4);
        assert!(matches!(columns[0], ColumnData::DictEncoded { .. }));
        assert!(mt.is_empty());

        mt.append_row(&[Value::String("z".into())])
            .expect("append after drain");
        assert_eq!(mt.row_count(), 1);
        assert!(matches!(mt.columns()[0], ColumnData::String { .. }));
    }
}