1mod column_data;
14
15pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
16
17use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
18use nodedb_types::value::Value;
19
20use crate::error::ColumnarError;
21
/// Flush threshold, in rows, used by `ColumnarMemtable::new`: 65,536 (2^16).
///
/// A memtable reports `should_flush()` once its row count reaches this value.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 1 << 16;
24
/// In-memory, column-oriented row buffer.
///
/// Rows are appended one at a time and stored as one `ColumnData` per schema
/// column. Once `row_count` reaches `flush_threshold`, `should_flush()` returns
/// `true`; the buffered columns can then be taken out with `drain()`.
pub struct ColumnarMemtable {
    /// Owned copy of the schema; extended in place by `add_column`.
    schema: ColumnarSchema,
    /// One buffer per schema column, in schema order.
    columns: Vec<ColumnData>,
    /// Number of complete rows currently buffered.
    row_count: usize,
    /// Row count at which `should_flush()` starts returning `true`.
    flush_threshold: usize,
}
35
36impl ColumnarMemtable {
37 pub fn new(schema: &ColumnarSchema) -> Self {
39 Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
40 }
41
42 pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
44 let columns = schema
45 .columns
46 .iter()
47 .map(|col| ColumnData::new(&col.column_type, col.nullable))
48 .collect();
49 Self {
50 schema: schema.clone(),
51 columns,
52 row_count: 0,
53 flush_threshold,
54 }
55 }
56
57 pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
59 if values.len() != self.schema.columns.len() {
60 return Err(ColumnarError::SchemaMismatch {
61 expected: self.schema.columns.len(),
62 got: values.len(),
63 });
64 }
65
66 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
67 if matches!(value, Value::Null) && !col_def.nullable {
68 return Err(ColumnarError::NullViolation(col_def.name.clone()));
69 }
70 self.columns[i].push(value, &col_def.name)?;
71 }
72
73 self.row_count += 1;
74 debug_assert!(
75 self.columns.iter().all(|c| c.len() == self.row_count),
76 "column lengths must stay aligned with row_count"
77 );
78 Ok(())
79 }
80
81 pub fn row_count(&self) -> usize {
83 self.row_count
84 }
85
86 pub fn should_flush(&self) -> bool {
88 self.row_count >= self.flush_threshold
89 }
90
91 pub fn is_empty(&self) -> bool {
93 self.row_count == 0
94 }
95
96 pub fn schema(&self) -> &ColumnarSchema {
98 &self.schema
99 }
100
101 pub fn columns(&self) -> &[ColumnData] {
103 &self.columns
104 }
105
106 pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
108 for col in &mut self.columns {
109 if let ColumnData::String { .. } = col
110 && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
111 {
112 *col = encoded;
113 }
114 }
115 }
116
117 pub fn iter_rows(&self) -> MemtableRowIter<'_> {
119 MemtableRowIter {
120 columns: &self.columns,
121 row_count: self.row_count,
122 current: 0,
123 }
124 }
125
126 pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
128 if row_idx >= self.row_count {
129 return None;
130 }
131 let mut row = Vec::with_capacity(self.columns.len());
133 for col in &self.columns {
134 row.push(col.get_value(row_idx));
135 }
136 Some(row)
137 }
138
139 pub fn truncate_to(&mut self, n: usize) {
144 if n >= self.row_count {
145 return;
146 }
147 for col in &mut self.columns {
148 col.truncate(n);
149 }
150 self.row_count = n;
151 debug_assert!(
152 self.columns.iter().all(|c| c.len() == self.row_count),
153 "column lengths misaligned after truncate_to"
154 );
155 }
156
157 pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
159 let columns = std::mem::replace(
160 &mut self.columns,
161 self.schema
162 .columns
163 .iter()
164 .map(|col| ColumnData::new(&col.column_type, col.nullable))
165 .collect(),
166 );
167 let row_count = self.row_count;
168 self.row_count = 0;
169 (self.schema.clone(), columns, row_count)
170 }
171
172 pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
174 self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
175 self.drain()
176 }
177
178 pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
183 if values.len() != self.schema.columns.len() {
184 return Err(ColumnarError::SchemaMismatch {
185 expected: self.schema.columns.len(),
186 got: values.len(),
187 });
188 }
189
190 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
191 if matches!(value, IngestValue::Null) && !col_def.nullable {
192 return Err(ColumnarError::NullViolation(col_def.name.clone()));
193 }
194 self.columns[i].push_ref(value, &col_def.name)?;
195 }
196
197 self.row_count += 1;
198 Ok(())
199 }
200
201 pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
203 if self.schema.columns.iter().any(|c| c.name == name) {
204 return;
205 }
206
207 let existing_rows = self.row_count;
208 let mut col = ColumnData::new(&column_type, nullable);
209 if existing_rows > 0 {
210 col.backfill_nulls(existing_rows);
211 }
212
213 self.columns.push(col);
214 let col_def = if nullable {
215 ColumnDef::nullable(name, column_type)
216 } else {
217 ColumnDef::required(name, column_type)
218 };
219 self.schema.columns.push(col_def);
220 }
221}
222
/// Borrowed, copyable cell value accepted by `ColumnarMemtable::ingest_row_refs`.
///
/// String payloads are borrowed (`&'a str`), so building a row of these does
/// not allocate. `PartialEq` is derived so values can be compared directly;
/// `Float64` follows IEEE-754 semantics, i.e. `NaN != NaN`.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum IngestValue<'a> {
    /// Absent value; rejected for non-nullable columns.
    Null,
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// Timestamp carried as a raw `i64` (unit/epoch not visible here — TODO confirm).
    Timestamp(i64),
    /// Borrowed UTF-8 string.
    Str(&'a str),
}
235
/// Row-wise iterator over a memtable's columns, created by
/// `ColumnarMemtable::iter_rows`. Each item is one row as a `Vec<Value>`.
pub struct MemtableRowIter<'a> {
    /// Borrowed column buffers; one value is read from each per row.
    columns: &'a [ColumnData],
    /// Total number of rows to yield.
    row_count: usize,
    /// Index of the next row to yield.
    current: usize,
}
242
243impl Iterator for MemtableRowIter<'_> {
244 type Item = Vec<Value>;
245
246 fn next(&mut self) -> Option<Self::Item> {
247 if self.current >= self.row_count {
248 return None;
249 }
250 let mut row = Vec::with_capacity(self.columns.len());
252 for col in self.columns {
253 row.push(col.get_value(self.current));
254 }
255 self.current += 1;
256 Some(row)
257 }
258
259 fn size_hint(&self) -> (usize, Option<usize>) {
260 let remaining = self.row_count - self.current;
261 (remaining, Some(remaining))
262 }
263}
264
// `size_hint` returns identical lower and upper bounds, so the default
// `ExactSizeIterator::len` is valid for this iterator.
impl ExactSizeIterator for MemtableRowIter<'_> {}
266
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Three-column fixture: required primary-key `id`, required `name`,
    /// nullable `score`.
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid schema")
    }

    /// Wraps a single column definition in a schema.
    fn single_column_schema(column: ColumnDef) -> ColumnarSchema {
        ColumnarSchema::new(vec![column]).expect("valid")
    }

    #[test]
    fn append_and_count() {
        let mut memtable = ColumnarMemtable::new(&test_schema());

        let alice = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ];
        let bob = [Value::Integer(2), Value::String("Bob".into()), Value::Null];

        memtable.append_row(&alice).expect("append");
        memtable.append_row(&bob).expect("append");

        assert_eq!(memtable.row_count(), 2);
        assert!(!memtable.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let mut memtable = ColumnarMemtable::new(&test_schema());

        // `id` is required, so a null in the first position must be refused.
        let row = [Value::Null, Value::String("x".into()), Value::Null];
        let err = memtable.append_row(&row).unwrap_err();

        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let mut memtable = ColumnarMemtable::new(&test_schema());

        // One value for a three-column schema.
        let err = memtable.append_row(&[Value::Integer(1)]).unwrap_err();

        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let mut memtable = ColumnarMemtable::with_threshold(&test_schema(), 3);

        // Two rows: still below the threshold of three.
        for id in 0..2 {
            let row = [
                Value::Integer(id),
                Value::String(format!("u{id}")),
                Value::Null,
            ];
            memtable.append_row(&row).expect("append");
        }
        assert!(!memtable.should_flush());

        // The third row reaches the threshold.
        let third = [Value::Integer(2), Value::String("u2".into()), Value::Null];
        memtable.append_row(&third).expect("append");
        assert!(memtable.should_flush());
    }

    #[test]
    fn drain_resets() {
        let mut memtable = ColumnarMemtable::new(&test_schema());

        let row = [
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ];
        memtable.append_row(&row).expect("append");

        let (_schema, columns, row_count) = memtable.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(memtable.row_count(), 0);
        assert!(memtable.is_empty());

        let ColumnData::Int64 { values, valid } = &columns[0] else {
            panic!("expected Int64")
        };
        assert_eq!(values, &[1]);
        assert!(valid.is_none());

        let ColumnData::String {
            data,
            offsets,
            valid,
        } = &columns[1]
        else {
            panic!("expected String")
        };
        assert_eq!(std::str::from_utf8(data).unwrap(), "x");
        assert_eq!(offsets, &[0, 1]);
        assert!(valid.is_none());
    }

    #[test]
    fn all_types() {
        let columns = vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ];
        let schema = ColumnarSchema::new(columns).expect("valid");
        let mut memtable = ColumnarMemtable::new(&schema);

        let vector = Value::Array(vec![
            Value::Float(1.0),
            Value::Float(2.0),
            Value::Float(3.0),
        ]);
        let row = [
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            vector,
        ];
        memtable.append_row(&row).expect("append all types");

        assert_eq!(memtable.row_count(), 1);
    }

    #[test]
    fn dict_encode_low_cardinality() {
        let schema = single_column_schema(ColumnDef::required("qtype", ColumnType::String));
        let mut memtable = ColumnarMemtable::new(&schema);

        // Ten passes over eight distinct values: 80 rows, cardinality 8.
        let qtypes = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];
        for &q in qtypes.iter().cycle().take(qtypes.len() * 10) {
            memtable
                .append_row(&[Value::String(q.into())])
                .expect("append");
        }
        assert_eq!(memtable.row_count(), 80);

        memtable.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = memtable.drain();
        let ColumnData::DictEncoded {
            ids,
            dictionary,
            valid,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded after try_dict_encode_columns")
        };

        assert_eq!(ids.len(), 80);
        assert!(valid.is_none());
        assert_eq!(dictionary.len(), 8);
        // Every id must point inside the dictionary.
        assert!(ids.iter().all(|&id| (id as usize) < dictionary.len()));
        // The first pass must map each value to its own dictionary slot.
        for (i, &q) in qtypes.iter().enumerate() {
            let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
            assert_eq!(ids[i], expected_id as u32);
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = single_column_schema(ColumnDef::required("name", ColumnType::String));
        let mut memtable = ColumnarMemtable::new(&schema);

        // `limit + 1` distinct values: one more than the allowed cardinality.
        let limit: u32 = 4;
        for n in 0..=limit {
            let row = [Value::String(format!("val_{n}"))];
            memtable.append_row(&row).expect("append");
        }

        memtable.try_dict_encode_columns(limit);

        let (_schema, columns, _row_count) = memtable.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = single_column_schema(ColumnDef::nullable("tag", ColumnType::String));
        let mut memtable = ColumnarMemtable::new(&schema);

        for tag in [Some("foo"), None, Some("bar"), None] {
            match tag {
                Some(text) => memtable
                    .append_row(&[Value::String(text.into())])
                    .expect("append"),
                None => memtable.append_row(&[Value::Null]).expect("append null"),
            }
        }

        memtable.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = memtable.drain();
        let ColumnData::DictEncoded {
            ids,
            valid,
            dictionary,
            ..
        } = &columns[0]
        else {
            panic!("expected DictEncoded")
        };

        assert_eq!(ids.len(), 4);
        let v = valid.as_ref().expect("nullable column has validity bitmap");
        assert_eq!(v.len(), 4);
        for (row, expected) in [true, false, true, false].into_iter().enumerate() {
            assert_eq!(v[row], expected);
        }
        assert_eq!(dictionary.len(), 2);
    }
}