1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
/// A chunk whose columns are organised into factorization levels.
///
/// Level 0 holds the top-level groups; every deeper level stores its values partitioned
/// among the elements of the level directly above it (see `add_level`). The logical
/// (flattened) rows correspond to the reachable elements of the deepest level.
#[derive(Debug, Clone)]
pub struct FactorizedChunk {
    /// Factorization levels, shallowest (index 0) first.
    levels: Vec<FactorizationLevel>,
    /// Number of rows the chunk represents once flattened.
    logical_row_count: usize,
    /// Chunk state, rebuilt from the level count and logical row count whenever the
    /// structure changes; also used to cache path multiplicities.
    state: ChunkState,
}
52
53#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56 columns: Vec<FactorizedVector>,
58 column_names: Vec<String>,
60 group_count: usize,
62 multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242 if let Some(cached) = self.state.cached_multiplicities() {
244 return Arc::clone(cached);
245 }
246
247 let mults = self.compute_path_multiplicities();
249 let arc_mults: Arc<[usize]> = mults.into();
250 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251 arc_mults
252 }
253
254 #[must_use]
256 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257 self.levels.get(index)
258 }
259
260 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262 self.levels.get_mut(index)
263 }
264
265 pub fn add_level(
276 &mut self,
277 columns: Vec<ValueVector>,
278 column_names: Vec<String>,
279 offsets: &[u32],
280 ) {
281 let parent_count = offsets.len().saturating_sub(1);
282
283 let multiplicities: Vec<usize> = (0..parent_count)
285 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286 .collect();
287
288 let factorized_columns: Vec<FactorizedVector> = columns
290 .into_iter()
291 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292 .collect();
293
294 let level =
295 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296 self.levels.push(level);
297
298 if self.levels.len() == 1 {
302 self.logical_row_count = multiplicities.iter().sum();
304 } else {
305 self.recompute_logical_row_count();
307 }
308
309 self.update_state();
311 }
312
313 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315 self.levels.push(level);
316 self.recompute_logical_row_count();
317 self.update_state();
318 }
319
320 fn update_state(&mut self) {
322 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323 }
324
325 fn recompute_logical_row_count(&mut self) {
327 if self.levels.is_empty() {
328 self.logical_row_count = 0;
329 return;
330 }
331
332 let level0_count = self.levels[0].group_count;
334 if self.levels.len() == 1 {
335 self.logical_row_count = level0_count;
336 return;
337 }
338
339 let mut counts = vec![1usize; level0_count];
342
343 for level_idx in 1..self.levels.len() {
344 let level = &self.levels[level_idx];
345 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
348 if parent_idx < level.multiplicities.len() {
350 let child_mult = level.multiplicities[parent_idx];
351 for _ in 0..child_mult {
352 new_counts.push(parent_count);
353 }
354 }
355 }
356
357 counts = new_counts;
358 }
359
360 self.logical_row_count = counts.len();
361 }
362
363 #[must_use]
367 pub fn flatten(&self) -> DataChunk {
368 if self.levels.is_empty() {
369 return DataChunk::empty();
370 }
371
372 let mut all_columns: Vec<ValueVector> = Vec::new();
374
375 if self.levels.len() == 1 {
377 let level = &self.levels[0];
378 for col in &level.columns {
379 all_columns.push(col.flatten(None));
380 }
381 return DataChunk::new(all_columns);
382 }
383
384 let row_iter = self.logical_row_iter();
387 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391 for level in &self.levels {
392 for col in &level.columns {
393 output_columns.push(ValueVector::with_capacity(
394 col.data_type(),
395 self.logical_row_count,
396 ));
397 }
398 }
399
400 for indices in row_iter {
402 let mut col_offset = 0;
403 for (level_idx, level) in self.levels.iter().enumerate() {
404 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405 for (col_idx, col) in level.columns.iter().enumerate() {
406 if let Some(value) = col.get_physical(level_idx_value) {
407 output_columns[col_offset + col_idx].push_value(value);
408 }
409 }
410 col_offset += level.column_count();
411 }
412 }
413
414 DataChunk::new(output_columns)
415 }
416
417 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421 FactorizedRowIterator::new(self)
422 }
423
424 #[must_use]
426 pub fn total_column_count(&self) -> usize {
427 self.levels.iter().map(|l| l.column_count()).sum()
428 }
429
430 #[must_use]
432 pub fn all_column_names(&self) -> Vec<String> {
433 self.levels
434 .iter()
435 .flat_map(|l| l.column_names.iter().cloned())
436 .collect()
437 }
438
439 #[must_use]
453 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
454 where
455 F: Fn(&grafeo_common::types::Value) -> bool,
456 {
457 if self.levels.is_empty() {
458 return None;
459 }
460
461 let deepest_idx = self.levels.len() - 1;
462 let deepest = &self.levels[deepest_idx];
463
464 let filter_col = deepest.column(column_idx)?;
466
467 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
469 .map(|i| {
470 ValueVector::with_type(
471 deepest
472 .column(i)
473 .expect("column exists: i < column_count")
474 .data_type(),
475 )
476 })
477 .collect();
478
479 let parent_count = filter_col.parent_count();
481 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
482 let mut new_offsets: Vec<u32> = vec![0];
483
484 for parent_idx in 0..parent_count {
486 let (start, end) = filter_col.range_for_parent(parent_idx);
487
488 for phys_idx in start..end {
489 if let Some(value) = filter_col.get_physical(phys_idx)
491 && predicate(&value)
492 {
493 for col_idx in 0..deepest.column_count() {
495 if let Some(col) = deepest.column(col_idx)
496 && let Some(v) = col.get_physical(phys_idx)
497 {
498 new_columns[col_idx].push_value(v);
499 }
500 }
501 new_multiplicities[parent_idx] += 1;
502 }
503 }
504
505 new_offsets.push(new_columns[0].len() as u32);
506 }
507
508 let total_remaining: usize = new_multiplicities.iter().sum();
510 if total_remaining == 0 {
511 return Some(Self::empty());
512 }
513
514 let new_factorized_cols: Vec<FactorizedVector> = new_columns
516 .into_iter()
517 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
518 .collect();
519
520 let new_level = FactorizationLevel::unflat(
521 new_factorized_cols,
522 deepest.column_names().to_vec(),
523 new_multiplicities,
524 );
525
526 let mut result = Self {
528 levels: self.levels[..deepest_idx].to_vec(),
529 logical_row_count: 0,
530 state: ChunkState::flat(0),
531 };
532 result.levels.push(new_level);
533 result.recompute_logical_row_count();
534 result.update_state();
535
536 Some(result)
537 }
538
539 #[must_use]
547 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
548 where
549 F: Fn(&[grafeo_common::types::Value]) -> bool,
550 {
551 if self.levels.is_empty() {
552 return None;
553 }
554
555 let deepest_idx = self.levels.len() - 1;
556 let deepest = &self.levels[deepest_idx];
557 let col_count = deepest.column_count();
558
559 if col_count == 0 {
560 return None;
561 }
562
563 let first_col = deepest.column(0)?;
564 let parent_count = first_col.parent_count();
565
566 let mut new_columns: Vec<ValueVector> = (0..col_count)
568 .map(|i| {
569 ValueVector::with_type(
570 deepest
571 .column(i)
572 .expect("column exists: i < column_count")
573 .data_type(),
574 )
575 })
576 .collect();
577
578 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
579 let mut new_offsets: Vec<u32> = vec![0];
580 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
581
582 for parent_idx in 0..parent_count {
583 let (start, end) = first_col.range_for_parent(parent_idx);
584
585 for phys_idx in start..end {
586 row_values.clear();
588 for col_idx in 0..col_count {
589 if let Some(col) = deepest.column(col_idx)
590 && let Some(v) = col.get_physical(phys_idx)
591 {
592 row_values.push(v);
593 }
594 }
595
596 if predicate(&row_values) {
598 for (col_idx, v) in row_values.iter().enumerate() {
599 new_columns[col_idx].push_value(v.clone());
600 }
601 new_multiplicities[parent_idx] += 1;
602 }
603 }
604
605 new_offsets.push(new_columns[0].len() as u32);
606 }
607
608 let total: usize = new_multiplicities.iter().sum();
610 if total == 0 {
611 return Some(Self::empty());
612 }
613
614 let new_factorized_cols: Vec<FactorizedVector> = new_columns
616 .into_iter()
617 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
618 .collect();
619
620 let new_level = FactorizationLevel::unflat(
621 new_factorized_cols,
622 deepest.column_names().to_vec(),
623 new_multiplicities,
624 );
625
626 let mut result = Self {
627 levels: self.levels[..deepest_idx].to_vec(),
628 logical_row_count: 0,
629 state: ChunkState::flat(0),
630 };
631 result.levels.push(new_level);
632 result.recompute_logical_row_count();
633 result.update_state();
634
635 Some(result)
636 }
637
638 #[must_use]
658 pub fn count_rows(&self) -> usize {
659 self.logical_row_count()
660 }
661
662 #[must_use]
679 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
680 if self.levels.is_empty() {
681 return Vec::new();
682 }
683
684 if self.levels.len() == 1 {
686 return vec![1; self.levels[0].group_count];
687 }
688
689 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
691
692 for level_idx in 1..self.levels.len() {
694 let level = &self.levels[level_idx];
695 let mut child_multiplicities = Vec::with_capacity(level.group_count);
696
697 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
699 let child_count = if parent_idx < level.multiplicities.len() {
700 level.multiplicities[parent_idx]
701 } else {
702 0
703 };
704
705 for _ in 0..child_count {
707 child_multiplicities.push(parent_mult);
708 }
709 }
710
711 parent_multiplicities = child_multiplicities;
712 }
713
714 parent_multiplicities
715 }
716
717 #[must_use]
730 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
731 if self.levels.is_empty() {
732 return None;
733 }
734
735 let deepest_idx = self.levels.len() - 1;
736 let deepest = &self.levels[deepest_idx];
737 let col = deepest.column(column_idx)?;
738
739 let multiplicities = self.compute_path_multiplicities();
741
742 let mut sum = 0.0;
743 for (phys_idx, mult) in multiplicities.iter().enumerate() {
744 if let Some(value) = col.get_physical(phys_idx) {
745 let num_value = match &value {
747 grafeo_common::types::Value::Int64(v) => *v as f64,
748 grafeo_common::types::Value::Float64(v) => *v,
749 _ => continue, };
751 sum += num_value * (*mult as f64);
752 }
753 }
754 Some(sum)
755 }
756
757 #[must_use]
769 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
770 let count = self.logical_row_count();
771 if count == 0 {
772 return None;
773 }
774
775 let sum = self.sum_deepest(column_idx)?;
776 Some(sum / count as f64)
777 }
778
779 #[must_use]
792 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
793 if self.levels.is_empty() {
794 return None;
795 }
796
797 let deepest_idx = self.levels.len() - 1;
798 let deepest = &self.levels[deepest_idx];
799 let col = deepest.column(column_idx)?;
800
801 let mut min_value: Option<grafeo_common::types::Value> = None;
802
803 for phys_idx in 0..col.physical_len() {
804 if let Some(value) = col.get_physical(phys_idx) {
805 min_value = Some(match min_value {
806 None => value,
807 Some(current) => {
808 if Self::value_less_than(&value, ¤t) {
809 value
810 } else {
811 current
812 }
813 }
814 });
815 }
816 }
817
818 min_value
819 }
820
821 #[must_use]
834 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
835 if self.levels.is_empty() {
836 return None;
837 }
838
839 let deepest_idx = self.levels.len() - 1;
840 let deepest = &self.levels[deepest_idx];
841 let col = deepest.column(column_idx)?;
842
843 let mut max_value: Option<grafeo_common::types::Value> = None;
844
845 for phys_idx in 0..col.physical_len() {
846 if let Some(value) = col.get_physical(phys_idx) {
847 max_value = Some(match max_value {
848 None => value,
849 Some(current) => {
850 if Self::value_less_than(¤t, &value) {
851 value
852 } else {
853 current
854 }
855 }
856 });
857 }
858 }
859
860 max_value
861 }
862
863 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
871 use grafeo_common::types::Value;
872
873 match (a, b) {
874 (Value::Null, Value::Null) => false,
876 (Value::Null, _) => true,
877 (_, Value::Null) => false,
878
879 (Value::Int64(x), Value::Int64(y)) => x < y,
881 (Value::Float64(x), Value::Float64(y)) => x < y,
882 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
883 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
884
885 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
887
888 (Value::Bool(x), Value::Bool(y)) => !x && *y,
890
891 _ => false,
894 }
895 }
896
897 #[must_use]
911 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
912 if self.levels.is_empty() || column_specs.is_empty() {
913 return Self::empty();
914 }
915
916 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
918 for (level_idx, col_idx, name) in column_specs {
919 if *level_idx < self.levels.len() {
920 level_specs[*level_idx].push((*col_idx, name.clone()));
921 }
922 }
923
924 let mut new_levels = Vec::new();
926
927 for (level_idx, specs) in level_specs.iter().enumerate() {
928 if specs.is_empty() {
929 continue;
930 }
931
932 let src_level = &self.levels[level_idx];
933
934 let columns: Vec<FactorizedVector> = specs
935 .iter()
936 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
937 .collect();
938
939 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
940
941 if level_idx == 0 {
942 new_levels.push(FactorizationLevel::flat(columns, names));
943 } else {
944 let mults = src_level.multiplicities().to_vec();
945 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
946 }
947 }
948
949 if new_levels.is_empty() {
950 return Self::empty();
951 }
952
953 let mut result = Self {
954 levels: new_levels,
955 logical_row_count: 0,
956 state: ChunkState::flat(0),
957 };
958 result.recompute_logical_row_count();
959 result.update_state();
960 result
961 }
962}
963
964pub struct FactorizedRowIterator<'a> {
979 chunk: &'a FactorizedChunk,
980 indices: Vec<usize>,
982 exhausted: bool,
985}
986
987impl<'a> FactorizedRowIterator<'a> {
988 fn new(chunk: &'a FactorizedChunk) -> Self {
989 let indices = vec![0; chunk.level_count()];
990 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
991
992 let mut iter = Self {
993 chunk,
994 indices,
995 exhausted,
996 };
997
998 if !exhausted && !iter.has_valid_deepest_range() {
1000 if !iter.advance() {
1001 exhausted = true;
1002 }
1003 iter.exhausted = exhausted;
1004 }
1005
1006 iter
1007 }
1008
1009 fn advance(&mut self) -> bool {
1011 if self.exhausted || self.chunk.levels.is_empty() {
1012 return false;
1013 }
1014
1015 for level_idx in (0..self.chunk.levels.len()).rev() {
1017 let level = &self.chunk.levels[level_idx];
1018
1019 let parent_idx = if level_idx == 0 {
1021 self.indices[0] + 1
1023 } else {
1024 self.indices[level_idx - 1]
1026 };
1027
1028 let (_start, end) = if level_idx == 0 {
1030 (0, level.group_count)
1031 } else {
1032 if let Some(col) = level.columns.first() {
1034 col.range_for_parent(parent_idx)
1035 } else {
1036 (0, 0)
1037 }
1038 };
1039
1040 let current = self.indices[level_idx];
1041 if current + 1 < end {
1042 self.indices[level_idx] = current + 1;
1044 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1046 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1047 let (deeper_start, _) =
1048 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1049 self.indices[deeper_idx] = deeper_start;
1050 }
1051 }
1052
1053 if self.has_valid_deepest_range() {
1056 return true;
1057 }
1058 return self.advance();
1061 }
1062 }
1064
1065 self.exhausted = true;
1067 false
1068 }
1069
1070 fn has_valid_deepest_range(&self) -> bool {
1076 if self.chunk.levels.len() <= 1 {
1077 return true; }
1079
1080 for level_idx in 1..self.chunk.levels.len() {
1082 let parent_idx = self.indices[level_idx - 1];
1083 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1084 let (start, end) = col.range_for_parent(parent_idx);
1085 if start >= end {
1086 return false;
1087 }
1088 } else {
1089 return false;
1090 }
1091 }
1092
1093 true
1094 }
1095}
1096
1097impl Iterator for FactorizedRowIterator<'_> {
1098 type Item = Vec<usize>;
1099
1100 fn next(&mut self) -> Option<Self::Item> {
1101 if self.exhausted {
1102 return None;
1103 }
1104
1105 let result = self.indices.clone();
1107 self.advance();
1108 Some(result)
1109 }
1110}
1111
1112#[derive(Debug, Clone)]
1114pub enum ChunkVariant {
1115 Flat(DataChunk),
1117 Factorized(FactorizedChunk),
1119}
1120
1121impl ChunkVariant {
1122 #[must_use]
1124 pub fn flat(chunk: DataChunk) -> Self {
1125 Self::Flat(chunk)
1126 }
1127
1128 #[must_use]
1130 pub fn factorized(chunk: FactorizedChunk) -> Self {
1131 Self::Factorized(chunk)
1132 }
1133
1134 #[must_use]
1136 pub fn ensure_flat(self) -> DataChunk {
1137 match self {
1138 Self::Flat(chunk) => chunk,
1139 Self::Factorized(chunk) => chunk.flatten(),
1140 }
1141 }
1142
1143 #[must_use]
1145 pub fn logical_row_count(&self) -> usize {
1146 match self {
1147 Self::Flat(chunk) => chunk.row_count(),
1148 Self::Factorized(chunk) => chunk.logical_row_count(),
1149 }
1150 }
1151
1152 #[must_use]
1154 pub fn is_factorized(&self) -> bool {
1155 matches!(self, Self::Factorized(_))
1156 }
1157
1158 #[must_use]
1160 pub fn is_flat(&self) -> bool {
1161 matches!(self, Self::Flat(_))
1162 }
1163
1164 #[must_use]
1166 pub fn is_empty(&self) -> bool {
1167 self.logical_row_count() == 0
1168 }
1169}
1170
1171impl From<DataChunk> for ChunkVariant {
1172 fn from(chunk: DataChunk) -> Self {
1173 Self::Flat(chunk)
1174 }
1175}
1176
1177impl From<FactorizedChunk> for ChunkVariant {
1178 fn from(chunk: FactorizedChunk) -> Self {
1179 Self::Factorized(chunk)
1180 }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use grafeo_common::types::{LogicalType, NodeId, Value};
1186
1187 use super::*;
1188
1189 fn make_flat_chunk() -> DataChunk {
1190 let mut col = ValueVector::with_type(LogicalType::Int64);
1191 col.push_int64(1);
1192 col.push_int64(2);
1193 DataChunk::new(vec![col])
1194 }
1195
1196 fn create_multi_level_chunk() -> FactorizedChunk {
1197 let mut sources = ValueVector::with_type(LogicalType::Int64);
1199 sources.push_int64(10);
1200 sources.push_int64(20);
1201
1202 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1203
1204 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1205 neighbors.push_int64(1);
1206 neighbors.push_int64(2);
1207 neighbors.push_int64(3);
1208 neighbors.push_int64(4);
1209
1210 let offsets = vec![0, 2, 4];
1211 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1212 chunk
1213 }
1214
1215 #[test]
1216 fn test_from_flat() {
1217 let flat = make_flat_chunk();
1218 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1219
1220 assert_eq!(factorized.level_count(), 1);
1221 assert_eq!(factorized.logical_row_count(), 2);
1222 assert_eq!(factorized.physical_size(), 2);
1223 }
1224
1225 #[test]
1226 fn test_add_level() {
1227 let mut col0 = ValueVector::with_type(LogicalType::Node);
1229 col0.push_node_id(NodeId::new(100));
1230 col0.push_node_id(NodeId::new(200));
1231
1232 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1233
1234 assert_eq!(chunk.level_count(), 1);
1235 assert_eq!(chunk.logical_row_count(), 2);
1236
1237 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1239 neighbors.push_node_id(NodeId::new(10));
1240 neighbors.push_node_id(NodeId::new(11));
1241 neighbors.push_node_id(NodeId::new(12));
1242 neighbors.push_node_id(NodeId::new(20));
1243 neighbors.push_node_id(NodeId::new(21));
1244
1245 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1247
1248 assert_eq!(chunk.level_count(), 2);
1249 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1252
1253 #[test]
1254 fn test_flatten_single_level() {
1255 let flat = make_flat_chunk();
1256 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1257
1258 let flattened = factorized.flatten();
1259 assert_eq!(flattened.row_count(), 2);
1260 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1261 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1262 }
1263
1264 #[test]
1265 fn test_flatten_multi_level() {
1266 let mut sources = ValueVector::with_type(LogicalType::Int64);
1268 sources.push_int64(1);
1269 sources.push_int64(2);
1270
1271 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1272
1273 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1274 neighbors.push_int64(10);
1275 neighbors.push_int64(11);
1276 neighbors.push_int64(20);
1277 neighbors.push_int64(21);
1278
1279 let offsets = vec![0, 2, 4];
1280 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1281
1282 let flat = chunk.flatten();
1283 assert_eq!(flat.row_count(), 4);
1284 assert_eq!(flat.column_count(), 2);
1285
1286 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1289 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1290 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1291 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1292 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1293 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1294 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1295 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1296 }
1297
1298 #[test]
1299 fn test_logical_row_iter_single_level() {
1300 let flat = make_flat_chunk();
1301 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1302
1303 let indices: Vec<_> = factorized.logical_row_iter().collect();
1304 assert_eq!(indices.len(), 2);
1305 assert_eq!(indices[0], vec![0]);
1306 assert_eq!(indices[1], vec![1]);
1307 }
1308
1309 #[test]
1310 fn test_chunk_variant() {
1311 let flat = make_flat_chunk();
1312 let variant = ChunkVariant::flat(flat.clone());
1313
1314 assert!(variant.is_flat());
1315 assert!(!variant.is_factorized());
1316 assert_eq!(variant.logical_row_count(), 2);
1317
1318 let ensured = variant.ensure_flat();
1319 assert_eq!(ensured.row_count(), 2);
1320 }
1321
1322 #[test]
1323 fn test_chunk_variant_factorized() {
1324 let chunk = create_multi_level_chunk();
1325 let variant = ChunkVariant::factorized(chunk);
1326
1327 assert!(variant.is_factorized());
1328 assert!(!variant.is_flat());
1329 assert_eq!(variant.logical_row_count(), 4);
1330
1331 let flat = variant.ensure_flat();
1332 assert_eq!(flat.row_count(), 4);
1333 }
1334
1335 #[test]
1336 fn test_chunk_variant_from() {
1337 let flat = make_flat_chunk();
1338 let variant: ChunkVariant = flat.into();
1339 assert!(variant.is_flat());
1340
1341 let factorized = create_multi_level_chunk();
1342 let variant2: ChunkVariant = factorized.into();
1343 assert!(variant2.is_factorized());
1344 }
1345
1346 #[test]
1347 fn test_chunk_variant_is_empty() {
1348 let empty_flat = DataChunk::empty();
1349 let variant = ChunkVariant::flat(empty_flat);
1350 assert!(variant.is_empty());
1351
1352 let non_empty = make_flat_chunk();
1353 let variant2 = ChunkVariant::flat(non_empty);
1354 assert!(!variant2.is_empty());
1355 }
1356
1357 #[test]
1358 fn test_empty_chunk() {
1359 let chunk = FactorizedChunk::empty();
1360 assert_eq!(chunk.level_count(), 0);
1361 assert_eq!(chunk.logical_row_count(), 0);
1362 assert_eq!(chunk.physical_size(), 0);
1363
1364 let flat = chunk.flatten();
1365 assert!(flat.is_empty());
1366 }
1367
1368 #[test]
1369 fn test_all_column_names() {
1370 let mut sources = ValueVector::with_type(LogicalType::Int64);
1371 sources.push_int64(1);
1372
1373 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1374
1375 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1376 neighbors.push_int64(10);
1377
1378 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1379
1380 let names = chunk.all_column_names();
1381 assert_eq!(names, vec!["source", "neighbor"]);
1382 }
1383
1384 #[test]
1385 fn test_level_mut() {
1386 let mut chunk = create_multi_level_chunk();
1387
1388 let level = chunk.level_mut(0).unwrap();
1390 assert_eq!(level.column_count(), 1);
1391
1392 assert!(chunk.level_mut(10).is_none());
1394 }
1395
1396 #[test]
1397 fn test_factorization_level_column_mut() {
1398 let mut chunk = create_multi_level_chunk();
1399
1400 let level = chunk.level_mut(0).unwrap();
1401 let col = level.column_mut(0);
1402 assert!(col.is_some());
1403
1404 assert!(level.column_mut(10).is_none());
1406 }
1407
1408 #[test]
1409 fn test_factorization_level_physical_value_count() {
1410 let chunk = create_multi_level_chunk();
1411
1412 let level0 = chunk.level(0).unwrap();
1413 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1416 assert_eq!(level1.physical_value_count(), 4); }
1418
1419 #[test]
1420 fn test_count_rows() {
1421 let chunk = create_multi_level_chunk();
1422 assert_eq!(chunk.count_rows(), 4);
1423
1424 let empty = FactorizedChunk::empty();
1425 assert_eq!(empty.count_rows(), 0);
1426 }
1427
1428 #[test]
1429 fn test_compute_path_multiplicities() {
1430 let chunk = create_multi_level_chunk();
1431
1432 let mults = chunk.compute_path_multiplicities();
1433 assert_eq!(mults.len(), 4);
1435 assert!(mults.iter().all(|&m| m == 1));
1436 }
1437
1438 #[test]
1439 fn test_compute_path_multiplicities_single_level() {
1440 let mut col = ValueVector::with_type(LogicalType::Int64);
1441 col.push_int64(1);
1442 col.push_int64(2);
1443 col.push_int64(3);
1444
1445 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1446 let mults = chunk.compute_path_multiplicities();
1447
1448 assert_eq!(mults.len(), 3);
1450 assert!(mults.iter().all(|&m| m == 1));
1451 }
1452
1453 #[test]
1454 fn test_compute_path_multiplicities_empty() {
1455 let chunk = FactorizedChunk::empty();
1456 let mults = chunk.compute_path_multiplicities();
1457 assert!(mults.is_empty());
1458 }
1459
1460 #[test]
1461 fn test_path_multiplicities_cached() {
1462 let mut chunk = create_multi_level_chunk();
1463
1464 let mults1 = chunk.path_multiplicities_cached();
1466 assert_eq!(mults1.len(), 4);
1467
1468 let mults2 = chunk.path_multiplicities_cached();
1470 assert_eq!(mults1.len(), mults2.len());
1471 }
1472
1473 #[test]
1474 fn test_sum_deepest() {
1475 let chunk = create_multi_level_chunk();
1476
1477 let sum = chunk.sum_deepest(0);
1479 assert_eq!(sum, Some(10.0)); }
1481
1482 #[test]
1483 fn test_sum_deepest_empty() {
1484 let chunk = FactorizedChunk::empty();
1485 assert!(chunk.sum_deepest(0).is_none());
1486 }
1487
1488 #[test]
1489 fn test_sum_deepest_invalid_column() {
1490 let chunk = create_multi_level_chunk();
1491 assert!(chunk.sum_deepest(10).is_none());
1492 }
1493
1494 #[test]
1495 fn test_avg_deepest() {
1496 let chunk = create_multi_level_chunk();
1497
1498 let avg = chunk.avg_deepest(0);
1500 assert_eq!(avg, Some(2.5));
1501 }
1502
1503 #[test]
1504 fn test_avg_deepest_empty() {
1505 let chunk = FactorizedChunk::empty();
1506 assert!(chunk.avg_deepest(0).is_none());
1507 }
1508
1509 #[test]
1510 fn test_min_deepest() {
1511 let chunk = create_multi_level_chunk();
1512
1513 let min = chunk.min_deepest(0);
1514 assert_eq!(min, Some(Value::Int64(1)));
1515 }
1516
1517 #[test]
1518 fn test_min_deepest_empty() {
1519 let chunk = FactorizedChunk::empty();
1520 assert!(chunk.min_deepest(0).is_none());
1521 }
1522
1523 #[test]
1524 fn test_min_deepest_invalid_column() {
1525 let chunk = create_multi_level_chunk();
1526 assert!(chunk.min_deepest(10).is_none());
1527 }
1528
1529 #[test]
1530 fn test_max_deepest() {
1531 let chunk = create_multi_level_chunk();
1532
1533 let max = chunk.max_deepest(0);
1534 assert_eq!(max, Some(Value::Int64(4)));
1535 }
1536
1537 #[test]
1538 fn test_max_deepest_empty() {
1539 let chunk = FactorizedChunk::empty();
1540 assert!(chunk.max_deepest(0).is_none());
1541 }
1542
/// Table-driven check of `FactorizedChunk::value_less_than`:
/// - `Null` sorts before non-null values and is not less than itself.
/// - `Int64` and `Float64` compare numerically, including against each other.
/// - Strings and bools compare within their own type.
/// - An `Int64` is not considered less than a `String`.
#[test]
fn test_value_less_than() {
    // Each entry: (lhs, rhs, expected value of `value_less_than(lhs, rhs)`).
    let cases = [
        // Null handling.
        (Value::Null, Value::Int64(1), true),
        (Value::Int64(1), Value::Null, false),
        (Value::Null, Value::Null, false),
        // Integers.
        (Value::Int64(1), Value::Int64(2), true),
        (Value::Int64(2), Value::Int64(1), false),
        // Floats.
        (Value::Float64(1.5), Value::Float64(2.5), true),
        // Mixed numeric types.
        (Value::Int64(1), Value::Float64(1.5), true),
        (Value::Float64(0.5), Value::Int64(1), true),
        // Strings.
        (
            Value::String("apple".into()),
            Value::String("banana".into()),
            true,
        ),
        // Booleans.
        (Value::Bool(false), Value::Bool(true), true),
        (Value::Bool(true), Value::Bool(false), false),
        // Incomparable types.
        (Value::Int64(1), Value::String("hello".into()), false),
    ];

    for (lhs, rhs, expected) in &cases {
        assert_eq!(FactorizedChunk::value_less_than(lhs, rhs), *expected);
    }
}
1607
/// Keeps only deepest-level rows whose column-0 value is an `Int64` greater
/// than 2. Two of the fixture's four logical rows survive.
#[test]
fn test_filter_deepest() {
    let chunk = create_multi_level_chunk();

    let kept = chunk
        .filter_deepest(0, |v| matches!(v, Value::Int64(n) if *n > 2))
        .unwrap();

    assert_eq!(kept.logical_row_count(), 2);
}
1624
/// Filtering an empty chunk produces no chunk at all.
#[test]
fn test_filter_deepest_empty() {
    let result = FactorizedChunk::empty().filter_deepest(0, |_| true);
    assert!(result.is_none());
}
1630
/// A predicate that rejects every row still yields a chunk, just one with zero
/// logical rows.
#[test]
fn test_filter_deepest_all_filtered() {
    let chunk = create_multi_level_chunk();
    let rejected_all = chunk.filter_deepest(0, |_| false).unwrap();
    assert_eq!(rejected_all.logical_row_count(), 0);
}
1641
/// Filtering on a column index the deepest level does not have yields `None`.
#[test]
fn test_filter_deepest_invalid_column() {
    let fixture = create_multi_level_chunk();
    let result = fixture.filter_deepest(10, |_| true);
    assert!(result.is_none());
}
1647
/// `filter_deepest_multi` hands the predicate every column of a deepest-level
/// row at once. Here rows are kept when `a + b > 15`.
#[test]
fn test_filter_deepest_multi() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    let mut col1 = ValueVector::with_type(LogicalType::Int64);
    col1.push_int64(10);
    col1.push_int64(20);
    col1.push_int64(30);

    let mut col2 = ValueVector::with_type(LogicalType::Int64);
    col2.push_int64(1);
    col2.push_int64(2);
    col2.push_int64(3);

    // Offsets [0, 3]: presumably the single source row owns child rows 0..3.
    chunk.add_level(
        vec![col1, col2],
        vec!["a".to_string(), "b".to_string()],
        &[0, 3],
    );

    // Row sums are 11 (dropped), 22 and 33 (kept).
    // The slice pattern only matches rows with exactly two Int64 values.
    let filtered = chunk
        .filter_deepest_multi(|values| match &values[..] {
            [Value::Int64(a), Value::Int64(b)] => a + b > 15,
            _ => false,
        })
        .expect("deepest level has columns, so filter_deepest_multi should return a chunk");

    assert_eq!(filtered.logical_row_count(), 2);
}
1687
/// Multi-column filtering of an empty chunk produces no chunk.
#[test]
fn test_filter_deepest_multi_empty() {
    let result = FactorizedChunk::empty().filter_deepest_multi(|_| true);
    assert!(result.is_none());
}
1693
/// When the deepest level carries no columns, `filter_deepest_multi` returns
/// `None`. Presumably this is because there are no values to pass to the
/// predicate.
#[test]
fn test_filter_deepest_multi_no_columns() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec!["src".to_string()]);

    // A column-less unflat level. Its only multiplicity is 0, so it has zero groups.
    let column_less = FactorizationLevel::unflat(Vec::new(), Vec::new(), vec![0]);
    chunk.add_factorized_level(column_less);

    assert!(chunk.filter_deepest_multi(|_| true).is_none());
}
1708
/// Projecting a single `(level 0, column 0)` spec keeps only that column,
/// exposed under the new name.
#[test]
fn test_project() {
    let mut nums = ValueVector::with_type(LogicalType::Int64);
    let mut strs = ValueVector::with_type(LogicalType::String);
    for (n, s) in [(1, "a"), (2, "b")] {
        nums.push_int64(n);
        strs.push_string(s);
    }

    let chunk = FactorizedChunk::with_flat_level(
        vec![nums, strs],
        vec!["num".to_string(), "str".to_string()],
    );

    let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1731
/// Projecting anything out of an empty chunk yields a chunk with no levels.
#[test]
fn test_project_empty() {
    let spec = (0, 0, "col".to_string());
    let projected = FactorizedChunk::empty().project(&[spec]);
    assert_eq!(projected.level_count(), 0);
}
1738
/// An empty projection list keeps no levels, even on a populated chunk.
#[test]
fn test_project_empty_specs() {
    let levels_kept = create_multi_level_chunk().project(&[]).level_count();
    assert_eq!(levels_kept, 0);
}
1745
/// A spec pointing at a non-existent level (10) contributes nothing, so the
/// projection has no levels.
#[test]
fn test_project_invalid_level() {
    let fixture = create_multi_level_chunk();
    let out_of_range = fixture.project(&[(10, 0, "col".to_string())]);
    assert_eq!(out_of_range.level_count(), 0);
}
1754
/// Projecting one column from each of the two levels keeps both levels and
/// both columns.
#[test]
fn test_project_multi_level() {
    let specs = [
        (0, 0, "source".to_string()),
        (1, 0, "neighbor".to_string()),
    ];
    let projected = create_multi_level_chunk().project(&specs);

    assert_eq!(projected.level_count(), 2);
    assert_eq!(projected.total_column_count(), 2);
}
1766
/// The fixture has one column per level (`src` and `nbr`), two in total.
#[test]
fn test_total_column_count() {
    let columns = create_multi_level_chunk().total_column_count();
    assert_eq!(columns, 2);
}
1772
/// The fixture's chunk state reports itself as factorized, and the mutable
/// accessor can be used to invalidate its cache.
#[test]
fn test_chunk_state_access() {
    let mut chunk = create_multi_level_chunk();
    assert!(chunk.chunk_state().is_factorized());
    chunk.chunk_state_mut().invalidate_cache();
}
1783
/// Each logical row is reported as one index per level:
/// `[level-0 group, level-1 position]`.
/// The expected list implies level-0 group 0 covers level-1 positions 0-1 and
/// group 1 covers positions 2-3.
#[test]
fn test_logical_row_iter_multi_level() {
    let chunk = create_multi_level_chunk();

    let rows: Vec<_> = chunk.logical_row_iter().collect();

    assert_eq!(rows, vec![vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]]);
}
1797
/// `Float64` values on the deepest level are summed as floats:
/// 1.5 + 2.5 + 3.0 = 7.0.
#[test]
fn test_sum_deepest_with_float() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec!["src".to_string()]);

    let mut floats = ValueVector::with_type(LogicalType::Float64);
    for x in [1.5, 2.5, 3.0] {
        floats.push_float64(x);
    }
    chunk.add_level(vec![floats], vec!["val".to_string()], &[0, 3]);

    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1815
