1mod column_data;
12
13pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
14
15use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
16use nodedb_types::value::Value;
17
18use crate::error::ColumnarError;
19
/// Row count at which a memtable built with [`ColumnarMemtable::new`] reports that it
/// should be flushed (2^16 = 65 536 rows).
pub const DEFAULT_FLUSH_THRESHOLD: usize = 1 << 16;
22
23pub struct ColumnarMemtable {
28 schema: ColumnarSchema,
29 columns: Vec<ColumnData>,
30 row_count: usize,
31 flush_threshold: usize,
32}
33
34impl ColumnarMemtable {
35 pub fn new(schema: &ColumnarSchema) -> Self {
37 Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
38 }
39
40 pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
42 let columns = schema
43 .columns
44 .iter()
45 .map(|col| ColumnData::new(&col.column_type, col.nullable))
46 .collect();
47 Self {
48 schema: schema.clone(),
49 columns,
50 row_count: 0,
51 flush_threshold,
52 }
53 }
54
55 pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
57 if values.len() != self.schema.columns.len() {
58 return Err(ColumnarError::SchemaMismatch {
59 expected: self.schema.columns.len(),
60 got: values.len(),
61 });
62 }
63
64 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
65 if matches!(value, Value::Null) && !col_def.nullable {
66 return Err(ColumnarError::NullViolation(col_def.name.clone()));
67 }
68 self.columns[i].push(value, &col_def.name)?;
69 }
70
71 self.row_count += 1;
72 debug_assert!(
73 self.columns.iter().all(|c| c.len() == self.row_count),
74 "column lengths must stay aligned with row_count"
75 );
76 Ok(())
77 }
78
79 pub fn row_count(&self) -> usize {
81 self.row_count
82 }
83
84 pub fn should_flush(&self) -> bool {
86 self.row_count >= self.flush_threshold
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.row_count == 0
92 }
93
94 pub fn schema(&self) -> &ColumnarSchema {
96 &self.schema
97 }
98
99 pub fn columns(&self) -> &[ColumnData] {
101 &self.columns
102 }
103
104 pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
106 for col in &mut self.columns {
107 if let ColumnData::String { .. } = col
108 && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
109 {
110 *col = encoded;
111 }
112 }
113 }
114
115 pub fn iter_rows(&self) -> MemtableRowIter<'_> {
117 MemtableRowIter {
118 columns: &self.columns,
119 row_count: self.row_count,
120 current: 0,
121 }
122 }
123
124 pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
126 if row_idx >= self.row_count {
127 return None;
128 }
129 let mut row = Vec::with_capacity(self.columns.len());
130 for col in &self.columns {
131 row.push(col.get_value(row_idx));
132 }
133 Some(row)
134 }
135
136 pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
138 let columns = std::mem::replace(
139 &mut self.columns,
140 self.schema
141 .columns
142 .iter()
143 .map(|col| ColumnData::new(&col.column_type, col.nullable))
144 .collect(),
145 );
146 let row_count = self.row_count;
147 self.row_count = 0;
148 (self.schema.clone(), columns, row_count)
149 }
150
151 pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
153 self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
154 self.drain()
155 }
156
157 pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
162 if values.len() != self.schema.columns.len() {
163 return Err(ColumnarError::SchemaMismatch {
164 expected: self.schema.columns.len(),
165 got: values.len(),
166 });
167 }
168
169 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
170 if matches!(value, IngestValue::Null) && !col_def.nullable {
171 return Err(ColumnarError::NullViolation(col_def.name.clone()));
172 }
173 self.columns[i].push_ref(value, &col_def.name)?;
174 }
175
176 self.row_count += 1;
177 Ok(())
178 }
179
180 pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
182 if self.schema.columns.iter().any(|c| c.name == name) {
183 return;
184 }
185
186 let existing_rows = self.row_count;
187 let mut col = ColumnData::new(&column_type, nullable);
188 if existing_rows > 0 {
189 col.backfill_nulls(existing_rows);
190 }
191
192 self.columns.push(col);
193 self.schema.columns.push(ColumnDef {
194 name,
195 column_type,
196 nullable,
197 default: None,
198 primary_key: false,
199 modifiers: Vec::new(),
200 generated_expr: None,
201 generated_deps: Vec::new(),
202 });
203 }
204}
205
/// A borrowed cell value accepted by `ColumnarMemtable::ingest_row_refs`.
///
/// Unlike `Value`, the string payload is borrowed (`Str`), so callers can build a row
/// without allocating an owned `String` per cell. Comparing values with `==` follows
/// `f64` semantics for `Float64` (NaN is never equal to itself).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum IngestValue<'a> {
    /// Absent value; rejected for non-nullable columns.
    Null,
    Int64(i64),
    Float64(f64),
    Bool(bool),
    /// Timestamp carried as an `i64` (unit not established here — TODO confirm).
    Timestamp(i64),
    Str(&'a str),
}
217
218pub struct MemtableRowIter<'a> {
220 columns: &'a [ColumnData],
221 row_count: usize,
222 current: usize,
223}
224
225impl Iterator for MemtableRowIter<'_> {
226 type Item = Vec<Value>;
227
228 fn next(&mut self) -> Option<Self::Item> {
229 if self.current >= self.row_count {
230 return None;
231 }
232 let mut row = Vec::with_capacity(self.columns.len());
233 for col in self.columns {
234 row.push(col.get_value(self.current));
235 }
236 self.current += 1;
237 Some(row)
238 }
239
240 fn size_hint(&self) -> (usize, Option<usize>) {
241 let remaining = self.row_count - self.current;
242 (remaining, Some(remaining))
243 }
244}
245
246impl ExactSizeIterator for MemtableRowIter<'_> {}
247
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three columns: required `id` (primary key), required `name`, nullable `score`.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid schema")
    }

    /// Appends `n` rows `(i, "u{i}", NULL)` for `i` in `0..n`.
    fn fill(mt: &mut ColumnarMemtable, n: i64) {
        for i in 0..n {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }
    }

    #[test]
    fn append_and_count() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ])
        .expect("append");

        mt.append_row(&[Value::Integer(2), Value::String("Bob".into()), Value::Null])
            .expect("append");

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt
            .append_row(&[Value::Null, Value::String("x".into()), Value::Null])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt.append_row(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::with_threshold(&schema, 3);

        fill(&mut mt, 2);
        assert!(!mt.should_flush());

        mt.append_row(&[Value::Integer(2), Value::String("u2".into()), Value::Null])
            .expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ])
        .expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        match &columns[0] {
            ColumnData::Int64 { values, valid } => {
                assert_eq!(values, &[1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected Int64"),
        }
        match &columns[1] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "x");
                assert_eq!(offsets, &[0, 1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected String"),
        }
    }

    #[test]
    fn drain_leaves_memtable_reusable() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        fill(&mut mt, 2);

        let (_schema, _columns, row_count) = mt.drain();
        assert_eq!(row_count, 2);

        fill(&mut mt, 1);
        assert_eq!(mt.row_count(), 1);
        assert!(mt.columns().iter().all(|c| c.len() == 1));
    }

    #[test]
    fn all_types() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::Array(vec![
                Value::Float(1.0),
                Value::Float(2.0),
                Value::Float(3.0),
            ]),
        ])
        .expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn get_row_and_iter_rows() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        fill(&mut mt, 3);

        let row = mt.get_row(1).expect("row 1 exists");
        assert_eq!(row.len(), 3);
        assert!(matches!(row[0], Value::Integer(1)));
        assert!(matches!(row[2], Value::Null));
        assert!(mt.get_row(3).is_none());

        assert_eq!(mt.iter_rows().len(), 3);
        assert_eq!(mt.iter_rows().count(), 3);
        let second = mt.iter_rows().nth(1).expect("second row");
        assert!(matches!(second[0], Value::Integer(1)));
        assert!(mt.iter_rows().nth(5).is_none());

        let mut it = mt.iter_rows();
        assert!(it.by_ref().take(3).all(|r| r.len() == 3));
        assert!(it.next().is_none());
        assert!(it.next().is_none());
    }

    #[test]
    fn ingest_row_refs_appends_and_validates() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.ingest_row_refs(&[
            IngestValue::Int64(7),
            IngestValue::Str("zed"),
            IngestValue::Float64(1.5),
        ])
        .expect("ingest");
        mt.ingest_row_refs(&[
            IngestValue::Int64(8),
            IngestValue::Str("amy"),
            IngestValue::Null,
        ])
        .expect("ingest with null score");
        assert_eq!(mt.row_count(), 2);

        let err = mt.ingest_row_refs(&[IngestValue::Int64(9)]).unwrap_err();
        assert!(matches!(
            err,
            ColumnarError::SchemaMismatch {
                expected: 3,
                got: 1
            }
        ));

        let err = mt
            .ingest_row_refs(&[IngestValue::Null, IngestValue::Str("x"), IngestValue::Null])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
        assert_eq!(mt.row_count(), 2);
    }

    #[test]
    fn add_column_backfills_and_ignores_duplicates() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        fill(&mut mt, 1);

        mt.add_column("extra".into(), ColumnType::Int64, true);
        assert_eq!(mt.schema().columns.len(), 4);
        assert_eq!(mt.columns().len(), 4);
        assert_eq!(mt.columns()[3].len(), 1);

        // Re-adding an existing name is a no-op, even with a different type.
        mt.add_column("extra".into(), ColumnType::String, false);
        assert_eq!(mt.schema().columns.len(), 4);

        mt.append_row(&[
            Value::Integer(2),
            Value::String("b".into()),
            Value::Null,
            Value::Integer(5),
        ])
        .expect("append after add_column");
        assert_eq!(mt.row_count(), 2);

        let first = mt.get_row(0).expect("row 0");
        assert!(matches!(first[3], Value::Null));
    }

    #[test]
    fn dict_encode_low_cardinality() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("qtype", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let qtypes = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];
        for _ in 0..10 {
            for &q in &qtypes {
                mt.append_row(&[Value::String(q.into())]).expect("append");
            }
        }
        assert_eq!(mt.row_count(), 80);

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                dictionary,
                valid,
                ..
            } => {
                assert_eq!(ids.len(), 80);
                assert!(valid.is_none());
                assert_eq!(dictionary.len(), 8);
                for &id in ids {
                    assert!((id as usize) < dictionary.len());
                }
                // The first pass over `qtypes` fills ids[0..8] in order.
                for (i, &q) in qtypes.iter().enumerate() {
                    let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
                    assert_eq!(ids[i], expected_id as u32);
                }
            }
            _ => panic!("expected DictEncoded after try_dict_encode_columns"),
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("name", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let max: u32 = 4;
        // `0..=max` yields max + 1 distinct strings, one more than allowed.
        for i in 0..=max {
            mt.append_row(&[Value::String(format!("val_{i}"))])
                .expect("append");
        }

        mt.try_dict_encode_columns(max);

        let (_schema, columns, _row_count) = mt.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = ColumnarSchema::new(vec![ColumnDef::nullable("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::String("foo".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");
        mt.append_row(&[Value::String("bar".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                valid,
                dictionary,
                ..
            } => {
                assert_eq!(ids.len(), 4);
                let v = valid.as_ref().expect("nullable column has validity bitmap");
                assert_eq!(v.len(), 4);
                assert!(v[0]);
                assert!(!v[1]);
                assert!(v[2]);
                assert!(!v[3]);
                assert_eq!(dictionary.len(), 2);
            }
            _ => panic!("expected DictEncoded"),
        }
    }
}
485}