Skip to main content

nodedb_columnar/memtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Columnar memtable: in-memory row buffer with typed column vectors.
4//!
5//! Each column is stored as a typed vector (Vec<i64>, Vec<f64>, etc.) rather
6//! than Vec<Value> to avoid enum overhead and enable SIMD-friendly memory layout.
7//! The memtable accumulates INSERTs and flushes to a segment when the row count
8//! reaches the configured threshold.
9//!
10//! NOT thread-safe — lives on a single Data Plane core (!Send by design in Origin,
11//! Mutex-wrapped in Lite).
12
13mod column_data;
14
15pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
16
17use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
18use nodedb_types::value::Value;
19
20use crate::error::ColumnarError;
21
/// Row count at which a memtable created with [`ColumnarMemtable::new`]
/// reports that it should be flushed (2^16 = 65,536 rows).
pub const DEFAULT_FLUSH_THRESHOLD: usize = 1 << 16;
24
25/// In-memory columnar buffer that accumulates INSERTs.
26///
27/// Each column is stored as a typed vector. The memtable flushes to a
28/// compressed segment when the row count reaches the threshold.
29pub struct ColumnarMemtable {
30    schema: ColumnarSchema,
31    columns: Vec<ColumnData>,
32    row_count: usize,
33    flush_threshold: usize,
34}
35
36impl ColumnarMemtable {
37    /// Create a new empty memtable for the given schema.
38    pub fn new(schema: &ColumnarSchema) -> Self {
39        Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
40    }
41
42    /// Create with a custom flush threshold.
43    pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
44        let columns = schema
45            .columns
46            .iter()
47            .map(|col| ColumnData::new(&col.column_type, col.nullable))
48            .collect();
49        Self {
50            schema: schema.clone(),
51            columns,
52            row_count: 0,
53            flush_threshold,
54        }
55    }
56
57    /// Append a row of values. Validates types and nullability.
58    pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
59        if values.len() != self.schema.columns.len() {
60            return Err(ColumnarError::SchemaMismatch {
61                expected: self.schema.columns.len(),
62                got: values.len(),
63            });
64        }
65
66        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
67            if matches!(value, Value::Null) && !col_def.nullable {
68                return Err(ColumnarError::NullViolation(col_def.name.clone()));
69            }
70            self.columns[i].push(value, &col_def.name)?;
71        }
72
73        self.row_count += 1;
74        debug_assert!(
75            self.columns.iter().all(|c| c.len() == self.row_count),
76            "column lengths must stay aligned with row_count"
77        );
78        Ok(())
79    }
80
81    /// Number of rows currently buffered.
82    pub fn row_count(&self) -> usize {
83        self.row_count
84    }
85
86    /// Whether the memtable has reached its flush threshold.
87    pub fn should_flush(&self) -> bool {
88        self.row_count >= self.flush_threshold
89    }
90
91    /// Whether the memtable is empty.
92    pub fn is_empty(&self) -> bool {
93        self.row_count == 0
94    }
95
96    /// Access the schema.
97    pub fn schema(&self) -> &ColumnarSchema {
98        &self.schema
99    }
100
101    /// Access the raw column data (for the segment writer).
102    pub fn columns(&self) -> &[ColumnData] {
103        &self.columns
104    }
105
106    /// Convert low-cardinality `String` columns to `DictEncoded` in-place.
107    pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
108        for col in &mut self.columns {
109            if let ColumnData::String { .. } = col
110                && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
111            {
112                *col = encoded;
113            }
114        }
115    }
116
117    /// Iterate rows as `Vec<Value>`. For scan/read operations.
118    pub fn iter_rows(&self) -> MemtableRowIter<'_> {
119        MemtableRowIter {
120            columns: &self.columns,
121            row_count: self.row_count,
122            current: 0,
123        }
124    }
125
126    /// Get a single row by index as `Vec<Value>`.
127    pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
128        if row_idx >= self.row_count {
129            return None;
130        }
131        // no-governor: hot-path per-row fetch; bounded by column count (schema width, typically small)
132        let mut row = Vec::with_capacity(self.columns.len());
133        for col in &self.columns {
134            row.push(col.get_value(row_idx));
135        }
136        Some(row)
137    }
138
139    /// Truncate the memtable to `n` rows, discarding all rows beyond that point.
140    ///
141    /// Used by transaction rollback to restore the memtable to its pre-write state.
142    /// No-op if `n >= self.row_count`.
143    pub fn truncate_to(&mut self, n: usize) {
144        if n >= self.row_count {
145            return;
146        }
147        for col in &mut self.columns {
148            col.truncate(n);
149        }
150        self.row_count = n;
151        debug_assert!(
152            self.columns.iter().all(|c| c.len() == self.row_count),
153            "column lengths misaligned after truncate_to"
154        );
155    }
156
157    /// Drain the memtable: return all column data and reset to empty.
158    pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
159        let columns = std::mem::replace(
160            &mut self.columns,
161            self.schema
162                .columns
163                .iter()
164                .map(|col| ColumnData::new(&col.column_type, col.nullable))
165                .collect(),
166        );
167        let row_count = self.row_count;
168        self.row_count = 0;
169        (self.schema.clone(), columns, row_count)
170    }
171
172    /// Drain with automatic dictionary encoding for low-cardinality String columns.
173    pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
174        self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
175        self.drain()
176    }
177
178    /// Zero-copy row ingest for timeseries and high-throughput paths.
179    ///
180    /// Accepts borrowed values via `IngestValue<'_>`, avoiding string cloning
181    /// for tag columns that are already interned in the `DictEncoded` dictionary.
182    pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
183        if values.len() != self.schema.columns.len() {
184            return Err(ColumnarError::SchemaMismatch {
185                expected: self.schema.columns.len(),
186                got: values.len(),
187            });
188        }
189
190        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
191            if matches!(value, IngestValue::Null) && !col_def.nullable {
192                return Err(ColumnarError::NullViolation(col_def.name.clone()));
193            }
194            self.columns[i].push_ref(value, &col_def.name)?;
195        }
196
197        self.row_count += 1;
198        Ok(())
199    }
200
201    /// Add a new column to the schema, backfilling existing rows with nulls/defaults.
202    pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
203        if self.schema.columns.iter().any(|c| c.name == name) {
204            return;
205        }
206
207        let existing_rows = self.row_count;
208        let mut col = ColumnData::new(&column_type, nullable);
209        if existing_rows > 0 {
210            col.backfill_nulls(existing_rows);
211        }
212
213        self.columns.push(col);
214        let col_def = if nullable {
215            ColumnDef::nullable(name, column_type)
216        } else {
217            ColumnDef::required(name, column_type)
218        };
219        self.schema.columns.push(col_def);
220    }
221}
222
/// Borrowed value accepted by `ColumnarMemtable::ingest_row_refs`.
///
/// Lets callers feed a row without allocating owned values. In particular,
/// string data can stay borrowed.
#[non_exhaustive]
#[derive(Clone, Copy, Debug)]
pub enum IngestValue<'a> {
    /// NULL; rejected for non-nullable columns.
    Null,
    /// Signed 64-bit integer.
    Int64(i64),
    /// 64-bit floating-point number.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// Timestamp carried as a raw `i64` (unit not fixed here — presumably the
    /// column's timestamp encoding; confirm in `column_data`).
    Timestamp(i64),
    /// Borrowed string — for `String` or `DictEncoded` columns.
    Str(&'a str),
}
235
236/// Row iterator over a columnar memtable.
237pub struct MemtableRowIter<'a> {
238    columns: &'a [ColumnData],
239    row_count: usize,
240    current: usize,
241}
242
243impl Iterator for MemtableRowIter<'_> {
244    type Item = Vec<Value>;
245
246    fn next(&mut self) -> Option<Self::Item> {
247        if self.current >= self.row_count {
248            return None;
249        }
250        // no-governor: hot-path Iterator::next per-row; bounded by column count (schema width, typically small)
251        let mut row = Vec::with_capacity(self.columns.len());
252        for col in self.columns {
253            row.push(col.get_value(self.current));
254        }
255        self.current += 1;
256        Some(row)
257    }
258
259    fn size_hint(&self) -> (usize, Option<usize>) {
260        let remaining = self.row_count - self.current;
261        (remaining, Some(remaining))
262    }
263}
264
265impl ExactSizeIterator for MemtableRowIter<'_> {}
266
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three-column schema: `id` (required PK), `name` (required), `score` (nullable).
    fn test_schema() -> ColumnarSchema {
        let cols = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(cols).expect("valid schema")
    }

    /// Row matching `test_schema`; a `None` score becomes `Value::Null`.
    fn person(id: i64, name: &str, score: Option<f64>) -> Vec<Value> {
        let score = match score {
            Some(s) => Value::Float(s),
            None => Value::Null,
        };
        vec![Value::Integer(id), Value::String(name.to_owned()), score]
    }

    /// Schema with exactly one column, used by the dictionary-encoding tests.
    fn single_column(def: ColumnDef) -> ColumnarSchema {
        ColumnarSchema::new(vec![def]).expect("valid")
    }

    #[test]
    fn append_and_count() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        for row in [person(1, "Alice", Some(0.75)), person(2, "Bob", None)] {
            mt.append_row(&row).expect("append");
        }

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());
        let row = [Value::Null, Value::String("x".into()), Value::Null];

        let Err(err) = mt.append_row(&row) else {
            panic!("null in non-nullable `id` must be rejected");
        };
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        let outcome = mt.append_row(&[Value::Integer(1)]);
        assert!(matches!(outcome, Err(ColumnarError::SchemaMismatch { .. })));
    }

    #[test]
    fn flush_threshold() {
        let mut mt = ColumnarMemtable::with_threshold(&test_schema(), 3);

        for i in 0..2 {
            mt.append_row(&person(i, &format!("u{i}"), None))
                .expect("append");
        }
        assert!(!mt.should_flush());

        mt.append_row(&person(2, "u2", None)).expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let mut mt = ColumnarMemtable::new(&test_schema());
        mt.append_row(&person(1, "x", Some(0.5))).expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        let ColumnData::Int64 { values, valid } = &columns[0] else {
            panic!("expected Int64");
        };
        assert_eq!(values, &[1]);
        assert!(valid.is_none());

        let ColumnData::String {
            data,
            offsets,
            valid,
        } = &columns[1]
        else {
            panic!("expected String");
        };
        assert_eq!(std::str::from_utf8(data).unwrap(), "x");
        assert_eq!(offsets, &[0, 1]);
        assert!(valid.is_none());
    }

    #[test]
    fn all_types() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let vector: Vec<Value> = [1.0, 2.0, 3.0].into_iter().map(Value::Float).collect();
        let row = vec![
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::Array(vector),
        ];

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&row).expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn dict_encode_low_cardinality() {
        const QTYPES: [&str; 8] = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];

        let schema = single_column(ColumnDef::required("qtype", ColumnType::String));
        let mut mt = ColumnarMemtable::new(&schema);

        // Ten passes over the eight query types, in order.
        for q in QTYPES.iter().cycle().take(QTYPES.len() * 10) {
            mt.append_row(&[Value::String((*q).to_owned())])
                .expect("append");
        }
        assert_eq!(mt.row_count(), 80);

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        let ColumnData::DictEncoded {
            ids,
            dictionary,
            valid,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded after try_dict_encode_columns");
        };

        assert_eq!(ids.len(), 80);
        assert!(valid.is_none());
        assert_eq!(dictionary.len(), 8);
        assert!(ids.iter().all(|&id| (id as usize) < dictionary.len()));

        // The first pass must map each query type to its dictionary slot.
        for (&id, &q) in ids.iter().zip(QTYPES.iter()) {
            let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
            assert_eq!(id, expected_id as u32);
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = single_column(ColumnDef::required("name", ColumnType::String));
        let mut mt = ColumnarMemtable::new(&schema);

        // max + 1 distinct values: one more than the allowed cardinality.
        let max: u32 = 4;
        (0..=max).for_each(|i| {
            mt.append_row(&[Value::String(format!("val_{i}"))])
                .expect("append");
        });

        mt.try_dict_encode_columns(max);

        let (_schema, columns, _row_count) = mt.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = single_column(ColumnDef::nullable("tag", ColumnType::String));
        let mut mt = ColumnarMemtable::new(&schema);

        let tags = [Some("foo"), None, Some("bar"), None];
        for tag in tags {
            let value = tag.map_or(Value::Null, |t| Value::String(t.into()));
            mt.append_row(&[value]).expect("append");
        }

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        let ColumnData::DictEncoded {
            ids,
            valid,
            dictionary,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded");
        };

        assert_eq!(ids.len(), 4);
        let v = valid.as_ref().expect("nullable column has validity bitmap");
        assert_eq!(v.len(), 4);
        for (i, expected) in tags.iter().map(Option::is_some).enumerate() {
            assert_eq!(v[i], expected);
        }
        assert_eq!(dictionary.len(), 2);
    }
}