1mod column_data;
12
13pub use column_data::{ColumnData, DICT_ENCODE_MAX_CARDINALITY};
14
15use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
16use nodedb_types::value::Value;
17
18use crate::error::ColumnarError;
19
/// Row count at which a memtable built with [`ColumnarMemtable::new`] starts
/// reporting [`ColumnarMemtable::should_flush`] as `true` (2^16 rows).
pub const DEFAULT_FLUSH_THRESHOLD: usize = 1 << 16;
22
23pub struct ColumnarMemtable {
28 schema: ColumnarSchema,
29 columns: Vec<ColumnData>,
30 row_count: usize,
31 flush_threshold: usize,
32}
33
34impl ColumnarMemtable {
35 pub fn new(schema: &ColumnarSchema) -> Self {
37 Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
38 }
39
40 pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
42 let columns = schema
43 .columns
44 .iter()
45 .map(|col| ColumnData::new(&col.column_type, col.nullable))
46 .collect();
47 Self {
48 schema: schema.clone(),
49 columns,
50 row_count: 0,
51 flush_threshold,
52 }
53 }
54
55 pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
57 if values.len() != self.schema.columns.len() {
58 return Err(ColumnarError::SchemaMismatch {
59 expected: self.schema.columns.len(),
60 got: values.len(),
61 });
62 }
63
64 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
65 if matches!(value, Value::Null) && !col_def.nullable {
66 return Err(ColumnarError::NullViolation(col_def.name.clone()));
67 }
68 self.columns[i].push(value, &col_def.name)?;
69 }
70
71 self.row_count += 1;
72 debug_assert!(
73 self.columns.iter().all(|c| c.len() == self.row_count),
74 "column lengths must stay aligned with row_count"
75 );
76 Ok(())
77 }
78
79 pub fn row_count(&self) -> usize {
81 self.row_count
82 }
83
84 pub fn should_flush(&self) -> bool {
86 self.row_count >= self.flush_threshold
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.row_count == 0
92 }
93
94 pub fn schema(&self) -> &ColumnarSchema {
96 &self.schema
97 }
98
99 pub fn columns(&self) -> &[ColumnData] {
101 &self.columns
102 }
103
104 pub fn try_dict_encode_columns(&mut self, max_cardinality: u32) {
106 for col in &mut self.columns {
107 if let ColumnData::String { .. } = col
108 && let Some(encoded) = ColumnData::try_dict_encode(col, max_cardinality)
109 {
110 *col = encoded;
111 }
112 }
113 }
114
115 pub fn iter_rows(&self) -> MemtableRowIter<'_> {
117 MemtableRowIter {
118 columns: &self.columns,
119 row_count: self.row_count,
120 current: 0,
121 }
122 }
123
124 pub fn get_row(&self, row_idx: usize) -> Option<Vec<Value>> {
126 if row_idx >= self.row_count {
127 return None;
128 }
129 let mut row = Vec::with_capacity(self.columns.len());
130 for col in &self.columns {
131 row.push(col.get_value(row_idx));
132 }
133 Some(row)
134 }
135
136 pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
138 let columns = std::mem::replace(
139 &mut self.columns,
140 self.schema
141 .columns
142 .iter()
143 .map(|col| ColumnData::new(&col.column_type, col.nullable))
144 .collect(),
145 );
146 let row_count = self.row_count;
147 self.row_count = 0;
148 (self.schema.clone(), columns, row_count)
149 }
150
151 pub fn drain_optimized(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
153 self.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);
154 self.drain()
155 }
156
157 pub fn ingest_row_refs(&mut self, values: &[IngestValue<'_>]) -> Result<(), ColumnarError> {
162 if values.len() != self.schema.columns.len() {
163 return Err(ColumnarError::SchemaMismatch {
164 expected: self.schema.columns.len(),
165 got: values.len(),
166 });
167 }
168
169 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
170 if matches!(value, IngestValue::Null) && !col_def.nullable {
171 return Err(ColumnarError::NullViolation(col_def.name.clone()));
172 }
173 self.columns[i].push_ref(value, &col_def.name)?;
174 }
175
176 self.row_count += 1;
177 Ok(())
178 }
179
180 pub fn add_column(&mut self, name: String, column_type: ColumnType, nullable: bool) {
182 if self.schema.columns.iter().any(|c| c.name == name) {
183 return;
184 }
185
186 let existing_rows = self.row_count;
187 let mut col = ColumnData::new(&column_type, nullable);
188 if existing_rows > 0 {
189 col.backfill_nulls(existing_rows);
190 }
191
192 self.columns.push(col);
193 self.schema.columns.push(ColumnDef {
194 name,
195 column_type,
196 nullable,
197 default: None,
198 primary_key: false,
199 modifiers: Vec::new(),
200 generated_expr: None,
201 generated_deps: Vec::new(),
202 added_at_version: 1,
203 });
204 }
205}
206
/// A borrowed, column-typed cell value accepted by
/// [`ColumnarMemtable::ingest_row_refs`].
///
/// String payloads are borrowed (`&'a str`), so callers do not need to build
/// an owned `String` for every cell they ingest.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum IngestValue<'a> {
    /// Absent value; `ingest_row_refs` rejects it for non-nullable columns.
    Null,
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// Timestamp carried as a raw `i64` (unit not established here —
    /// presumably matches the `Timestamp` column encoding; TODO confirm).
    Timestamp(i64),
    /// Borrowed UTF-8 string.
    Str(&'a str),
}
218
219pub struct MemtableRowIter<'a> {
221 columns: &'a [ColumnData],
222 row_count: usize,
223 current: usize,
224}
225
226impl Iterator for MemtableRowIter<'_> {
227 type Item = Vec<Value>;
228
229 fn next(&mut self) -> Option<Self::Item> {
230 if self.current >= self.row_count {
231 return None;
232 }
233 let mut row = Vec::with_capacity(self.columns.len());
234 for col in self.columns {
235 row.push(col.get_value(self.current));
236 }
237 self.current += 1;
238 Some(row)
239 }
240
241 fn size_hint(&self) -> (usize, Option<usize>) {
242 let remaining = self.row_count - self.current;
243 (remaining, Some(remaining))
244 }
245}
246
247impl ExactSizeIterator for MemtableRowIter<'_> {}
248
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Schema used by most tests: required `id` (primary key), required
    /// `name`, nullable `score`.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid schema")
    }

    #[test]
    fn append_and_count() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ])
        .expect("append");

        mt.append_row(&[Value::Integer(2), Value::String("Bob".into()), Value::Null])
            .expect("append");

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt
            .append_row(&[Value::Null, Value::String("x".into()), Value::Null])
            .unwrap_err();
        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        let err = mt.append_row(&[Value::Integer(1)]).unwrap_err();
        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::with_threshold(&schema, 3);

        for i in 0..2 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }
        assert!(!mt.should_flush());

        mt.append_row(&[Value::Integer(2), Value::String("u2".into()), Value::Null])
            .expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        mt.append_row(&[
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ])
        .expect("append");

        let (_schema, columns, row_count) = mt.drain();
        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        match &columns[0] {
            ColumnData::Int64 { values, valid } => {
                assert_eq!(values, &[1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected Int64"),
        }
        match &columns[1] {
            ColumnData::String {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(std::str::from_utf8(data).unwrap(), "x");
                assert_eq!(offsets, &[0, 1]);
                assert!(valid.is_none());
            }
            _ => panic!("expected String"),
        }
    }

    #[test]
    fn all_types() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::Array(vec![
                Value::Float(1.0),
                Value::Float(2.0),
                Value::Float(3.0),
            ]),
        ])
        .expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }

    #[test]
    fn get_row_bounds_and_iter_rows_len() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        assert!(mt.get_row(0).is_none());
        assert_eq!(mt.iter_rows().len(), 0);

        for i in 0..2 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("u{i}")),
                Value::Null,
            ])
            .expect("append");
        }

        assert_eq!(mt.get_row(0).map(|r| r.len()), Some(3));
        assert_eq!(mt.get_row(1).map(|r| r.len()), Some(3));
        assert!(mt.get_row(2).is_none());

        let rows = mt.iter_rows();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows.count(), 2);
    }

    #[test]
    fn add_column_backfills_existing_rows() {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::Integer(1), Value::String("a".into()), Value::Null])
            .expect("append");
        mt.append_row(&[Value::Integer(2), Value::String("b".into()), Value::Null])
            .expect("append");

        mt.add_column("extra".into(), ColumnType::Int64, true);
        assert_eq!(mt.schema().columns.len(), 4);
        assert_eq!(mt.columns().len(), 4);
        assert_eq!(mt.columns()[3].len(), 2);

        // Re-adding an existing column name is a no-op.
        mt.add_column("id".into(), ColumnType::Int64, false);
        assert_eq!(mt.schema().columns.len(), 4);
        assert_eq!(mt.columns().len(), 4);

        mt.append_row(&[
            Value::Integer(3),
            Value::String("c".into()),
            Value::Null,
            Value::Integer(7),
        ])
        .expect("append after add_column");
        assert_eq!(mt.row_count(), 3);
    }

    #[test]
    fn dict_encode_low_cardinality() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("qtype", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let qtypes = ["A", "B", "AAAA", "NS", "MX", "SOA", "CNAME", "PTR"];
        for _ in 0..10 {
            for &q in &qtypes {
                mt.append_row(&[Value::String(q.into())]).expect("append");
            }
        }
        assert_eq!(mt.row_count(), 80);

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                dictionary,
                valid,
                ..
            } => {
                assert_eq!(ids.len(), 80);
                assert!(valid.is_none());
                assert_eq!(dictionary.len(), 8);
                for &id in ids {
                    assert!((id as usize) < dictionary.len());
                }
                // The first round of appends wrote `qtypes` in order, so the
                // first 8 ids must map back to those strings.
                for (i, &q) in qtypes.iter().enumerate() {
                    let expected_id = dictionary.iter().position(|s| s == q).expect("in dict");
                    assert_eq!(ids[i], expected_id as u32);
                }
            }
            _ => panic!("expected DictEncoded after try_dict_encode_columns"),
        }
    }

    #[test]
    fn dict_encode_exceeds_cardinality_stays_string() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("name", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        let max: u32 = 4;
        for i in 0..=max {
            mt.append_row(&[Value::String(format!("val_{i}"))])
                .expect("append");
        }

        mt.try_dict_encode_columns(max);

        let (_schema, columns, _row_count) = mt.drain();
        assert!(matches!(columns[0], ColumnData::String { .. }));
    }

    #[test]
    fn dict_encode_with_nulls() {
        let schema = ColumnarSchema::new(vec![ColumnDef::nullable("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::String("foo".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");
        mt.append_row(&[Value::String("bar".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("append null");

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (_schema, columns, _row_count) = mt.drain();
        match &columns[0] {
            ColumnData::DictEncoded {
                ids,
                valid,
                dictionary,
                ..
            } => {
                assert_eq!(ids.len(), 4);
                let v = valid.as_ref().expect("nullable column has validity bitmap");
                assert_eq!(v.len(), 4);
                assert!(v[0]);
                assert!(!v[1]);
                assert!(v[2]);
                assert!(!v[3]);
                assert_eq!(dictionary.len(), 2);
            }
            _ => panic!("expected DictEncoded"),
        }
    }
}
486}