1use nodedb_types::columnar::{ColumnType, ColumnarSchema};
12use nodedb_types::value::Value;
13
14use crate::error::ColumnarError;
15
/// Row count at which a memtable built with [`ColumnarMemtable::new`] starts
/// reporting `should_flush() == true` (64 Ki rows).
pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024;
20
/// In-memory buffer for a single column, one variant per supported column type.
///
/// Every variant carries a `valid` vector with exactly one entry per row:
/// `true` for a present value, `false` for a null. Null rows still occupy a
/// slot in the value storage (a zero / empty placeholder) so that row indices
/// line up across all buffers of a column.
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// 64-bit signed integers; null rows hold `0`.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit floats; integer inputs are widened with `as f64`; null rows hold `0.0`.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Booleans; null rows hold `false`.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Timestamps stored as `i64`, taken either from a date-time's `micros`
    /// field or from a raw integer; null rows hold `0`.
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Decimals stored as the 16-byte output of the decimal's `serialize()`;
    /// null rows hold sixteen zero bytes.
    Decimal {
        values: Vec<[u8; 16]>,
        valid: Vec<bool>,
    },
    /// UUIDs stored as their 16 raw bytes; null rows (and strings that fail to
    /// parse as a UUID) hold sixteen zero bytes.
    Uuid {
        values: Vec<[u8; 16]>,
        valid: Vec<bool>,
    },
    /// Strings: all row payloads concatenated in `data`. `offsets` starts with
    /// `0` and gains one end offset per row, so row `i` occupies
    /// `data[offsets[i]..offsets[i + 1]]`; null rows are zero-length.
    String {
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Opaque byte strings, laid out exactly like [`ColumnData::String`].
    Bytes {
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Geometries, laid out like [`ColumnData::String`]. Each payload is either
    /// the JSON serialization of a geometry value or the raw bytes of a string
    /// supplied by the caller.
    Geometry {
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Fixed-width `f32` vectors stored back to back: every row (nulls
    /// included, zero-filled) contributes exactly `dim` elements to `data`.
    Vector {
        data: Vec<f32>,
        dim: u32,
        valid: Vec<bool>,
    },
}
79
80impl ColumnData {
81 fn new(col_type: &ColumnType) -> Self {
83 match col_type {
84 ColumnType::Int64 => Self::Int64 {
85 values: Vec::new(),
86 valid: Vec::new(),
87 },
88 ColumnType::Float64 => Self::Float64 {
89 values: Vec::new(),
90 valid: Vec::new(),
91 },
92 ColumnType::Bool => Self::Bool {
93 values: Vec::new(),
94 valid: Vec::new(),
95 },
96 ColumnType::Timestamp => Self::Timestamp {
97 values: Vec::new(),
98 valid: Vec::new(),
99 },
100 ColumnType::Decimal => Self::Decimal {
101 values: Vec::new(),
102 valid: Vec::new(),
103 },
104 ColumnType::Uuid => Self::Uuid {
105 values: Vec::new(),
106 valid: Vec::new(),
107 },
108 ColumnType::String => Self::String {
109 data: Vec::new(),
110 offsets: vec![0],
111 valid: Vec::new(),
112 },
113 ColumnType::Bytes => Self::Bytes {
114 data: Vec::new(),
115 offsets: vec![0],
116 valid: Vec::new(),
117 },
118 ColumnType::Geometry => Self::Geometry {
119 data: Vec::new(),
120 offsets: vec![0],
121 valid: Vec::new(),
122 },
123 ColumnType::Vector(dim) => Self::Vector {
124 data: Vec::new(),
125 dim: *dim,
126 valid: Vec::new(),
127 },
128 }
129 }
130
131 pub(crate) fn len(&self) -> usize {
133 match self {
134 Self::Int64 { valid, .. }
135 | Self::Float64 { valid, .. }
136 | Self::Bool { valid, .. }
137 | Self::Timestamp { valid, .. }
138 | Self::Decimal { valid, .. }
139 | Self::Uuid { valid, .. }
140 | Self::String { valid, .. }
141 | Self::Bytes { valid, .. }
142 | Self::Geometry { valid, .. }
143 | Self::Vector { valid, .. } => valid.len(),
144 }
145 }
146
147 fn push(&mut self, value: &Value, col_name: &str) -> Result<(), ColumnarError> {
149 match (self, value) {
150 (Self::Int64 { values, valid }, Value::Null) => {
152 values.push(0);
153 valid.push(false);
154 }
155 (Self::Float64 { values, valid }, Value::Null) => {
156 values.push(0.0);
157 valid.push(false);
158 }
159 (Self::Bool { values, valid }, Value::Null) => {
160 values.push(false);
161 valid.push(false);
162 }
163 (Self::Timestamp { values, valid }, Value::Null) => {
164 values.push(0);
165 valid.push(false);
166 }
167 (Self::Decimal { values, valid }, Value::Null) => {
168 values.push([0u8; 16]);
169 valid.push(false);
170 }
171 (Self::Uuid { values, valid }, Value::Null) => {
172 values.push([0u8; 16]);
173 valid.push(false);
174 }
175 (
176 Self::String {
177 data: _,
178 offsets,
179 valid,
180 },
181 Value::Null,
182 ) => {
183 offsets.push(*offsets.last().unwrap_or(&0));
184 valid.push(false);
185 }
186 (
187 Self::Bytes {
188 data: _,
189 offsets,
190 valid,
191 },
192 Value::Null,
193 ) => {
194 offsets.push(*offsets.last().unwrap_or(&0));
195 valid.push(false);
196 }
197 (
198 Self::Geometry {
199 data: _,
200 offsets,
201 valid,
202 },
203 Value::Null,
204 ) => {
205 offsets.push(*offsets.last().unwrap_or(&0));
206 valid.push(false);
207 }
208 (Self::Vector { data, dim, valid }, Value::Null) => {
209 data.extend(std::iter::repeat_n(0.0f32, *dim as usize));
210 valid.push(false);
211 }
212
213 (Self::Int64 { values, valid }, Value::Integer(v)) => {
215 values.push(*v);
216 valid.push(true);
217 }
218 (Self::Float64 { values, valid }, Value::Float(v)) => {
219 values.push(*v);
220 valid.push(true);
221 }
222 (Self::Float64 { values, valid }, Value::Integer(v)) => {
223 values.push(*v as f64);
224 valid.push(true);
225 }
226 (Self::Bool { values, valid }, Value::Bool(v)) => {
227 values.push(*v);
228 valid.push(true);
229 }
230 (Self::Timestamp { values, valid }, Value::DateTime(dt)) => {
231 values.push(dt.micros);
232 valid.push(true);
233 }
234 (Self::Timestamp { values, valid }, Value::Integer(micros)) => {
235 values.push(*micros);
236 valid.push(true);
237 }
238 (Self::Decimal { values, valid }, Value::Decimal(d)) => {
239 values.push(d.serialize());
240 valid.push(true);
241 }
242 (Self::Uuid { values, valid }, Value::Uuid(s)) => {
243 let bytes = uuid::Uuid::parse_str(s)
244 .map(|u| *u.as_bytes())
245 .unwrap_or([0u8; 16]);
246 values.push(bytes);
247 valid.push(true);
248 }
249 (
250 Self::String {
251 data,
252 offsets,
253 valid,
254 },
255 Value::String(s),
256 ) => {
257 data.extend_from_slice(s.as_bytes());
258 offsets.push(data.len() as u32);
259 valid.push(true);
260 }
261 (
262 Self::Bytes {
263 data,
264 offsets,
265 valid,
266 },
267 Value::Bytes(b),
268 ) => {
269 data.extend_from_slice(b);
270 offsets.push(data.len() as u32);
271 valid.push(true);
272 }
273 (
274 Self::Geometry {
275 data,
276 offsets,
277 valid,
278 },
279 Value::Geometry(g),
280 ) => {
281 if let Ok(json) = serde_json::to_vec(g) {
282 data.extend_from_slice(&json);
283 }
284 offsets.push(data.len() as u32);
285 valid.push(true);
286 }
287 (
288 Self::Geometry {
289 data,
290 offsets,
291 valid,
292 },
293 Value::String(s),
294 ) => {
295 data.extend_from_slice(s.as_bytes());
296 offsets.push(data.len() as u32);
297 valid.push(true);
298 }
299 (Self::Vector { data, dim, valid }, Value::Array(arr)) => {
300 let d = *dim as usize;
301 for (i, v) in arr.iter().take(d).enumerate() {
302 let f = match v {
303 Value::Float(f) => *f as f32,
304 Value::Integer(n) => *n as f32,
305 _ => 0.0,
306 };
307 if i < d {
308 data.push(f);
309 }
310 }
311 for _ in arr.len()..d {
313 data.push(0.0);
314 }
315 valid.push(true);
316 }
317
318 (other, val) => {
319 let type_name = match other {
320 Self::Int64 { .. } => "Int64",
321 Self::Float64 { .. } => "Float64",
322 Self::Bool { .. } => "Bool",
323 Self::Timestamp { .. } => "Timestamp",
324 Self::Decimal { .. } => "Decimal",
325 Self::Uuid { .. } => "Uuid",
326 Self::String { .. } => "String",
327 Self::Bytes { .. } => "Bytes",
328 Self::Geometry { .. } => "Geometry",
329 Self::Vector { .. } => "Vector",
330 };
331 let _ = val; return Err(ColumnarError::TypeMismatch {
333 column: col_name.to_string(),
334 expected: type_name.to_string(),
335 });
336 }
337 }
338 Ok(())
339 }
340}
341
342pub struct ColumnarMemtable {
347 schema: ColumnarSchema,
348 columns: Vec<ColumnData>,
349 row_count: usize,
350 flush_threshold: usize,
351}
352
353impl ColumnarMemtable {
354 pub fn new(schema: &ColumnarSchema) -> Self {
356 Self::with_threshold(schema, DEFAULT_FLUSH_THRESHOLD)
357 }
358
359 pub fn with_threshold(schema: &ColumnarSchema, flush_threshold: usize) -> Self {
361 let columns = schema
362 .columns
363 .iter()
364 .map(|col| ColumnData::new(&col.column_type))
365 .collect();
366 Self {
367 schema: schema.clone(),
368 columns,
369 row_count: 0,
370 flush_threshold,
371 }
372 }
373
374 pub fn append_row(&mut self, values: &[Value]) -> Result<(), ColumnarError> {
376 if values.len() != self.schema.columns.len() {
377 return Err(ColumnarError::SchemaMismatch {
378 expected: self.schema.columns.len(),
379 got: values.len(),
380 });
381 }
382
383 for (i, (col_def, value)) in self.schema.columns.iter().zip(values.iter()).enumerate() {
384 if matches!(value, Value::Null) && !col_def.nullable {
385 return Err(ColumnarError::NullViolation(col_def.name.clone()));
386 }
387 self.columns[i].push(value, &col_def.name)?;
388 }
389
390 self.row_count += 1;
391 debug_assert!(
392 self.columns.iter().all(|c| c.len() == self.row_count),
393 "column lengths must stay aligned with row_count"
394 );
395 Ok(())
396 }
397
398 pub fn row_count(&self) -> usize {
400 self.row_count
401 }
402
403 pub fn should_flush(&self) -> bool {
405 self.row_count >= self.flush_threshold
406 }
407
408 pub fn is_empty(&self) -> bool {
410 self.row_count == 0
411 }
412
413 pub fn schema(&self) -> &ColumnarSchema {
415 &self.schema
416 }
417
418 pub fn columns(&self) -> &[ColumnData] {
420 &self.columns
421 }
422
423 pub fn drain(&mut self) -> (ColumnarSchema, Vec<ColumnData>, usize) {
425 let columns = std::mem::replace(
426 &mut self.columns,
427 self.schema
428 .columns
429 .iter()
430 .map(|col| ColumnData::new(&col.column_type))
431 .collect(),
432 );
433 let row_count = self.row_count;
434 self.row_count = 0;
435 (self.schema.clone(), columns, row_count)
436 }
437}
438
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};

    use super::*;

    /// Schema shared by most tests: required `id` (primary key), required
    /// `name`, nullable `score`.
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid schema")
    }

    #[test]
    fn append_and_count() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        let alice = [
            Value::Integer(1),
            Value::String("Alice".into()),
            Value::Float(0.75),
        ];
        let bob = [Value::Integer(2), Value::String("Bob".into()), Value::Null];

        mt.append_row(&alice).expect("append");
        mt.append_row(&bob).expect("append");

        assert_eq!(mt.row_count(), 2);
        assert!(!mt.is_empty());
    }

    #[test]
    fn null_violation_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        // `id` is a required column, so a null in the first slot must be refused.
        let row = [Value::Null, Value::String("x".into()), Value::Null];
        let err = mt.append_row(&row).unwrap_err();

        assert!(matches!(err, ColumnarError::NullViolation(ref s) if s == "id"));
    }

    #[test]
    fn schema_mismatch_rejected() {
        let mut mt = ColumnarMemtable::new(&test_schema());

        // One value for a three-column schema.
        let too_short = [Value::Integer(1)];
        let err = mt.append_row(&too_short).unwrap_err();

        assert!(matches!(err, ColumnarError::SchemaMismatch { .. }));
    }

    #[test]
    fn flush_threshold() {
        let mut mt = ColumnarMemtable::with_threshold(&test_schema(), 3);
        let row = |id: i64| {
            [
                Value::Integer(id),
                Value::String(format!("u{id}")),
                Value::Null,
            ]
        };

        // Two rows stay below the threshold of three.
        for id in 0..2 {
            mt.append_row(&row(id)).expect("append");
        }
        assert!(!mt.should_flush());

        // The third row reaches it.
        mt.append_row(&row(2)).expect("append");
        assert!(mt.should_flush());
    }

    #[test]
    fn drain_resets() {
        let mut mt = ColumnarMemtable::new(&test_schema());
        let only_row = [
            Value::Integer(1),
            Value::String("x".into()),
            Value::Float(0.5),
        ];
        mt.append_row(&only_row).expect("append");

        let (_schema, columns, row_count) = mt.drain();

        assert_eq!(row_count, 1);
        assert_eq!(columns.len(), 3);
        assert_eq!(mt.row_count(), 0);
        assert!(mt.is_empty());

        let ColumnData::Int64 { values, valid } = &columns[0] else {
            panic!("expected Int64");
        };
        assert_eq!(values, &[1]);
        assert_eq!(valid, &[true]);

        let ColumnData::String {
            data,
            offsets,
            valid,
        } = &columns[1]
        else {
            panic!("expected String");
        };
        assert_eq!(std::str::from_utf8(data).unwrap(), "x");
        assert_eq!(offsets, &[0, 1]);
        assert_eq!(valid, &[true]);
    }

    #[test]
    fn all_types() {
        let columns = vec![
            ColumnDef::required("i", ColumnType::Int64),
            ColumnDef::required("f", ColumnType::Float64),
            ColumnDef::required("b", ColumnType::Bool),
            ColumnDef::required("ts", ColumnType::Timestamp),
            ColumnDef::required("s", ColumnType::String),
            ColumnDef::required("raw", ColumnType::Bytes),
            ColumnDef::required("vec", ColumnType::Vector(3)),
        ];
        let schema = ColumnarSchema::new(columns).expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);

        let embedding = Value::Array(vec![
            Value::Float(1.0),
            Value::Float(2.0),
            Value::Float(3.0),
        ]);
        let row = [
            Value::Integer(42),
            Value::Float(0.25),
            Value::Bool(true),
            Value::Integer(1_700_000_000),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            embedding,
        ];
        mt.append_row(&row).expect("append all types");

        assert_eq!(mt.row_count(), 1);
    }
}