Skip to main content

nodedb_columnar/
memtable.rs

1//! Columnar memtable: in-memory row buffer with typed column vectors.
2//!
3//! Each column is stored as a typed vector (Vec<i64>, Vec<f64>, etc.) rather
4//! than Vec<Value> to avoid enum overhead and enable SIMD-friendly memory layout.
5//! The memtable accumulates INSERTs and flushes to a segment when the row count
6//! reaches the configured threshold.
7//!
8//! NOT thread-safe — lives on a single Data Plane core (!Send by design in Origin,
9//! Mutex-wrapped in Lite).
10
11use nodedb_types::columnar::{ColumnType, ColumnarSchema};
12use nodedb_types::value::Value;
13
14use crate::error::ColumnarError;
15
/// Default number of buffered rows at which a memtable reports that it
/// should be flushed: 64K rows.
///
/// Corresponds to `QueryTuning::columnar_flush_threshold`.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024;
20
/// Typed storage for one column of the memtable.
///
/// Every variant pairs a contiguous buffer of primitive values with a
/// `valid` vector holding one flag per row (`true` = value present,
/// `false` = NULL). Keeping nullability out of band avoids `Option<T>`
/// overhead and lets the value buffers be handed directly to codec
/// pipelines.
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// 64-bit signed integers.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit floating point numbers.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Booleans.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Timestamps, one `i64` per row.
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Decimals.
    Decimal {
        /// One 16-byte serialized representation per row.
        values: Vec<[u8; 16]>,
        valid: Vec<bool>,
    },
    /// UUIDs.
    Uuid {
        /// One 16-byte binary representation per row.
        values: Vec<[u8; 16]>,
        valid: Vec<bool>,
    },
    /// Variable-length strings.
    String {
        /// Bytes of all strings, concatenated in row order.
        data: Vec<u8>,
        /// Byte offsets into `data`: `offsets[i]` is where row `i` starts and
        /// the final entry is the end sentinel, so there is always one more
        /// offset than there are rows.
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Variable-length binary blobs, laid out like [`ColumnData::String`].
    Bytes {
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Geometries, laid out like [`ColumnData::String`].
    Geometry {
        /// JSON-serialized geometry bytes, concatenated in row order.
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Fixed-dimension float vectors.
    Vector {
        /// Packed `f32` components: `dim` floats per row.
        data: Vec<f32>,
        /// Number of components in each row's vector.
        dim: u32,
        valid: Vec<bool>,
    },
}
79
80impl ColumnData {
81    /// Create an empty column for the given type.
82    fn new(col_type: &ColumnType) -> Self {
83        match col_type {
84            ColumnType::Int64 => Self::Int64 {
85                values: Vec::new(),
86                valid: Vec::new(),
87            },
88            ColumnType::Float64 => Self::Float64 {
89                values: Vec::new(),
90                valid: Vec::new(),
91            },
92            ColumnType::Bool => Self::Bool {
93                values: Vec::new(),
94                valid: Vec::new(),
95            },
96            ColumnType::Timestamp => Self::Timestamp {
97                values: Vec::new(),
98                valid: Vec::new(),
99            },
100            ColumnType::Decimal => Self::Decimal {
101                values: Vec::new(),
102                valid: Vec::new(),
103            },
104            ColumnType::Uuid => Self::Uuid {
105                values: Vec::new(),
106                valid: Vec::new(),
107            },
108            ColumnType::String => Self::String {
109                data: Vec::new(),
110                offsets: vec![0],
111                valid: Vec::new(),
112            },
113            ColumnType::Bytes => Self::Bytes {
114                data: Vec::new(),
115                offsets: vec![0],
116                valid: Vec::new(),
117            },
118            ColumnType::Geometry => Self::Geometry {
119                data: Vec::new(),
120                offsets: vec![0],
121                valid: Vec::new(),
122            },
123            ColumnType::Vector(dim) => Self::Vector {
124                data: Vec::new(),
125                dim: *dim,
126                valid: Vec::new(),
127            },
128        }
129    }
130
131    /// Number of rows in this column.
132    pub(crate) fn len(&self) -> usize {
133        match self {
134            Self::Int64 { valid, .. }
135            | Self::Float64 { valid, .. }
136            | Self::Bool { valid, .. }
137            | Self::Timestamp { valid, .. }
138            | Self::Decimal { valid, .. }
139            | Self::Uuid { valid, .. }
140            | Self::String { valid, .. }
141            | Self::Bytes { valid, .. }
142            | Self::Geometry { valid, .. }
143            | Self::Vector { valid, .. } => valid.len(),
144        }
145    }
146
147    /// Append a value. Returns error if type doesn't match.
148    fn push(&mut self, value: &Value, col_name: &str) -> Result<(), ColumnarError> {
149        match (self, value) {
150            // Null for any column type.
151            (Self::Int64 { values, valid }, Value::Null) => {
152                values.push(0);
153                valid.push(false);
154            }
155            (Self::Float64 { values, valid }, Value::Null) => {
156                values.push(0.0);
157                valid.push(false);
158            }
159            (Self::Bool { values, valid }, Value::Null) => {
160                values.push(false);
161                valid.push(false);
162            }
163            (Self::Timestamp { values, valid }, Value::Null) => {
164                values.push(0);
165                valid.push(false);
166            }
167            (Self::Decimal { values, valid }, Value::Null) => {
168                values.push([0u8; 16]);
169                valid.push(false);
170            }
171            (Self::Uuid { values, valid }, Value::Null) => {
172                values.push([0u8; 16]);
173                valid.push(false);
174            }
175            (
176                Self::String {
177                    data: _,
178                    offsets,
179                    valid,
180                },
181                Value::Null,
182            ) => {
183                offsets.push(*offsets.last().unwrap_or(&0));
184                valid.push(false);
185            }
186            (
187                Self::Bytes {
188                    data: _,
189                    offsets,
190                    valid,
191                },
192                Value::Null,
193            ) => {
194                offsets.push(*offsets.last().unwrap_or(&0));
195                valid.push(false);
196            }
197            (
198                Self::Geometry {
199                    data: _,
200                    offsets,
201                    valid,
202                },
203                Value::Null,
204            ) => {
205                offsets.push(*offsets.last().unwrap_or(&0));
206                valid.push(false);
207            }
208            (Self::Vector { data, dim, valid }, Value::Null) => {
209                data.extend(std::iter::repeat_n(0.0f32, *dim as usize));
210                valid.push(false);
211            }
212
213            // Typed values.
214            (Self::Int64 { values, valid }, Value::Integer(v)) => {
215                values.push(*v);
216                valid.push(true);
217            }
218            (Self::Float64 { values, valid }, Value::Float(v)) => {
219                values.push(*v);
220                valid.push(true);
221            }
222            (Self::Float64 { values, valid }, Value::Integer(v)) => {
223                values.push(*v as f64);
224                valid.push(true);
225            }
226            (Self::Bool { values, valid }, Value::Bool(v)) => {
227                values.push(*v);
228                valid.push(true);
229            }
230            (Self::Timestamp { values, valid }, Value::DateTime(dt)) => {
231                values.push(dt.micros);
232                valid.push(true);
233            }
234            (Self::Timestamp { values, valid }, Value::Integer(micros)) => {
235                values.push(*micros);
236                valid.push(true);
237            }
238            (Self::Decimal { values, valid }, Value::Decimal(d)) => {
239                values.push(d.serialize());
240                valid.push(true);
241            }
242            (Self::Uuid { values, valid }, Value::Uuid(s)) => {
243                let bytes = uuid::Uuid::parse_str(s)
244                    .map(|u| *u.as_bytes())
245                    .unwrap_or([0u8; 16]);
246                values.push(bytes);
247                valid.push(true);
248            }
249            (
250                Self::String {
251                    data,
252                    offsets,
253                    valid,
254                },
255                Value::String(s),
256            ) => {
257                data.extend_from_slice(s.as_bytes());
258                offsets.push(data.len() as u32);
259                valid.push(true);
260            }
261            (
262                Self::Bytes {
263                    data,
264                    offsets,
265                    valid,
266                },
267                Value::Bytes(b),
268            ) => {
269                data.extend_from_slice(b);
270                offsets.push(data.len() as u32);
271                valid.push(true);
272            }
273            (
274                Self::Geometry {
275                    data,
276                    offsets,
277                    valid,
278                },
279                Value::Geometry(g),
280            ) => {
281                if let Ok(json) = serde_json::to_vec(g) {
282                    data.extend_from_slice(&json);
283                }
284                offsets.push(data.len() as u32);
285                valid.push(true);
286            }
287            (
288                Self::Geometry {
289                    data,
290                    offsets,
291                    valid,
292                },
293                Value::String(s),
294            ) => {
295                data.extend_from_slice(s.as_bytes());
296                offsets.push(data.len() as u32);
297                valid.push(true);
298            }
299            (Self::Vector { data, dim, valid }, Value::Array(arr)) => {
300                let d = *dim as usize;
301                for (i, v) in arr.iter().take(d).enumerate() {
302                    let f = match v {
303                        Value::Float(f) => *f as f32,
304                        Value::Integer(n) => *n as f32,
305                        _ => 0.0,
306                    };
307                    if i < d {
308                        data.push(f);
309                    }
310                }
311                // Pad with zeros if array is shorter than dim.
312                for _ in arr.len()..d {
313                    data.push(0.0);
314                }
315                valid.push(true);
316            }
317
318            (other, val) => {
319                let type_name = match other {
320                    Self::Int64 { .. } => "Int64",
321                    Self::Float64 { .. } => "Float64",
322                    Self::Bool { .. } => "Bool",
323                    Self::Timestamp { .. } => "Timestamp",
324                    Self::Decimal { .. } => "Decimal",
325                    Self::Uuid { .. } => "Uuid",
326                    Self::String { .. } => "String",
327                    Self::Bytes { .. } => "Bytes",
328                    Self::Geometry { .. } => "Geometry",
329                    Self::Vector { .. } => "Vector",
330                };
331                let _ = val; // Consumed by match.
332                return Err(ColumnarError::TypeMismatch {
333                    column: col_name.to_string(),
334                    expected: type_name.to_string(),
335                });
336            }
337        }
338        Ok(())
339    }
340}
341
342/// In-memory columnar buffer that accumulates INSERTs.
343///
344/// Each column is stored as a typed vector. The memtable flushes to a
345/// compressed segment when the row count reaches the threshold.
346pub struct ColumnarMemtable {
347    schema: ColumnarSchema,
348    columns: Vec<ColumnData>,
349    row_count: usize,
350    flush_threshold: usize,
351}
352
353impl ColumnarMemtable {
354    /// Create a new empty memtable for the given schema.
355    pub fn new(schema: &ColumnarSchema) -> Self {
356        Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
357    }
358
359    /// Create with a custom flush threshold.
360    pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
361        let columns = schema
362            .columns
363            .iter()
364            .map(|col| ColumnData::new(&col.column_type))
365            .collect();
366        Self {
367            schema: schema.clone(),
368            columns,
369            row_count: 0,
370            flush_threshold,
371        }
372    }
373
374    /// Append a row of values. Validates types and nullability.
375    pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
376        if values.len() != self.schema.columns.len() {
377            return Err(ColumnarError::SchemaMismatch {
378                expected: self.schema.columns.len(),
379                got: values.len(),
380            });
381        }
382
383        for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
384            if matches!(value, Value::Null) && !col_def.nullable {
385                return Err(ColumnarError::NullViolation(col_def.name.clone()));
386            }
387            self.columns[i].push(value, &col_def.name)?;
388        }
389
390        self.row_count += 1;
391        debug_assert!(
392            self.columns.iter().all(|c| c.len() == self.row_count),
393            "column lengths must stay aligned with row_count"
394        );
395        Ok(())
396    }
397
398    /// Number of rows currently buffered.
399    pub fn row_count(&self) -> usize {
400        self.row_count
401    }
402
403    /// Whether the memtable has reached its flush threshold.
404    pub fn should_flush(&self) -> bool {
405        self.row_count >= self.flush_threshold
406    }
407
408    /// Whether the memtable is empty.
409    pub fn is_empty(&self) -> bool {
410        self.row_count == 0
411    }
412
413    /// Access the schema.
414    pub fn schema(&self) -> &ColumnarSchema {
415        &self.schema
416    }
417
418    /// Access the raw column data (for the segment writer).
419    pub fn columns(&self) -> &[ColumnData] {
420        &self.columns
421    }
422
423    /// Drain the memtable: return all column data and reset to empty.
424    pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
425        let columns = std::mem::replace(
426            &mut self.columns,
427            self.schema
428                .columns
429                .iter()
430                .map(|col| ColumnData::new(&col.column_type))
431                .collect(),
432        );
433        let row_count = self.row_count;
434        self.row_count = 0;
435        (self.schema.clone(), columns, row_count)
436    }
437}
438
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three-column schema: required Int64 key, required String, nullable Float64.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid schema")
    }

    #[test]
    fn append_and_count() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ])
        .expect("append");

        mt.append_row(&[Value::Integer(2), Value::String("Bob".into()), Value::Null])
            .expect("append");

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());

        // The NULL score is stored as a zero placeholder flagged invalid.
        match &mt.columns()[2] {
            ColumnData::Float64 { values, valid } => {
                assert_eq!(values, &[0.75, 0.0]);
                assert_eq!(valid, &[true, false]);
            }
            _ => panic!("expected Float64"),
        }
        // String offsets carry a leading 0 and one end offset per row.
        match &mt.columns()[1] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "AliceBob");
                assert_eq!(offsets, &[0, 5, 8]);
                assert_eq!(valid, &[true, true]);
            }
            _ => panic!("expected String"),
        }
    }

    #[test]
    fn null_violation_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt
            .append_row(&[
                Value::Null, // id is NOT NULL
                Value::String("x".into()),
                Value::Null,
            ])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
        assert_eq!(mt.row_count(), 0);
    }

    #[test]
    fn schema_mismatch_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt.append_row(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
        assert_eq!(mt.row_count(), 0);
    }

    #[test]
    fn type_mismatch_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt
            .append_row(&[
                Value::String("not an int".into()), // id is Int64
                Value::String("x".into()),
                Value::Null,
            ])
            .unwrap_err();
        assert!(matches!(
            err,
            ColumnarError::TypeMismatch { ref column, ref expected }
                if column == "id" && expected == "Int64"
        ));
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());
    }

    #[test]
    fn flush_threshold() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::with_threshold(&schema, 3);

        for i in 0..2 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }
        assert!(!mt.should_flush());

        mt.append_row(&[Value::Integer(2), Value::String("u2".into()), Value::Null])
            .expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ])
        .expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());
        assert!(mt.columns().iter().all(|c| c.len() == 0));

        // Verify column data.
        match &columns[0] {
            ColumnData::Int64 { values, valid } => {
                assert_eq!(values, &[1]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Int64"),
        }
        match &columns[1] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "x");
                assert_eq!(offsets, &[0, 1]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected String"),
        }
    }

    #[test]
    fn all_types() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000), // timestamp as micros
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::Array(vec![
                Value::Float(1.0),
                Value::Float(2.0),
                Value::Float(3.0),
            ]),
        ])
        .expect("append all types");

        assert_eq!(mt.row_count(), 1);

        // Every column must hold exactly the value that was appended.
        let cols = mt.columns();
        assert_eq!(cols.len(), 7);
        assert!(cols.iter().all(|c| c.len() == 1));

        match &cols[0] {
            ColumnData::Int64 { values, valid } => {
                assert_eq!(values, &[42]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Int64"),
        }
        match &cols[1] {
            ColumnData::Float64 { values, valid } => {
                assert_eq!(values, &[0.25]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Float64"),
        }
        match &cols[2] {
            ColumnData::Bool { values, valid } => {
                assert_eq!(values, &[true]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Bool"),
        }
        match &cols[3] {
            ColumnData::Timestamp { values, valid } => {
                assert_eq!(values, &[1_700_000_000]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Timestamp"),
        }
        match &cols[4] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "hello");
                assert_eq!(offsets, &[0, 5]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected String"),
        }
        match &cols[5] {
            ColumnData::Bytes {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(data, &[0xDE, 0xAD]);
                assert_eq!(offsets, &[0, 2]);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Bytes"),
        }
        match &cols[6] {
            ColumnData::Vector { data, dim, valid } => {
                assert_eq!(data, &[1.0, 2.0, 3.0]);
                assert_eq!(*dim, 3);
                assert_eq!(valid, &[true]);
            }
            _ => panic!("expected Vector"),
        }
    }

    #[test]
    fn vector_padded_and_truncated_to_dim() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        // Shorter than dim: padded with zeros.
        mt.append_row(&[Value::Integer(1), Value::Array(vec![Value::Integer(7)])])
            .expect("append short vector");
        // Longer than dim: extra components are dropped.
        mt.append_row(&[
            Value::Integer(2),
            Value::Array(vec![
                Value::Float(1.0),
                Value::Float(2.0),
                Value::Float(3.0),
                Value::Float(4.0),
            ]),
        ])
        .expect("append long vector");

        match &mt.columns()[1] {
            ColumnData::Vector { data, dim, valid } => {
                assert_eq!(*dim, 3);
                assert_eq!(data, &[7.0, 0.0, 0.0, 1.0, 2.0, 3.0]);
                assert_eq!(valid, &[true, true]);
            }
            _ => panic!("expected Vector"),
        }
    }
}