1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
29#[derive(Debug, Clone)]
44pub struct FactorizedChunk {
45 levels: Vec<FactorizationLevel>,
47 logical_row_count: usize,
49 state: ChunkState,
51}
52
53#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56 columns: Vec<FactorizedVector>,
58 column_names: Vec<String>,
60 group_count: usize,
62 multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242 if let Some(cached) = self.state.cached_multiplicities() {
244 return Arc::clone(cached);
245 }
246
247 let mults = self.compute_path_multiplicities();
249 let arc_mults: Arc<[usize]> = mults.into();
250 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251 arc_mults
252 }
253
254 #[must_use]
256 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257 self.levels.get(index)
258 }
259
260 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262 self.levels.get_mut(index)
263 }
264
265 pub fn add_level(
276 &mut self,
277 columns: Vec<ValueVector>,
278 column_names: Vec<String>,
279 offsets: &[u32],
280 ) {
281 let parent_count = offsets.len().saturating_sub(1);
282
283 let multiplicities: Vec<usize> = (0..parent_count)
285 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286 .collect();
287
288 let factorized_columns: Vec<FactorizedVector> = columns
290 .into_iter()
291 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292 .collect();
293
294 let level =
295 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296 self.levels.push(level);
297
298 if self.levels.len() == 1 {
302 self.logical_row_count = multiplicities.iter().sum();
304 } else {
305 self.recompute_logical_row_count();
307 }
308
309 self.update_state();
311 }
312
313 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315 self.levels.push(level);
316 self.recompute_logical_row_count();
317 self.update_state();
318 }
319
320 fn update_state(&mut self) {
322 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323 }
324
325 fn recompute_logical_row_count(&mut self) {
327 if self.levels.is_empty() {
328 self.logical_row_count = 0;
329 return;
330 }
331
332 let level0_count = self.levels[0].group_count;
334 if self.levels.len() == 1 {
335 self.logical_row_count = level0_count;
336 return;
337 }
338
339 let mut counts = vec![1usize; level0_count];
342
343 for level_idx in 1..self.levels.len() {
344 let level = &self.levels[level_idx];
345 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
348 if parent_idx < level.multiplicities.len() {
350 let child_mult = level.multiplicities[parent_idx];
351 for _ in 0..child_mult {
352 new_counts.push(parent_count);
353 }
354 }
355 }
356
357 counts = new_counts;
358 }
359
360 self.logical_row_count = counts.len();
361 }
362
363 #[must_use]
367 pub fn flatten(&self) -> DataChunk {
368 if self.levels.is_empty() {
369 return DataChunk::empty();
370 }
371
372 let mut all_columns: Vec<ValueVector> = Vec::new();
374
375 if self.levels.len() == 1 {
377 let level = &self.levels[0];
378 for col in &level.columns {
379 all_columns.push(col.flatten(None));
380 }
381 return DataChunk::new(all_columns);
382 }
383
384 let row_iter = self.logical_row_iter();
387 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391 for level in &self.levels {
392 for col in &level.columns {
393 output_columns.push(ValueVector::with_capacity(
394 col.data_type(),
395 self.logical_row_count,
396 ));
397 }
398 }
399
400 for indices in row_iter {
402 let mut col_offset = 0;
403 for (level_idx, level) in self.levels.iter().enumerate() {
404 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405 for (col_idx, col) in level.columns.iter().enumerate() {
406 if let Some(value) = col.get_physical(level_idx_value) {
407 output_columns[col_offset + col_idx].push_value(value);
408 }
409 }
410 col_offset += level.column_count();
411 }
412 }
413
414 DataChunk::new(output_columns)
415 }
416
417 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421 FactorizedRowIterator::new(self)
422 }
423
424 #[must_use]
426 pub fn total_column_count(&self) -> usize {
427 self.levels.iter().map(|l| l.column_count()).sum()
428 }
429
430 #[must_use]
432 pub fn all_column_names(&self) -> Vec<String> {
433 self.levels
434 .iter()
435 .flat_map(|l| l.column_names.iter().cloned())
436 .collect()
437 }
438
439 #[must_use]
457 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
458 where
459 F: Fn(&grafeo_common::types::Value) -> bool,
460 {
461 if self.levels.is_empty() {
462 return None;
463 }
464
465 let deepest_idx = self.levels.len() - 1;
466 let deepest = &self.levels[deepest_idx];
467
468 let filter_col = deepest.column(column_idx)?;
470
471 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
473 .map(|i| {
474 ValueVector::with_type(
475 deepest
476 .column(i)
477 .expect("column exists: i < column_count")
478 .data_type(),
479 )
480 })
481 .collect();
482
483 let parent_count = filter_col.parent_count();
485 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
486 let mut new_offsets: Vec<u32> = vec![0];
487
488 for parent_idx in 0..parent_count {
490 let (start, end) = filter_col.range_for_parent(parent_idx);
491
492 for phys_idx in start..end {
493 if let Some(value) = filter_col.get_physical(phys_idx)
495 && predicate(&value)
496 {
497 for col_idx in 0..deepest.column_count() {
499 if let Some(col) = deepest.column(col_idx)
500 && let Some(v) = col.get_physical(phys_idx)
501 {
502 new_columns[col_idx].push_value(v);
503 }
504 }
505 new_multiplicities[parent_idx] += 1;
506 }
507 }
508
509 #[allow(clippy::cast_possible_truncation)]
511 new_offsets.push(new_columns[0].len() as u32);
512 }
513
514 let total_remaining: usize = new_multiplicities.iter().sum();
516 if total_remaining == 0 {
517 return Some(Self::empty());
518 }
519
520 let new_factorized_cols: Vec<FactorizedVector> = new_columns
522 .into_iter()
523 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
524 .collect();
525
526 let new_level = FactorizationLevel::unflat(
527 new_factorized_cols,
528 deepest.column_names().to_vec(),
529 new_multiplicities,
530 );
531
532 let mut result = Self {
534 levels: self.levels[..deepest_idx].to_vec(),
535 logical_row_count: 0,
536 state: ChunkState::flat(0),
537 };
538 result.levels.push(new_level);
539 result.recompute_logical_row_count();
540 result.update_state();
541
542 Some(result)
543 }
544
545 #[must_use]
558 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
559 where
560 F: Fn(&[grafeo_common::types::Value]) -> bool,
561 {
562 if self.levels.is_empty() {
563 return None;
564 }
565
566 let deepest_idx = self.levels.len() - 1;
567 let deepest = &self.levels[deepest_idx];
568 let col_count = deepest.column_count();
569
570 if col_count == 0 {
571 return None;
572 }
573
574 let first_col = deepest.column(0)?;
575 let parent_count = first_col.parent_count();
576
577 let mut new_columns: Vec<ValueVector> = (0..col_count)
579 .map(|i| {
580 ValueVector::with_type(
581 deepest
582 .column(i)
583 .expect("column exists: i < column_count")
584 .data_type(),
585 )
586 })
587 .collect();
588
589 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
590 let mut new_offsets: Vec<u32> = vec![0];
591 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
592
593 for parent_idx in 0..parent_count {
594 let (start, end) = first_col.range_for_parent(parent_idx);
595
596 for phys_idx in start..end {
597 row_values.clear();
599 for col_idx in 0..col_count {
600 if let Some(col) = deepest.column(col_idx)
601 && let Some(v) = col.get_physical(phys_idx)
602 {
603 row_values.push(v);
604 }
605 }
606
607 if predicate(&row_values) {
609 for (col_idx, v) in row_values.iter().enumerate() {
610 new_columns[col_idx].push_value(v.clone());
611 }
612 new_multiplicities[parent_idx] += 1;
613 }
614 }
615
616 #[allow(clippy::cast_possible_truncation)]
618 new_offsets.push(new_columns[0].len() as u32);
619 }
620
621 let total: usize = new_multiplicities.iter().sum();
623 if total == 0 {
624 return Some(Self::empty());
625 }
626
627 let new_factorized_cols: Vec<FactorizedVector> = new_columns
629 .into_iter()
630 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
631 .collect();
632
633 let new_level = FactorizationLevel::unflat(
634 new_factorized_cols,
635 deepest.column_names().to_vec(),
636 new_multiplicities,
637 );
638
639 let mut result = Self {
640 levels: self.levels[..deepest_idx].to_vec(),
641 logical_row_count: 0,
642 state: ChunkState::flat(0),
643 };
644 result.levels.push(new_level);
645 result.recompute_logical_row_count();
646 result.update_state();
647
648 Some(result)
649 }
650
651 #[must_use]
671 pub fn count_rows(&self) -> usize {
672 self.logical_row_count()
673 }
674
675 #[must_use]
692 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
693 if self.levels.is_empty() {
694 return Vec::new();
695 }
696
697 if self.levels.len() == 1 {
699 return vec![1; self.levels[0].group_count];
700 }
701
702 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
704
705 for level_idx in 1..self.levels.len() {
707 let level = &self.levels[level_idx];
708 let mut child_multiplicities = Vec::with_capacity(level.group_count);
709
710 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
712 let child_count = if parent_idx < level.multiplicities.len() {
713 level.multiplicities[parent_idx]
714 } else {
715 0
716 };
717
718 for _ in 0..child_count {
720 child_multiplicities.push(parent_mult);
721 }
722 }
723
724 parent_multiplicities = child_multiplicities;
725 }
726
727 parent_multiplicities
728 }
729
730 #[must_use]
743 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
744 if self.levels.is_empty() {
745 return None;
746 }
747
748 let deepest_idx = self.levels.len() - 1;
749 let deepest = &self.levels[deepest_idx];
750 let col = deepest.column(column_idx)?;
751
752 let multiplicities = self.compute_path_multiplicities();
754
755 let mut sum = 0.0;
756 for (phys_idx, mult) in multiplicities.iter().enumerate() {
757 if let Some(value) = col.get_physical(phys_idx) {
758 let num_value = match &value {
760 grafeo_common::types::Value::Int64(v) => *v as f64,
761 grafeo_common::types::Value::Float64(v) => *v,
762 _ => continue, };
764 sum += num_value * (*mult as f64);
765 }
766 }
767 Some(sum)
768 }
769
770 #[must_use]
782 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
783 let count = self.logical_row_count();
784 if count == 0 {
785 return None;
786 }
787
788 let sum = self.sum_deepest(column_idx)?;
789 Some(sum / count as f64)
790 }
791
792 #[must_use]
805 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
806 if self.levels.is_empty() {
807 return None;
808 }
809
810 let deepest_idx = self.levels.len() - 1;
811 let deepest = &self.levels[deepest_idx];
812 let col = deepest.column(column_idx)?;
813
814 let mut min_value: Option<grafeo_common::types::Value> = None;
815
816 for phys_idx in 0..col.physical_len() {
817 if let Some(value) = col.get_physical(phys_idx) {
818 min_value = Some(match min_value {
819 None => value,
820 Some(current) => {
821 if Self::value_less_than(&value, ¤t) {
822 value
823 } else {
824 current
825 }
826 }
827 });
828 }
829 }
830
831 min_value
832 }
833
834 #[must_use]
847 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
848 if self.levels.is_empty() {
849 return None;
850 }
851
852 let deepest_idx = self.levels.len() - 1;
853 let deepest = &self.levels[deepest_idx];
854 let col = deepest.column(column_idx)?;
855
856 let mut max_value: Option<grafeo_common::types::Value> = None;
857
858 for phys_idx in 0..col.physical_len() {
859 if let Some(value) = col.get_physical(phys_idx) {
860 max_value = Some(match max_value {
861 None => value,
862 Some(current) => {
863 if Self::value_less_than(¤t, &value) {
864 value
865 } else {
866 current
867 }
868 }
869 });
870 }
871 }
872
873 max_value
874 }
875
876 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
884 use grafeo_common::types::Value;
885
886 match (a, b) {
887 (Value::Null, Value::Null) => false,
889 (Value::Null, _) => true,
890 (_, Value::Null) => false,
891
892 (Value::Int64(x), Value::Int64(y)) => x < y,
894 (Value::Float64(x), Value::Float64(y)) => x < y,
895 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
896 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
897
898 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
900
901 (Value::Bool(x), Value::Bool(y)) => !x && *y,
903
904 _ => false,
907 }
908 }
909
910 #[must_use]
924 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
925 if self.levels.is_empty() || column_specs.is_empty() {
926 return Self::empty();
927 }
928
929 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
931 for (level_idx, col_idx, name) in column_specs {
932 if *level_idx < self.levels.len() {
933 level_specs[*level_idx].push((*col_idx, name.clone()));
934 }
935 }
936
937 let mut new_levels = Vec::new();
939
940 for (level_idx, specs) in level_specs.iter().enumerate() {
941 if specs.is_empty() {
942 continue;
943 }
944
945 let src_level = &self.levels[level_idx];
946
947 let columns: Vec<FactorizedVector> = specs
948 .iter()
949 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
950 .collect();
951
952 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
953
954 if level_idx == 0 {
955 new_levels.push(FactorizationLevel::flat(columns, names));
956 } else {
957 let mults = src_level.multiplicities().to_vec();
958 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
959 }
960 }
961
962 if new_levels.is_empty() {
963 return Self::empty();
964 }
965
966 let mut result = Self {
967 levels: new_levels,
968 logical_row_count: 0,
969 state: ChunkState::flat(0),
970 };
971 result.recompute_logical_row_count();
972 result.update_state();
973 result
974 }
975}
976
977pub struct FactorizedRowIterator<'a> {
992 chunk: &'a FactorizedChunk,
993 indices: Vec<usize>,
995 exhausted: bool,
998}
999
1000impl<'a> FactorizedRowIterator<'a> {
1001 fn new(chunk: &'a FactorizedChunk) -> Self {
1002 let indices = vec![0; chunk.level_count()];
1003 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
1004
1005 let mut iter = Self {
1006 chunk,
1007 indices,
1008 exhausted,
1009 };
1010
1011 if !exhausted && !iter.has_valid_deepest_range() {
1013 if !iter.advance() {
1014 exhausted = true;
1015 }
1016 iter.exhausted = exhausted;
1017 }
1018
1019 iter
1020 }
1021
1022 fn advance(&mut self) -> bool {
1024 if self.exhausted || self.chunk.levels.is_empty() {
1025 return false;
1026 }
1027
1028 for level_idx in (0..self.chunk.levels.len()).rev() {
1030 let level = &self.chunk.levels[level_idx];
1031
1032 let parent_idx = if level_idx == 0 {
1034 self.indices[0] + 1
1036 } else {
1037 self.indices[level_idx - 1]
1039 };
1040
1041 let (_start, end) = if level_idx == 0 {
1043 (0, level.group_count)
1044 } else {
1045 if let Some(col) = level.columns.first() {
1047 col.range_for_parent(parent_idx)
1048 } else {
1049 (0, 0)
1050 }
1051 };
1052
1053 let current = self.indices[level_idx];
1054 if current + 1 < end {
1055 self.indices[level_idx] = current + 1;
1057 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1059 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1060 let (deeper_start, _) =
1061 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1062 self.indices[deeper_idx] = deeper_start;
1063 }
1064 }
1065
1066 if self.has_valid_deepest_range() {
1069 return true;
1070 }
1071 return self.advance();
1074 }
1075 }
1077
1078 self.exhausted = true;
1080 false
1081 }
1082
1083 fn has_valid_deepest_range(&self) -> bool {
1089 if self.chunk.levels.len() <= 1 {
1090 return true; }
1092
1093 for level_idx in 1..self.chunk.levels.len() {
1095 let parent_idx = self.indices[level_idx - 1];
1096 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1097 let (start, end) = col.range_for_parent(parent_idx);
1098 if start >= end {
1099 return false;
1100 }
1101 } else {
1102 return false;
1103 }
1104 }
1105
1106 true
1107 }
1108}
1109
1110impl Iterator for FactorizedRowIterator<'_> {
1111 type Item = Vec<usize>;
1112
1113 fn next(&mut self) -> Option<Self::Item> {
1114 if self.exhausted {
1115 return None;
1116 }
1117
1118 let result = self.indices.clone();
1120 self.advance();
1121 Some(result)
1122 }
1123}
1124
1125#[derive(Debug, Clone)]
1127#[non_exhaustive]
1128pub enum ChunkVariant {
1129 Flat(DataChunk),
1131 Factorized(FactorizedChunk),
1133}
1134
1135impl ChunkVariant {
1136 #[must_use]
1138 pub fn flat(chunk: DataChunk) -> Self {
1139 Self::Flat(chunk)
1140 }
1141
1142 #[must_use]
1144 pub fn factorized(chunk: FactorizedChunk) -> Self {
1145 Self::Factorized(chunk)
1146 }
1147
1148 #[must_use]
1150 pub fn ensure_flat(self) -> DataChunk {
1151 match self {
1152 Self::Flat(chunk) => chunk,
1153 Self::Factorized(chunk) => chunk.flatten(),
1154 }
1155 }
1156
1157 #[must_use]
1159 pub fn logical_row_count(&self) -> usize {
1160 match self {
1161 Self::Flat(chunk) => chunk.row_count(),
1162 Self::Factorized(chunk) => chunk.logical_row_count(),
1163 }
1164 }
1165
1166 #[must_use]
1168 pub fn is_factorized(&self) -> bool {
1169 matches!(self, Self::Factorized(_))
1170 }
1171
1172 #[must_use]
1174 pub fn is_flat(&self) -> bool {
1175 matches!(self, Self::Flat(_))
1176 }
1177
1178 #[must_use]
1180 pub fn is_empty(&self) -> bool {
1181 self.logical_row_count() == 0
1182 }
1183}
1184
1185impl From<DataChunk> for ChunkVariant {
1186 fn from(chunk: DataChunk) -> Self {
1187 Self::Flat(chunk)
1188 }
1189}
1190
1191impl From<FactorizedChunk> for ChunkVariant {
1192 fn from(chunk: FactorizedChunk) -> Self {
1193 Self::Factorized(chunk)
1194 }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use grafeo_common::types::{LogicalType, NodeId, Value};
1200
1201 use super::*;
1202
1203 fn make_flat_chunk() -> DataChunk {
1204 let mut col = ValueVector::with_type(LogicalType::Int64);
1205 col.push_int64(1);
1206 col.push_int64(2);
1207 DataChunk::new(vec![col])
1208 }
1209
1210 fn create_multi_level_chunk() -> FactorizedChunk {
1211 let mut sources = ValueVector::with_type(LogicalType::Int64);
1213 sources.push_int64(10);
1214 sources.push_int64(20);
1215
1216 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1217
1218 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1219 neighbors.push_int64(1);
1220 neighbors.push_int64(2);
1221 neighbors.push_int64(3);
1222 neighbors.push_int64(4);
1223
1224 let offsets = vec![0, 2, 4];
1225 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1226 chunk
1227 }
1228
1229 #[test]
1230 fn test_from_flat() {
1231 let flat = make_flat_chunk();
1232 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1233
1234 assert_eq!(factorized.level_count(), 1);
1235 assert_eq!(factorized.logical_row_count(), 2);
1236 assert_eq!(factorized.physical_size(), 2);
1237 }
1238
1239 #[test]
1240 fn test_add_level() {
1241 let mut col0 = ValueVector::with_type(LogicalType::Node);
1243 col0.push_node_id(NodeId::new(100));
1244 col0.push_node_id(NodeId::new(200));
1245
1246 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1247
1248 assert_eq!(chunk.level_count(), 1);
1249 assert_eq!(chunk.logical_row_count(), 2);
1250
1251 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1253 neighbors.push_node_id(NodeId::new(10));
1254 neighbors.push_node_id(NodeId::new(11));
1255 neighbors.push_node_id(NodeId::new(12));
1256 neighbors.push_node_id(NodeId::new(20));
1257 neighbors.push_node_id(NodeId::new(21));
1258
1259 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1261
1262 assert_eq!(chunk.level_count(), 2);
1263 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1266
1267 #[test]
1268 fn test_flatten_single_level() {
1269 let flat = make_flat_chunk();
1270 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1271
1272 let flattened = factorized.flatten();
1273 assert_eq!(flattened.row_count(), 2);
1274 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1275 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1276 }
1277
1278 #[test]
1279 fn test_flatten_multi_level() {
1280 let mut sources = ValueVector::with_type(LogicalType::Int64);
1282 sources.push_int64(1);
1283 sources.push_int64(2);
1284
1285 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1286
1287 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1288 neighbors.push_int64(10);
1289 neighbors.push_int64(11);
1290 neighbors.push_int64(20);
1291 neighbors.push_int64(21);
1292
1293 let offsets = vec![0, 2, 4];
1294 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1295
1296 let flat = chunk.flatten();
1297 assert_eq!(flat.row_count(), 4);
1298 assert_eq!(flat.column_count(), 2);
1299
1300 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1303 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1304 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1305 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1306 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1307 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1308 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1309 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1310 }
1311
1312 #[test]
1313 fn test_logical_row_iter_single_level() {
1314 let flat = make_flat_chunk();
1315 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1316
1317 let indices: Vec<_> = factorized.logical_row_iter().collect();
1318 assert_eq!(indices.len(), 2);
1319 assert_eq!(indices[0], vec![0]);
1320 assert_eq!(indices[1], vec![1]);
1321 }
1322
1323 #[test]
1324 fn test_chunk_variant() {
1325 let flat = make_flat_chunk();
1326 let variant = ChunkVariant::flat(flat.clone());
1327
1328 assert!(variant.is_flat());
1329 assert!(!variant.is_factorized());
1330 assert_eq!(variant.logical_row_count(), 2);
1331
1332 let ensured = variant.ensure_flat();
1333 assert_eq!(ensured.row_count(), 2);
1334 }
1335
1336 #[test]
1337 fn test_chunk_variant_factorized() {
1338 let chunk = create_multi_level_chunk();
1339 let variant = ChunkVariant::factorized(chunk);
1340
1341 assert!(variant.is_factorized());
1342 assert!(!variant.is_flat());
1343 assert_eq!(variant.logical_row_count(), 4);
1344
1345 let flat = variant.ensure_flat();
1346 assert_eq!(flat.row_count(), 4);
1347 }
1348
1349 #[test]
1350 fn test_chunk_variant_from() {
1351 let flat = make_flat_chunk();
1352 let variant: ChunkVariant = flat.into();
1353 assert!(variant.is_flat());
1354
1355 let factorized = create_multi_level_chunk();
1356 let variant2: ChunkVariant = factorized.into();
1357 assert!(variant2.is_factorized());
1358 }
1359
1360 #[test]
1361 fn test_chunk_variant_is_empty() {
1362 let empty_flat = DataChunk::empty();
1363 let variant = ChunkVariant::flat(empty_flat);
1364 assert!(variant.is_empty());
1365
1366 let non_empty = make_flat_chunk();
1367 let variant2 = ChunkVariant::flat(non_empty);
1368 assert!(!variant2.is_empty());
1369 }
1370
1371 #[test]
1372 fn test_empty_chunk() {
1373 let chunk = FactorizedChunk::empty();
1374 assert_eq!(chunk.level_count(), 0);
1375 assert_eq!(chunk.logical_row_count(), 0);
1376 assert_eq!(chunk.physical_size(), 0);
1377
1378 let flat = chunk.flatten();
1379 assert!(flat.is_empty());
1380 }
1381
1382 #[test]
1383 fn test_all_column_names() {
1384 let mut sources = ValueVector::with_type(LogicalType::Int64);
1385 sources.push_int64(1);
1386
1387 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1388
1389 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1390 neighbors.push_int64(10);
1391
1392 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1393
1394 let names = chunk.all_column_names();
1395 assert_eq!(names, vec!["source", "neighbor"]);
1396 }
1397
1398 #[test]
1399 fn test_level_mut() {
1400 let mut chunk = create_multi_level_chunk();
1401
1402 let level = chunk.level_mut(0).unwrap();
1404 assert_eq!(level.column_count(), 1);
1405
1406 assert!(chunk.level_mut(10).is_none());
1408 }
1409
1410 #[test]
1411 fn test_factorization_level_column_mut() {
1412 let mut chunk = create_multi_level_chunk();
1413
1414 let level = chunk.level_mut(0).unwrap();
1415 let col = level.column_mut(0);
1416 assert!(col.is_some());
1417
1418 assert!(level.column_mut(10).is_none());
1420 }
1421
1422 #[test]
1423 fn test_factorization_level_physical_value_count() {
1424 let chunk = create_multi_level_chunk();
1425
1426 let level0 = chunk.level(0).unwrap();
1427 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1430 assert_eq!(level1.physical_value_count(), 4); }
1432
1433 #[test]
1434 fn test_count_rows() {
1435 let chunk = create_multi_level_chunk();
1436 assert_eq!(chunk.count_rows(), 4);
1437
1438 let empty = FactorizedChunk::empty();
1439 assert_eq!(empty.count_rows(), 0);
1440 }
1441
1442 #[test]
1443 fn test_compute_path_multiplicities() {
1444 let chunk = create_multi_level_chunk();
1445
1446 let mults = chunk.compute_path_multiplicities();
1447 assert_eq!(mults.len(), 4);
1449 assert!(mults.iter().all(|&m| m == 1));
1450 }
1451
1452 #[test]
1453 fn test_compute_path_multiplicities_single_level() {
1454 let mut col = ValueVector::with_type(LogicalType::Int64);
1455 col.push_int64(1);
1456 col.push_int64(2);
1457 col.push_int64(3);
1458
1459 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1460 let mults = chunk.compute_path_multiplicities();
1461
1462 assert_eq!(mults.len(), 3);
1464 assert!(mults.iter().all(|&m| m == 1));
1465 }
1466
1467 #[test]
1468 fn test_compute_path_multiplicities_empty() {
1469 let chunk = FactorizedChunk::empty();
1470 let mults = chunk.compute_path_multiplicities();
1471 assert!(mults.is_empty());
1472 }
1473
1474 #[test]
1475 fn test_path_multiplicities_cached() {
1476 let mut chunk = create_multi_level_chunk();
1477
1478 let mults1 = chunk.path_multiplicities_cached();
1480 assert_eq!(mults1.len(), 4);
1481
1482 let mults2 = chunk.path_multiplicities_cached();
1484 assert_eq!(mults1.len(), mults2.len());
1485 }
1486
1487 #[test]
1488 fn test_sum_deepest() {
1489 let chunk = create_multi_level_chunk();
1490
1491 let sum = chunk.sum_deepest(0);
1493 assert_eq!(sum, Some(10.0)); }
1495
1496 #[test]
1497 fn test_sum_deepest_empty() {
1498 let chunk = FactorizedChunk::empty();
1499 assert!(chunk.sum_deepest(0).is_none());
1500 }
1501
1502 #[test]
1503 fn test_sum_deepest_invalid_column() {
1504 let chunk = create_multi_level_chunk();
1505 assert!(chunk.sum_deepest(10).is_none());
1506 }
1507
1508 #[test]
1509 fn test_avg_deepest() {
1510 let chunk = create_multi_level_chunk();
1511
1512 let avg = chunk.avg_deepest(0);
1514 assert_eq!(avg, Some(2.5));
1515 }
1516
1517 #[test]
1518 fn test_avg_deepest_empty() {
1519 let chunk = FactorizedChunk::empty();
1520 assert!(chunk.avg_deepest(0).is_none());
1521 }
1522
1523 #[test]
1524 fn test_min_deepest() {
1525 let chunk = create_multi_level_chunk();
1526
1527 let min = chunk.min_deepest(0);
1528 assert_eq!(min, Some(Value::Int64(1)));
1529 }
1530
1531 #[test]
1532 fn test_min_deepest_empty() {
1533 let chunk = FactorizedChunk::empty();
1534 assert!(chunk.min_deepest(0).is_none());
1535 }
1536
1537 #[test]
1538 fn test_min_deepest_invalid_column() {
1539 let chunk = create_multi_level_chunk();
1540 assert!(chunk.min_deepest(10).is_none());
1541 }
1542
1543 #[test]
1544 fn test_max_deepest() {
1545 let chunk = create_multi_level_chunk();
1546
1547 let max = chunk.max_deepest(0);
1548 assert_eq!(max, Some(Value::Int64(4)));
1549 }
1550
1551 #[test]
1552 fn test_max_deepest_empty() {
1553 let chunk = FactorizedChunk::empty();
1554 assert!(chunk.max_deepest(0).is_none());
1555 }
1556
#[test]
fn test_value_less_than() {
    // Each case is (lhs, rhs, expected result of `value_less_than(lhs, rhs)`).
    let cases = [
        // Null compares less than a non-null value, but not the reverse,
        // and is not less than itself.
        (Value::Null, Value::Int64(1), true),
        (Value::Int64(1), Value::Null, false),
        (Value::Null, Value::Null, false),
        // Int64 vs Int64.
        (Value::Int64(1), Value::Int64(2), true),
        (Value::Int64(2), Value::Int64(1), false),
        // Float64 vs Float64.
        (Value::Float64(1.5), Value::Float64(2.5), true),
        // Mixed Int64 / Float64, in both argument orders.
        (Value::Int64(1), Value::Float64(1.5), true),
        (Value::Float64(0.5), Value::Int64(1), true),
        // Lexicographic string ordering.
        (
            Value::String("apple".into()),
            Value::String("banana".into()),
            true,
        ),
        // Booleans: false before true.
        (Value::Bool(false), Value::Bool(true), true),
        (Value::Bool(true), Value::Bool(false), false),
        // Int64 vs String is reported as not-less-than.
        (Value::Int64(1), Value::String("hello".into()), false),
    ];

    for (lhs, rhs, expected) in &cases {
        assert_eq!(FactorizedChunk::value_less_than(lhs, rhs), *expected);
    }
}
1621
#[test]
fn test_filter_deepest() {
    let chunk = create_multi_level_chunk();

    // Keep deepest-level Int64 values greater than 2; any non-Int64 value is rejected.
    let filtered = chunk
        .filter_deepest(0, |v| matches!(v, Value::Int64(n) if *n > 2))
        .expect("filter_deepest should return Some for a valid deepest column");

    // The fixture's four deepest values appear to be 1..=4, so two rows survive.
    assert_eq!(filtered.logical_row_count(), 2);
}
1638
#[test]
fn test_filter_deepest_empty() {
    // Filtering a chunk with no levels produces no result, even with an accept-all predicate.
    let result = FactorizedChunk::empty().filter_deepest(0, |_| true);
    assert!(result.is_none());
}
1644
#[test]
fn test_filter_deepest_all_filtered() {
    let chunk = create_multi_level_chunk();

    // A predicate that rejects everything still yields a chunk (`Some`), just with no rows.
    let filtered = chunk
        .filter_deepest(0, |_| false)
        .expect("filter_deepest should return Some for a valid column even if nothing matches");
    assert_eq!(filtered.logical_row_count(), 0);
}
1655
#[test]
fn test_filter_deepest_invalid_column() {
    // Column 10 does not exist, so filtering cannot proceed.
    let result = create_multi_level_chunk().filter_deepest(10, |_| true);
    assert!(result.is_none());
}
1661
#[test]
fn test_filter_deepest_multi() {
    // Level 0: a single source row.
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    // Level 1: two columns holding the pairs (a, b) = (10, 1), (20, 2), (30, 3).
    let mut col1 = ValueVector::with_type(LogicalType::Int64);
    col1.push_int64(10);
    col1.push_int64(20);
    col1.push_int64(30);

    let mut col2 = ValueVector::with_type(LogicalType::Int64);
    col2.push_int64(1);
    col2.push_int64(2);
    col2.push_int64(3);

    // Offsets [0, 3]: presumably all three level-1 rows hang off source row 0.
    chunk.add_level(
        vec![col1, col2],
        vec!["a".to_string(), "b".to_string()],
        &[0, 3],
    );

    // Keep rows where a + b > 15: (20, 2) and (30, 3) pass, (10, 1) does not.
    let filtered = chunk
        .filter_deepest_multi(|values| match &values[..] {
            [Value::Int64(a), Value::Int64(b)] => *a + *b > 15,
            _ => false,
        })
        .expect("filter_deepest_multi should return Some when the deepest level has columns");
    assert_eq!(filtered.logical_row_count(), 2);
}
1701
#[test]
fn test_filter_deepest_multi_empty() {
    // No levels at all: multi-column filtering has nothing to operate on.
    let result = FactorizedChunk::empty().filter_deepest_multi(|_| true);
    assert!(result.is_none());
}
1707
#[test]
fn test_filter_deepest_multi_no_columns() {
    // One-row flat source level.
    let mut src_col = ValueVector::with_type(LogicalType::Int64);
    src_col.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src_col], vec!["src".to_string()]);

    // Attach a deepest level that has no columns (multiplicities = [0]).
    chunk.add_factorized_level(FactorizationLevel::unflat(Vec::new(), Vec::new(), vec![0]));

    // With no deepest-level columns to evaluate, filtering yields None.
    assert!(chunk.filter_deepest_multi(|_| true).is_none());
}
1722
#[test]
fn test_project() {
    // Two-column flat chunk: num = [1, 2], str = ["a", "b"].
    let mut nums = ValueVector::with_type(LogicalType::Int64);
    let mut strs = ValueVector::with_type(LogicalType::String);
    for (n, s) in [(1, "a"), (2, "b")] {
        nums.push_int64(n);
        strs.push_string(s);
    }

    let chunk = FactorizedChunk::with_flat_level(
        vec![nums, strs],
        vec!["num".to_string(), "str".to_string()],
    );

    // Project only (level 0, column 0), renaming it.
    let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1745
#[test]
fn test_project_empty() {
    // Projecting from a chunk with no levels produces a chunk with no levels.
    let level_count = FactorizedChunk::empty()
        .project(&[(0, 0, "col".to_string())])
        .level_count();
    assert_eq!(level_count, 0);
}
1752
#[test]
fn test_project_empty_specs() {
    // No projection specs selected => nothing survives.
    let level_count = create_multi_level_chunk().project(&[]).level_count();
    assert_eq!(level_count, 0);
}
1759
#[test]
fn test_project_invalid_level() {
    // Level 10 does not exist, so the spec selects nothing.
    let level_count = create_multi_level_chunk()
        .project(&[(10, 0, "col".to_string())])
        .level_count();
    assert_eq!(level_count, 0);
}
1768
#[test]
fn test_project_multi_level() {
    let chunk = create_multi_level_chunk();

    // One column from each of the two levels.
    let specs = [
        (0, 0, "source".to_string()),
        (1, 0, "neighbor".to_string()),
    ];
    let projected = chunk.project(&specs);

    assert_eq!(projected.level_count(), 2);
    assert_eq!(projected.total_column_count(), 2);
}
1780
#[test]
fn test_total_column_count() {
    let chunk = create_multi_level_chunk();

    // One column per level in the fixture ("src" at level 0, "nbr" at level 1).
    assert_eq!(chunk.total_column_count(), 2);
}
1786
#[test]
fn test_chunk_state_access() {
    let mut chunk = create_multi_level_chunk();

    // Shared accessor: a multi-level chunk reports itself as factorized.
    assert!(chunk.chunk_state().is_factorized());

    // Mutable accessor: invalidating the cache must be callable without panicking.
    chunk.chunk_state_mut().invalidate_cache();
}
1797
#[test]
fn test_logical_row_iter_multi_level() {
    let chunk = create_multi_level_chunk();

    let rows: Vec<_> = chunk.logical_row_iter().collect();
    assert_eq!(rows.len(), 4);

    // Each logical row appears to list one physical index per level: [level0, level1].
    let expected = [vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]];
    for (i, want) in expected.iter().enumerate() {
        assert_eq!(rows[i], *want);
    }
}
1811
#[test]
fn test_sum_deepest_with_float() {
    // Single source row.
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    // Three Float64 values in the deepest level.
    let mut floats = ValueVector::with_type(LogicalType::Float64);
    floats.push_float64(1.5);
    floats.push_float64(2.5);
    floats.push_float64(3.0);

    chunk.add_level(vec![floats], vec!["val".to_string()], &[0, 3]);

    // 1.5 + 2.5 + 3.0 is exactly representable in f64, so exact equality is safe here.
    let sum = chunk.sum_deepest(0);
    assert_eq!(sum, Some(7.0));
}
1829
#[test]
fn test_min_max_with_strings() {
    // Single source row.
    let mut src_col = ValueVector::with_type(LogicalType::Int64);
    src_col.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src_col], vec!["src".to_string()]);

    // Deepest level: three unsorted strings.
    let mut fruits = ValueVector::with_type(LogicalType::String);
    for fruit in ["banana", "apple", "cherry"] {
        fruits.push_string(fruit);
    }
    chunk.add_level(vec![fruits], vec!["fruit".to_string()], &[0, 3]);

    // Lexicographic extremes.
    assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
    assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
}
1850
#[test]
fn test_recompute_logical_row_count_empty() {
    // Recomputing on a chunk without levels must leave the count at zero.
    let mut empty_chunk = FactorizedChunk::empty();
    empty_chunk.recompute_logical_row_count();
    let rows = empty_chunk.logical_row_count();
    assert_eq!(rows, 0);
}
1857
#[test]
fn test_factorization_level_group_count() {
    let chunk = create_multi_level_chunk();

    // (level index, expected group count) for the fixture's two levels.
    for (depth, expected) in [(0, 2), (1, 4)] {
        assert_eq!(chunk.level(depth).unwrap().group_count(), expected);
    }
}
1868
#[test]
fn test_factorization_level_multiplicities() {
    let chunk = create_multi_level_chunk();

    // Level 1 reports multiplicities [2, 2]; their sum matches its group count of 4.
    let level1 = chunk.level(1).expect("fixture should have a second level");
    assert_eq!(level1.multiplicities(), &[2, 2]);
}
1877
#[test]
fn test_factorization_level_column_names() {
    let chunk = create_multi_level_chunk();

    // (level index, its single column name) for the fixture's two levels.
    for (depth, name) in [(0, "src"), (1, "nbr")] {
        assert_eq!(chunk.level(depth).unwrap().column_names(), &[name]);
    }
}
1888}