1use nodedb_types::columnar::ColumnType;
4use nodedb_types::value::Value;
5
6use crate::error::ColumnarError;
7
8use super::IngestValue;
9
/// In-memory storage for a single column, one variant per physical layout.
///
/// Every variant carries a `valid` vector: it is `None` when the column was
/// created as non-nullable (see `ColumnData::new`), otherwise it holds one
/// flag per row where `false` marks a null row. Null rows still occupy a
/// placeholder slot in the data buffers so row indices stay aligned.
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// 64-bit signed integers, one per row.
    Int64 {
        values: Vec<i64>,
        valid: Option<Vec<bool>>,
    },
    /// 64-bit floats, one per row.
    Float64 {
        values: Vec<f64>,
        valid: Option<Vec<bool>>,
    },
    /// Booleans, one per row.
    Bool {
        values: Vec<bool>,
        valid: Option<Vec<bool>>,
    },
    /// Timestamps stored as `i64` microseconds (read back through
    /// `NdbDateTime::from_micros`). `ColumnType::Duration` columns also use
    /// this layout.
    Timestamp {
        values: Vec<i64>,
        valid: Option<Vec<bool>>,
    },
    /// Decimals stored as the 16-byte output of `rust_decimal::Decimal::serialize`.
    Decimal {
        values: Vec<[u8; 16]>,
        valid: Option<Vec<bool>>,
    },
    /// UUIDs stored as their 16 raw bytes. `ColumnType::Ulid` columns also use
    /// this layout.
    Uuid {
        values: Vec<[u8; 16]>,
        valid: Option<Vec<bool>>,
    },
    /// Variable-length strings. `ColumnType::Regex` columns also use this layout.
    String {
        /// Concatenated bytes of all rows.
        data: Vec<u8>,
        /// Row `i` occupies `data[offsets[i]..offsets[i + 1]]`; starts as `[0]`.
        offsets: Vec<u32>,
        valid: Option<Vec<bool>>,
    },
    /// Variable-length byte strings, same `data`/`offsets` layout as `String`.
    /// `ColumnType::Json`, `Array`, `Set`, `Range` and `Record` columns also
    /// use this layout.
    Bytes {
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Option<Vec<bool>>,
    },
    /// Geometries, same `data`/`offsets` layout as `String`. `push` stores
    /// either the `sonic_rs` serialization of a `Value::Geometry` or the raw
    /// bytes of a `Value::String`.
    Geometry {
        /// Concatenated bytes of all rows.
        data: Vec<u8>,
        offsets: Vec<u32>,
        valid: Option<Vec<bool>>,
    },
    /// Fixed-width `f32` vectors stored back to back.
    Vector {
        /// Flat buffer; row `i` occupies `data[i * dim..(i + 1) * dim]`.
        data: Vec<f32>,
        /// Number of components per row.
        dim: u32,
        valid: Option<Vec<bool>>,
    },
    /// Dictionary-encoded strings. Never produced by `ColumnData::new`; built
    /// by `ColumnData::try_dict_encode` from a `String` column.
    DictEncoded {
        /// Per-row index into `dictionary` (null rows hold `0` as a placeholder).
        ids: Vec<u32>,
        /// Distinct strings in first-seen order; the position is the id.
        dictionary: Vec<String>,
        /// Reverse lookup from string to its id in `dictionary`.
        reverse: std::collections::HashMap<String, u32>,
        valid: Option<Vec<bool>>,
    },
}
85
86impl ColumnData {
87 pub(crate) fn new(col_type: &ColumnType, nullable: bool) -> Self {
92 let valid = if nullable { Some(Vec::new()) } else { None };
93 match col_type {
94 ColumnType::Int64 => Self::Int64 {
95 values: Vec::new(),
96 valid,
97 },
98 ColumnType::Float64 => Self::Float64 {
99 values: Vec::new(),
100 valid,
101 },
102 ColumnType::Bool => Self::Bool {
103 values: Vec::new(),
104 valid,
105 },
106 ColumnType::Timestamp => Self::Timestamp {
107 values: Vec::new(),
108 valid,
109 },
110 ColumnType::Decimal => Self::Decimal {
111 values: Vec::new(),
112 valid,
113 },
114 ColumnType::Uuid => Self::Uuid {
115 values: Vec::new(),
116 valid,
117 },
118 ColumnType::String => Self::String {
119 data: Vec::new(),
120 offsets: vec![0],
121 valid,
122 },
123 ColumnType::Bytes => Self::Bytes {
124 data: Vec::new(),
125 offsets: vec![0],
126 valid,
127 },
128 ColumnType::Geometry => Self::Geometry {
129 data: Vec::new(),
130 offsets: vec![0],
131 valid,
132 },
133 ColumnType::Vector(dim) => Self::Vector {
134 data: Vec::new(),
135 dim: *dim,
136 valid,
137 },
138 ColumnType::Json
139 | ColumnType::Array
140 | ColumnType::Set
141 | ColumnType::Range
142 | ColumnType::Record => Self::Bytes {
143 data: Vec::new(),
144 offsets: vec![0],
145 valid,
146 },
147 ColumnType::Ulid => Self::Uuid {
148 values: Vec::new(),
149 valid,
150 },
151 ColumnType::Duration => Self::Timestamp {
152 values: Vec::new(),
153 valid,
154 },
155 ColumnType::Regex => Self::String {
156 data: Vec::new(),
157 offsets: vec![0],
158 valid,
159 },
160 }
161 }
162
163 pub(crate) fn len(&self) -> usize {
165 match self {
166 Self::Int64 { values, .. } => values.len(),
167 Self::Float64 { values, .. } => values.len(),
168 Self::Bool { values, .. } => values.len(),
169 Self::Timestamp { values, .. } => values.len(),
170 Self::Decimal { values, .. } => values.len(),
171 Self::Uuid { values, .. } => values.len(),
172 Self::String { offsets, .. } => offsets.len().saturating_sub(1),
173 Self::Bytes { offsets, .. } => offsets.len().saturating_sub(1),
174 Self::Geometry { offsets, .. } => offsets.len().saturating_sub(1),
175 Self::Vector { data, dim, .. } => {
176 if *dim == 0 {
177 0
178 } else {
179 data.len() / *dim as usize
180 }
181 }
182 Self::DictEncoded { ids, .. } => ids.len(),
183 }
184 }
185
186 pub(crate) fn validity_or_all_true(&self) -> std::borrow::Cow<'_, [bool]> {
192 let valid_opt = match self {
193 Self::Int64 { valid, .. }
194 | Self::Float64 { valid, .. }
195 | Self::Bool { valid, .. }
196 | Self::Timestamp { valid, .. }
197 | Self::Decimal { valid, .. }
198 | Self::Uuid { valid, .. }
199 | Self::String { valid, .. }
200 | Self::Bytes { valid, .. }
201 | Self::Geometry { valid, .. }
202 | Self::Vector { valid, .. }
203 | Self::DictEncoded { valid, .. } => valid,
204 };
205 match valid_opt {
206 Some(v) => std::borrow::Cow::Borrowed(v.as_slice()),
207 None => std::borrow::Cow::Owned(vec![true; self.len()]),
208 }
209 }
210
211 #[inline]
215 fn is_null(&self, row: usize) -> bool {
216 let valid_opt = match self {
217 Self::Int64 { valid, .. }
218 | Self::Float64 { valid, .. }
219 | Self::Bool { valid, .. }
220 | Self::Timestamp { valid, .. }
221 | Self::Decimal { valid, .. }
222 | Self::Uuid { valid, .. }
223 | Self::String { valid, .. }
224 | Self::Bytes { valid, .. }
225 | Self::Geometry { valid, .. }
226 | Self::Vector { valid, .. }
227 | Self::DictEncoded { valid, .. } => valid,
228 };
229 valid_opt.as_ref().is_some_and(|v| !v[row])
230 }
231
232 pub(crate) fn get_value(&self, row: usize) -> Value {
234 if self.is_null(row) {
235 return Value::Null;
236 }
237 match self {
238 Self::Int64 { values, .. } => Value::Integer(values[row]),
239 Self::Float64 { values, .. } => Value::Float(values[row]),
240 Self::Bool { values, .. } => Value::Bool(values[row]),
241 Self::Timestamp { values, .. } => Value::DateTime(
242 nodedb_types::datetime::NdbDateTime::from_micros(values[row]),
243 ),
244 Self::Decimal { values, .. } => {
245 Value::Decimal(rust_decimal::Decimal::deserialize(values[row]))
246 }
247 Self::Uuid { values, .. } => {
248 Value::Uuid(uuid::Uuid::from_bytes(values[row]).to_string())
249 }
250 Self::String { data, offsets, .. } => {
251 let start = offsets[row] as usize;
252 let end = offsets[row + 1] as usize;
253 let s = std::str::from_utf8(&data[start..end])
254 .unwrap_or("")
255 .to_string();
256 Value::String(s)
257 }
258 Self::Bytes { data, offsets, .. } => {
259 let start = offsets[row] as usize;
260 let end = offsets[row + 1] as usize;
261 Value::Bytes(data[start..end].to_vec())
262 }
263 Self::Geometry { data, offsets, .. } => {
264 let start = offsets[row] as usize;
265 let end = offsets[row + 1] as usize;
266 let s = std::str::from_utf8(&data[start..end])
267 .unwrap_or("")
268 .to_string();
269 Value::String(s)
270 }
271 Self::Vector { data, dim, .. } => {
272 let d = *dim as usize;
273 let start = row * d;
274 let floats: Vec<Value> = data[start..start + d]
275 .iter()
276 .map(|&f| Value::Float(f as f64))
277 .collect();
278 Value::Array(floats)
279 }
280 Self::DictEncoded {
281 ids, dictionary, ..
282 } => {
283 let id = ids[row] as usize;
284 if id < dictionary.len() {
285 Value::String(dictionary[id].clone())
286 } else {
287 Value::Null
288 }
289 }
290 }
291 }
292
293 #[inline(always)]
295 pub(crate) fn push_valid(valid: &mut Option<Vec<bool>>, is_valid: bool) {
296 if let Some(v) = valid {
297 v.push(is_valid);
298 }
299 }
300
301 pub(crate) fn push(&mut self, value: &Value, col_name: &str) -> Result<(), ColumnarError> {
303 match (self, value) {
304 (Self::Int64 { values, valid }, Value::Null) => {
305 values.push(0);
306 Self::push_valid(valid, false);
307 }
308 (Self::Float64 { values, valid }, Value::Null) => {
309 values.push(0.0);
310 Self::push_valid(valid, false);
311 }
312 (Self::Bool { values, valid }, Value::Null) => {
313 values.push(false);
314 Self::push_valid(valid, false);
315 }
316 (Self::Timestamp { values, valid }, Value::Null) => {
317 values.push(0);
318 Self::push_valid(valid, false);
319 }
320 (Self::Decimal { values, valid }, Value::Null) => {
321 values.push([0u8; 16]);
322 Self::push_valid(valid, false);
323 }
324 (Self::Uuid { values, valid }, Value::Null) => {
325 values.push([0u8; 16]);
326 Self::push_valid(valid, false);
327 }
328 (Self::String { offsets, valid, .. }, Value::Null) => {
329 offsets.push(*offsets.last().unwrap_or(&0));
330 Self::push_valid(valid, false);
331 }
332 (Self::Bytes { offsets, valid, .. }, Value::Null) => {
333 offsets.push(*offsets.last().unwrap_or(&0));
334 Self::push_valid(valid, false);
335 }
336 (Self::Geometry { offsets, valid, .. }, Value::Null) => {
337 offsets.push(*offsets.last().unwrap_or(&0));
338 Self::push_valid(valid, false);
339 }
340 (Self::Vector { data, dim, valid }, Value::Null) => {
341 data.extend(std::iter::repeat_n(0.0f32, *dim as usize));
342 Self::push_valid(valid, false);
343 }
344 (Self::Int64 { values, valid }, Value::Integer(v)) => {
345 values.push(*v);
346 Self::push_valid(valid, true);
347 }
348 (Self::Float64 { values, valid }, Value::Float(v)) => {
349 values.push(*v);
350 Self::push_valid(valid, true);
351 }
352 (Self::Float64 { values, valid }, Value::Integer(v)) => {
353 values.push(*v as f64);
354 Self::push_valid(valid, true);
355 }
356 (Self::Bool { values, valid }, Value::Bool(v)) => {
357 values.push(*v);
358 Self::push_valid(valid, true);
359 }
360 (Self::Timestamp { values, valid }, Value::DateTime(dt)) => {
361 values.push(dt.micros);
362 Self::push_valid(valid, true);
363 }
364 (Self::Timestamp { values, valid }, Value::Integer(micros)) => {
365 values.push(*micros);
366 Self::push_valid(valid, true);
367 }
368 (Self::Decimal { values, valid }, Value::Decimal(d)) => {
369 values.push(d.serialize());
370 Self::push_valid(valid, true);
371 }
372 (Self::Uuid { values, valid }, Value::Uuid(s)) => {
373 let bytes = uuid::Uuid::parse_str(s)
374 .map(|u| *u.as_bytes())
375 .unwrap_or([0u8; 16]);
376 values.push(bytes);
377 Self::push_valid(valid, true);
378 }
379 (
380 Self::String {
381 data,
382 offsets,
383 valid,
384 },
385 Value::String(s),
386 ) => {
387 data.extend_from_slice(s.as_bytes());
388 offsets.push(data.len() as u32);
389 Self::push_valid(valid, true);
390 }
391 (
392 Self::Bytes {
393 data,
394 offsets,
395 valid,
396 },
397 Value::Bytes(b),
398 ) => {
399 data.extend_from_slice(b);
400 offsets.push(data.len() as u32);
401 Self::push_valid(valid, true);
402 }
403 (
404 Self::Geometry {
405 data,
406 offsets,
407 valid,
408 },
409 Value::Geometry(g),
410 ) => {
411 if let Ok(json) = sonic_rs::to_vec(g) {
412 data.extend_from_slice(&json);
413 }
414 offsets.push(data.len() as u32);
415 Self::push_valid(valid, true);
416 }
417 (
418 Self::Geometry {
419 data,
420 offsets,
421 valid,
422 },
423 Value::String(s),
424 ) => {
425 data.extend_from_slice(s.as_bytes());
426 offsets.push(data.len() as u32);
427 Self::push_valid(valid, true);
428 }
429 (Self::Vector { data, dim, valid }, Value::Array(arr)) => {
430 let d = *dim as usize;
431 for (i, v) in arr.iter().take(d).enumerate() {
432 let f = match v {
433 Value::Float(f) => *f as f32,
434 Value::Integer(n) => *n as f32,
435 _ => 0.0,
436 };
437 if i < d {
438 data.push(f);
439 }
440 }
441 for _ in arr.len()..d {
442 data.push(0.0);
443 }
444 Self::push_valid(valid, true);
445 }
446 (Self::DictEncoded { ids, valid, .. }, Value::Null) => {
447 ids.push(0);
448 Self::push_valid(valid, false);
449 }
450 (
451 Self::DictEncoded {
452 ids,
453 dictionary,
454 reverse,
455 valid,
456 },
457 Value::String(s),
458 ) => {
459 let id = if let Some(&existing) = reverse.get(s.as_str()) {
460 existing
461 } else {
462 let new_id = dictionary.len() as u32;
463 dictionary.push(s.clone());
464 reverse.insert(s.clone(), new_id);
465 new_id
466 };
467 ids.push(id);
468 Self::push_valid(valid, true);
469 }
470 (other, val) => {
471 let type_name = match other {
472 Self::Int64 { .. } => "Int64",
473 Self::Float64 { .. } => "Float64",
474 Self::Bool { .. } => "Bool",
475 Self::Timestamp { .. } => "Timestamp",
476 Self::Decimal { .. } => "Decimal",
477 Self::Uuid { .. } => "Uuid",
478 Self::String { .. } => "String",
479 Self::Bytes { .. } => "Bytes",
480 Self::Geometry { .. } => "Geometry",
481 Self::Vector { .. } => "Vector",
482 Self::DictEncoded { .. } => "DictEncoded",
483 };
484 let _ = val;
485 return Err(ColumnarError::TypeMismatch {
486 column: col_name.to_string(),
487 expected: type_name.to_string(),
488 });
489 }
490 }
491 Ok(())
492 }
493
494 pub(crate) fn push_ref(
496 &mut self,
497 value: &IngestValue<'_>,
498 col_name: &str,
499 ) -> Result<(), ColumnarError> {
500 match (self, value) {
501 (Self::Int64 { values, valid }, IngestValue::Null) => {
502 values.push(0);
503 Self::push_valid(valid, false);
504 }
505 (Self::Float64 { values, valid }, IngestValue::Null) => {
506 values.push(0.0);
507 Self::push_valid(valid, false);
508 }
509 (Self::Bool { values, valid }, IngestValue::Null) => {
510 values.push(false);
511 Self::push_valid(valid, false);
512 }
513 (Self::Timestamp { values, valid }, IngestValue::Null) => {
514 values.push(0);
515 Self::push_valid(valid, false);
516 }
517 (Self::String { offsets, valid, .. }, IngestValue::Null) => {
518 offsets.push(*offsets.last().unwrap_or(&0));
519 Self::push_valid(valid, false);
520 }
521 (Self::DictEncoded { ids, valid, .. }, IngestValue::Null) => {
522 ids.push(0);
523 Self::push_valid(valid, false);
524 }
525 (Self::Int64 { values, valid }, IngestValue::Int64(v)) => {
526 values.push(*v);
527 Self::push_valid(valid, true);
528 }
529 (Self::Float64 { values, valid }, IngestValue::Float64(v)) => {
530 values.push(*v);
531 Self::push_valid(valid, true);
532 }
533 (Self::Float64 { values, valid }, IngestValue::Int64(v)) => {
534 values.push(*v as f64);
535 Self::push_valid(valid, true);
536 }
537 (Self::Bool { values, valid }, IngestValue::Bool(v)) => {
538 values.push(*v);
539 Self::push_valid(valid, true);
540 }
541 (Self::Timestamp { values, valid }, IngestValue::Timestamp(v)) => {
542 values.push(*v);
543 Self::push_valid(valid, true);
544 }
545 (Self::Timestamp { values, valid }, IngestValue::Int64(v)) => {
546 values.push(*v);
547 Self::push_valid(valid, true);
548 }
549 (
550 Self::String {
551 data,
552 offsets,
553 valid,
554 },
555 IngestValue::Str(s),
556 ) => {
557 data.extend_from_slice(s.as_bytes());
558 offsets.push(data.len() as u32);
559 Self::push_valid(valid, true);
560 }
561 (
562 Self::DictEncoded {
563 ids,
564 dictionary,
565 reverse,
566 valid,
567 },
568 IngestValue::Str(s),
569 ) => {
570 let id = if let Some(&existing) = reverse.get(*s) {
571 existing
572 } else {
573 let new_id = dictionary.len() as u32;
574 dictionary.push((*s).to_string());
575 reverse.insert((*s).to_string(), new_id);
576 new_id
577 };
578 ids.push(id);
579 Self::push_valid(valid, true);
580 }
581 (other, _) => {
582 let type_name = match other {
583 Self::Int64 { .. } => "Int64",
584 Self::Float64 { .. } => "Float64",
585 Self::Bool { .. } => "Bool",
586 Self::Timestamp { .. } => "Timestamp",
587 Self::Decimal { .. } => "Decimal",
588 Self::Uuid { .. } => "Uuid",
589 Self::String { .. } => "String",
590 Self::Bytes { .. } => "Bytes",
591 Self::Geometry { .. } => "Geometry",
592 Self::Vector { .. } => "Vector",
593 Self::DictEncoded { .. } => "DictEncoded",
594 };
595 return Err(ColumnarError::TypeMismatch {
596 column: col_name.to_string(),
597 expected: type_name.to_string(),
598 });
599 }
600 }
601 Ok(())
602 }
603
604 pub(crate) fn backfill_nulls(&mut self, count: usize) {
606 match self {
607 Self::Int64 { values, valid } => {
608 values.extend(std::iter::repeat_n(0i64, count));
609 if let Some(v) = valid {
610 v.extend(std::iter::repeat_n(false, count));
611 }
612 }
613 Self::Float64 { values, valid } => {
614 values.extend(std::iter::repeat_n(f64::NAN, count));
615 if let Some(v) = valid {
616 v.extend(std::iter::repeat_n(false, count));
617 }
618 }
619 Self::Bool { values, valid } => {
620 values.extend(std::iter::repeat_n(false, count));
621 if let Some(v) = valid {
622 v.extend(std::iter::repeat_n(false, count));
623 }
624 }
625 Self::Timestamp { values, valid } => {
626 values.extend(std::iter::repeat_n(0i64, count));
627 if let Some(v) = valid {
628 v.extend(std::iter::repeat_n(false, count));
629 }
630 }
631 Self::Decimal { values, valid } => {
632 values.extend(std::iter::repeat_n([0u8; 16], count));
633 if let Some(v) = valid {
634 v.extend(std::iter::repeat_n(false, count));
635 }
636 }
637 Self::Uuid { values, valid } => {
638 values.extend(std::iter::repeat_n([0u8; 16], count));
639 if let Some(v) = valid {
640 v.extend(std::iter::repeat_n(false, count));
641 }
642 }
643 Self::String { offsets, valid, .. } => {
644 let last = *offsets.last().unwrap_or(&0);
645 offsets.extend(std::iter::repeat_n(last, count));
646 if let Some(v) = valid {
647 v.extend(std::iter::repeat_n(false, count));
648 }
649 }
650 Self::Bytes { offsets, valid, .. } => {
651 let last = *offsets.last().unwrap_or(&0);
652 offsets.extend(std::iter::repeat_n(last, count));
653 if let Some(v) = valid {
654 v.extend(std::iter::repeat_n(false, count));
655 }
656 }
657 Self::Geometry { offsets, valid, .. } => {
658 let last = *offsets.last().unwrap_or(&0);
659 offsets.extend(std::iter::repeat_n(last, count));
660 if let Some(v) = valid {
661 v.extend(std::iter::repeat_n(false, count));
662 }
663 }
664 Self::Vector { data, dim, valid } => {
665 data.extend(std::iter::repeat_n(0.0f32, *dim as usize * count));
666 if let Some(v) = valid {
667 v.extend(std::iter::repeat_n(false, count));
668 }
669 }
670 Self::DictEncoded { ids, valid, .. } => {
671 ids.extend(std::iter::repeat_n(0u32, count));
672 if let Some(v) = valid {
673 v.extend(std::iter::repeat_n(false, count));
674 }
675 }
676 }
677 }
678}
679
/// Dictionary-size limit (1024 distinct strings) for dictionary encoding.
///
/// NOTE(review): presumably the value callers pass as `max_cardinality` to
/// `ColumnData::try_dict_encode`; no call site is visible here — confirm.
pub const DICT_ENCODE_MAX_CARDINALITY: u32 = 1024;
682
683impl ColumnData {
684 pub fn try_dict_encode(col: &ColumnData, max_cardinality: u32) -> Option<ColumnData> {
686 let (data, offsets, valid) = match col {
687 ColumnData::String {
688 data,
689 offsets,
690 valid,
691 } => (data, offsets, valid),
692 _ => return None,
693 };
694
695 let row_count = col.len();
696 let mut dictionary: Vec<String> = Vec::new();
697 let mut reverse: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
698 let mut ids: Vec<u32> = Vec::with_capacity(row_count);
699
700 for i in 0..row_count {
701 if valid.as_ref().is_some_and(|v| !v[i]) {
702 ids.push(0);
703 continue;
704 }
705 let start = offsets[i] as usize;
706 let end = offsets[i + 1] as usize;
707 let s = match std::str::from_utf8(&data[start..end]) {
708 Ok(s) => s,
709 Err(_) => return None,
710 };
711 let id = if let Some(&existing) = reverse.get(s) {
712 existing
713 } else {
714 if dictionary.len() as u32 >= max_cardinality {
715 return None;
716 }
717 let new_id = dictionary.len() as u32;
718 dictionary.push(s.to_string());
719 reverse.insert(s.to_string(), new_id);
720 new_id
721 };
722 ids.push(id);
723 }
724
725 Some(ColumnData::DictEncoded {
726 ids,
727 dictionary,
728 reverse,
729 valid: valid.clone(),
730 })
731 }
732}