/// For a `String` column, min/max pick "apple" and "cherry" out of the
/// unsorted input "banana", "apple", "cherry".
#[test]
fn test_min_max_with_strings() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec!["src".to_string()]);

    let mut fruits = ValueVector::with_type(LogicalType::String);
    for name in ["banana", "apple", "cherry"] {
        fruits.push_string(name);
    }
    chunk.add_level(vec![fruits], vec!["fruit".to_string()], &[0, 3]);

    assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
    assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
}
1836
/// Recomputing the row count of an empty chunk leaves it at zero.
#[test]
fn test_recompute_logical_row_count_empty() {
    let mut empty = FactorizedChunk::empty();
    empty.recompute_logical_row_count();
    let rows = empty.logical_row_count();
    assert_eq!(rows, 0);
}
1843
/// Level 0 of the fixture has 2 groups. Level 1 has 4, which equals the sum of
/// its multiplicities `[2, 2]`.
#[test]
fn test_factorization_level_group_count() {
    let chunk = create_multi_level_chunk();

    for (depth, expected_groups) in [(0, 2), (1, 4)] {
        let level = chunk.level(depth).unwrap();
        assert_eq!(level.group_count(), expected_groups);
    }
}
1854
/// Each of the two level-0 groups fans out to two level-1 entries.
#[test]
fn test_factorization_level_multiplicities() {
    let chunk = create_multi_level_chunk();
    let fan_out = chunk.level(1).unwrap().multiplicities();
    assert_eq!(fan_out, &[2, 2]);
}
1863
/// Level 0 exposes the `src` column and level 1 exposes the `nbr` column.
#[test]
fn test_factorization_level_column_names() {
    let chunk = create_multi_level_chunk();

    for (depth, expected_name) in [(0, "src"), (1, "nbr")] {
        let level = chunk.level(depth).unwrap();
        assert_eq!(level.column_names(), &[expected_name]);
    }
}
1874}