1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
29#[derive(Debug, Clone)]
44pub struct FactorizedChunk {
45 levels: Vec<FactorizationLevel>,
47 logical_row_count: usize,
49 state: ChunkState,
51}
52
53#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56 columns: Vec<FactorizedVector>,
58 column_names: Vec<String>,
60 group_count: usize,
62 multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242 if let Some(cached) = self.state.cached_multiplicities() {
244 return Arc::clone(cached);
245 }
246
247 let mults = self.compute_path_multiplicities();
249 let arc_mults: Arc<[usize]> = mults.into();
250 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251 arc_mults
252 }
253
254 #[must_use]
256 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257 self.levels.get(index)
258 }
259
260 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262 self.levels.get_mut(index)
263 }
264
265 pub fn add_level(
276 &mut self,
277 columns: Vec<ValueVector>,
278 column_names: Vec<String>,
279 offsets: &[u32],
280 ) {
281 let parent_count = offsets.len().saturating_sub(1);
282
283 let multiplicities: Vec<usize> = (0..parent_count)
285 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286 .collect();
287
288 let factorized_columns: Vec<FactorizedVector> = columns
290 .into_iter()
291 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292 .collect();
293
294 let level =
295 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296 self.levels.push(level);
297
298 if self.levels.len() == 1 {
302 self.logical_row_count = multiplicities.iter().sum();
304 } else {
305 self.recompute_logical_row_count();
307 }
308
309 self.update_state();
311 }
312
313 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315 self.levels.push(level);
316 self.recompute_logical_row_count();
317 self.update_state();
318 }
319
320 fn update_state(&mut self) {
322 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323 }
324
325 fn recompute_logical_row_count(&mut self) {
327 if self.levels.is_empty() {
328 self.logical_row_count = 0;
329 return;
330 }
331
332 let level0_count = self.levels[0].group_count;
334 if self.levels.len() == 1 {
335 self.logical_row_count = level0_count;
336 return;
337 }
338
339 let mut counts = vec![1usize; level0_count];
342
343 for level_idx in 1..self.levels.len() {
344 let level = &self.levels[level_idx];
345 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
348 if parent_idx < level.multiplicities.len() {
350 let child_mult = level.multiplicities[parent_idx];
351 for _ in 0..child_mult {
352 new_counts.push(parent_count);
353 }
354 }
355 }
356
357 counts = new_counts;
358 }
359
360 self.logical_row_count = counts.len();
361 }
362
363 #[must_use]
367 pub fn flatten(&self) -> DataChunk {
368 if self.levels.is_empty() {
369 return DataChunk::empty();
370 }
371
372 let mut all_columns: Vec<ValueVector> = Vec::new();
374
375 if self.levels.len() == 1 {
377 let level = &self.levels[0];
378 for col in &level.columns {
379 all_columns.push(col.flatten(None));
380 }
381 return DataChunk::new(all_columns);
382 }
383
384 let row_iter = self.logical_row_iter();
387 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391 for level in &self.levels {
392 for col in &level.columns {
393 output_columns.push(ValueVector::with_capacity(
394 col.data_type(),
395 self.logical_row_count,
396 ));
397 }
398 }
399
400 for indices in row_iter {
402 let mut col_offset = 0;
403 for (level_idx, level) in self.levels.iter().enumerate() {
404 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405 for (col_idx, col) in level.columns.iter().enumerate() {
406 if let Some(value) = col.get_physical(level_idx_value) {
407 output_columns[col_offset + col_idx].push_value(value);
408 }
409 }
410 col_offset += level.column_count();
411 }
412 }
413
414 DataChunk::new(output_columns)
415 }
416
417 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421 FactorizedRowIterator::new(self)
422 }
423
424 #[must_use]
426 pub fn total_column_count(&self) -> usize {
427 self.levels.iter().map(|l| l.column_count()).sum()
428 }
429
430 #[must_use]
432 pub fn all_column_names(&self) -> Vec<String> {
433 self.levels
434 .iter()
435 .flat_map(|l| l.column_names.iter().cloned())
436 .collect()
437 }
438
439 #[must_use]
457 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
458 where
459 F: Fn(&grafeo_common::types::Value) -> bool,
460 {
461 if self.levels.is_empty() {
462 return None;
463 }
464
465 let deepest_idx = self.levels.len() - 1;
466 let deepest = &self.levels[deepest_idx];
467
468 let filter_col = deepest.column(column_idx)?;
470
471 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
473 .map(|i| {
474 ValueVector::with_type(
475 deepest
476 .column(i)
477 .expect("column exists: i < column_count")
478 .data_type(),
479 )
480 })
481 .collect();
482
483 let parent_count = filter_col.parent_count();
485 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
486 let mut new_offsets: Vec<u32> = vec![0];
487
488 for parent_idx in 0..parent_count {
490 let (start, end) = filter_col.range_for_parent(parent_idx);
491
492 for phys_idx in start..end {
493 if let Some(value) = filter_col.get_physical(phys_idx)
495 && predicate(&value)
496 {
497 for col_idx in 0..deepest.column_count() {
499 if let Some(col) = deepest.column(col_idx)
500 && let Some(v) = col.get_physical(phys_idx)
501 {
502 new_columns[col_idx].push_value(v);
503 }
504 }
505 new_multiplicities[parent_idx] += 1;
506 }
507 }
508
509 new_offsets.push(new_columns[0].len() as u32);
510 }
511
512 let total_remaining: usize = new_multiplicities.iter().sum();
514 if total_remaining == 0 {
515 return Some(Self::empty());
516 }
517
518 let new_factorized_cols: Vec<FactorizedVector> = new_columns
520 .into_iter()
521 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
522 .collect();
523
524 let new_level = FactorizationLevel::unflat(
525 new_factorized_cols,
526 deepest.column_names().to_vec(),
527 new_multiplicities,
528 );
529
530 let mut result = Self {
532 levels: self.levels[..deepest_idx].to_vec(),
533 logical_row_count: 0,
534 state: ChunkState::flat(0),
535 };
536 result.levels.push(new_level);
537 result.recompute_logical_row_count();
538 result.update_state();
539
540 Some(result)
541 }
542
543 #[must_use]
556 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
557 where
558 F: Fn(&[grafeo_common::types::Value]) -> bool,
559 {
560 if self.levels.is_empty() {
561 return None;
562 }
563
564 let deepest_idx = self.levels.len() - 1;
565 let deepest = &self.levels[deepest_idx];
566 let col_count = deepest.column_count();
567
568 if col_count == 0 {
569 return None;
570 }
571
572 let first_col = deepest.column(0)?;
573 let parent_count = first_col.parent_count();
574
575 let mut new_columns: Vec<ValueVector> = (0..col_count)
577 .map(|i| {
578 ValueVector::with_type(
579 deepest
580 .column(i)
581 .expect("column exists: i < column_count")
582 .data_type(),
583 )
584 })
585 .collect();
586
587 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
588 let mut new_offsets: Vec<u32> = vec![0];
589 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
590
591 for parent_idx in 0..parent_count {
592 let (start, end) = first_col.range_for_parent(parent_idx);
593
594 for phys_idx in start..end {
595 row_values.clear();
597 for col_idx in 0..col_count {
598 if let Some(col) = deepest.column(col_idx)
599 && let Some(v) = col.get_physical(phys_idx)
600 {
601 row_values.push(v);
602 }
603 }
604
605 if predicate(&row_values) {
607 for (col_idx, v) in row_values.iter().enumerate() {
608 new_columns[col_idx].push_value(v.clone());
609 }
610 new_multiplicities[parent_idx] += 1;
611 }
612 }
613
614 new_offsets.push(new_columns[0].len() as u32);
615 }
616
617 let total: usize = new_multiplicities.iter().sum();
619 if total == 0 {
620 return Some(Self::empty());
621 }
622
623 let new_factorized_cols: Vec<FactorizedVector> = new_columns
625 .into_iter()
626 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
627 .collect();
628
629 let new_level = FactorizationLevel::unflat(
630 new_factorized_cols,
631 deepest.column_names().to_vec(),
632 new_multiplicities,
633 );
634
635 let mut result = Self {
636 levels: self.levels[..deepest_idx].to_vec(),
637 logical_row_count: 0,
638 state: ChunkState::flat(0),
639 };
640 result.levels.push(new_level);
641 result.recompute_logical_row_count();
642 result.update_state();
643
644 Some(result)
645 }
646
647 #[must_use]
667 pub fn count_rows(&self) -> usize {
668 self.logical_row_count()
669 }
670
671 #[must_use]
688 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
689 if self.levels.is_empty() {
690 return Vec::new();
691 }
692
693 if self.levels.len() == 1 {
695 return vec![1; self.levels[0].group_count];
696 }
697
698 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
700
701 for level_idx in 1..self.levels.len() {
703 let level = &self.levels[level_idx];
704 let mut child_multiplicities = Vec::with_capacity(level.group_count);
705
706 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
708 let child_count = if parent_idx < level.multiplicities.len() {
709 level.multiplicities[parent_idx]
710 } else {
711 0
712 };
713
714 for _ in 0..child_count {
716 child_multiplicities.push(parent_mult);
717 }
718 }
719
720 parent_multiplicities = child_multiplicities;
721 }
722
723 parent_multiplicities
724 }
725
726 #[must_use]
739 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
740 if self.levels.is_empty() {
741 return None;
742 }
743
744 let deepest_idx = self.levels.len() - 1;
745 let deepest = &self.levels[deepest_idx];
746 let col = deepest.column(column_idx)?;
747
748 let multiplicities = self.compute_path_multiplicities();
750
751 let mut sum = 0.0;
752 for (phys_idx, mult) in multiplicities.iter().enumerate() {
753 if let Some(value) = col.get_physical(phys_idx) {
754 let num_value = match &value {
756 grafeo_common::types::Value::Int64(v) => *v as f64,
757 grafeo_common::types::Value::Float64(v) => *v,
758 _ => continue, };
760 sum += num_value * (*mult as f64);
761 }
762 }
763 Some(sum)
764 }
765
766 #[must_use]
778 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
779 let count = self.logical_row_count();
780 if count == 0 {
781 return None;
782 }
783
784 let sum = self.sum_deepest(column_idx)?;
785 Some(sum / count as f64)
786 }
787
788 #[must_use]
801 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
802 if self.levels.is_empty() {
803 return None;
804 }
805
806 let deepest_idx = self.levels.len() - 1;
807 let deepest = &self.levels[deepest_idx];
808 let col = deepest.column(column_idx)?;
809
810 let mut min_value: Option<grafeo_common::types::Value> = None;
811
812 for phys_idx in 0..col.physical_len() {
813 if let Some(value) = col.get_physical(phys_idx) {
814 min_value = Some(match min_value {
815 None => value,
816 Some(current) => {
817 if Self::value_less_than(&value, ¤t) {
818 value
819 } else {
820 current
821 }
822 }
823 });
824 }
825 }
826
827 min_value
828 }
829
830 #[must_use]
843 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
844 if self.levels.is_empty() {
845 return None;
846 }
847
848 let deepest_idx = self.levels.len() - 1;
849 let deepest = &self.levels[deepest_idx];
850 let col = deepest.column(column_idx)?;
851
852 let mut max_value: Option<grafeo_common::types::Value> = None;
853
854 for phys_idx in 0..col.physical_len() {
855 if let Some(value) = col.get_physical(phys_idx) {
856 max_value = Some(match max_value {
857 None => value,
858 Some(current) => {
859 if Self::value_less_than(¤t, &value) {
860 value
861 } else {
862 current
863 }
864 }
865 });
866 }
867 }
868
869 max_value
870 }
871
872 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
880 use grafeo_common::types::Value;
881
882 match (a, b) {
883 (Value::Null, Value::Null) => false,
885 (Value::Null, _) => true,
886 (_, Value::Null) => false,
887
888 (Value::Int64(x), Value::Int64(y)) => x < y,
890 (Value::Float64(x), Value::Float64(y)) => x < y,
891 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
892 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
893
894 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
896
897 (Value::Bool(x), Value::Bool(y)) => !x && *y,
899
900 _ => false,
903 }
904 }
905
906 #[must_use]
920 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
921 if self.levels.is_empty() || column_specs.is_empty() {
922 return Self::empty();
923 }
924
925 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
927 for (level_idx, col_idx, name) in column_specs {
928 if *level_idx < self.levels.len() {
929 level_specs[*level_idx].push((*col_idx, name.clone()));
930 }
931 }
932
933 let mut new_levels = Vec::new();
935
936 for (level_idx, specs) in level_specs.iter().enumerate() {
937 if specs.is_empty() {
938 continue;
939 }
940
941 let src_level = &self.levels[level_idx];
942
943 let columns: Vec<FactorizedVector> = specs
944 .iter()
945 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
946 .collect();
947
948 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
949
950 if level_idx == 0 {
951 new_levels.push(FactorizationLevel::flat(columns, names));
952 } else {
953 let mults = src_level.multiplicities().to_vec();
954 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
955 }
956 }
957
958 if new_levels.is_empty() {
959 return Self::empty();
960 }
961
962 let mut result = Self {
963 levels: new_levels,
964 logical_row_count: 0,
965 state: ChunkState::flat(0),
966 };
967 result.recompute_logical_row_count();
968 result.update_state();
969 result
970 }
971}
972
973pub struct FactorizedRowIterator<'a> {
988 chunk: &'a FactorizedChunk,
989 indices: Vec<usize>,
991 exhausted: bool,
994}
995
996impl<'a> FactorizedRowIterator<'a> {
997 fn new(chunk: &'a FactorizedChunk) -> Self {
998 let indices = vec![0; chunk.level_count()];
999 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
1000
1001 let mut iter = Self {
1002 chunk,
1003 indices,
1004 exhausted,
1005 };
1006
1007 if !exhausted && !iter.has_valid_deepest_range() {
1009 if !iter.advance() {
1010 exhausted = true;
1011 }
1012 iter.exhausted = exhausted;
1013 }
1014
1015 iter
1016 }
1017
1018 fn advance(&mut self) -> bool {
1020 if self.exhausted || self.chunk.levels.is_empty() {
1021 return false;
1022 }
1023
1024 for level_idx in (0..self.chunk.levels.len()).rev() {
1026 let level = &self.chunk.levels[level_idx];
1027
1028 let parent_idx = if level_idx == 0 {
1030 self.indices[0] + 1
1032 } else {
1033 self.indices[level_idx - 1]
1035 };
1036
1037 let (_start, end) = if level_idx == 0 {
1039 (0, level.group_count)
1040 } else {
1041 if let Some(col) = level.columns.first() {
1043 col.range_for_parent(parent_idx)
1044 } else {
1045 (0, 0)
1046 }
1047 };
1048
1049 let current = self.indices[level_idx];
1050 if current + 1 < end {
1051 self.indices[level_idx] = current + 1;
1053 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1055 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1056 let (deeper_start, _) =
1057 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1058 self.indices[deeper_idx] = deeper_start;
1059 }
1060 }
1061
1062 if self.has_valid_deepest_range() {
1065 return true;
1066 }
1067 return self.advance();
1070 }
1071 }
1073
1074 self.exhausted = true;
1076 false
1077 }
1078
1079 fn has_valid_deepest_range(&self) -> bool {
1085 if self.chunk.levels.len() <= 1 {
1086 return true; }
1088
1089 for level_idx in 1..self.chunk.levels.len() {
1091 let parent_idx = self.indices[level_idx - 1];
1092 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1093 let (start, end) = col.range_for_parent(parent_idx);
1094 if start >= end {
1095 return false;
1096 }
1097 } else {
1098 return false;
1099 }
1100 }
1101
1102 true
1103 }
1104}
1105
1106impl Iterator for FactorizedRowIterator<'_> {
1107 type Item = Vec<usize>;
1108
1109 fn next(&mut self) -> Option<Self::Item> {
1110 if self.exhausted {
1111 return None;
1112 }
1113
1114 let result = self.indices.clone();
1116 self.advance();
1117 Some(result)
1118 }
1119}
1120
1121#[derive(Debug, Clone)]
1123pub enum ChunkVariant {
1124 Flat(DataChunk),
1126 Factorized(FactorizedChunk),
1128}
1129
1130impl ChunkVariant {
1131 #[must_use]
1133 pub fn flat(chunk: DataChunk) -> Self {
1134 Self::Flat(chunk)
1135 }
1136
1137 #[must_use]
1139 pub fn factorized(chunk: FactorizedChunk) -> Self {
1140 Self::Factorized(chunk)
1141 }
1142
1143 #[must_use]
1145 pub fn ensure_flat(self) -> DataChunk {
1146 match self {
1147 Self::Flat(chunk) => chunk,
1148 Self::Factorized(chunk) => chunk.flatten(),
1149 }
1150 }
1151
1152 #[must_use]
1154 pub fn logical_row_count(&self) -> usize {
1155 match self {
1156 Self::Flat(chunk) => chunk.row_count(),
1157 Self::Factorized(chunk) => chunk.logical_row_count(),
1158 }
1159 }
1160
1161 #[must_use]
1163 pub fn is_factorized(&self) -> bool {
1164 matches!(self, Self::Factorized(_))
1165 }
1166
1167 #[must_use]
1169 pub fn is_flat(&self) -> bool {
1170 matches!(self, Self::Flat(_))
1171 }
1172
1173 #[must_use]
1175 pub fn is_empty(&self) -> bool {
1176 self.logical_row_count() == 0
1177 }
1178}
1179
1180impl From<DataChunk> for ChunkVariant {
1181 fn from(chunk: DataChunk) -> Self {
1182 Self::Flat(chunk)
1183 }
1184}
1185
1186impl From<FactorizedChunk> for ChunkVariant {
1187 fn from(chunk: FactorizedChunk) -> Self {
1188 Self::Factorized(chunk)
1189 }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194 use grafeo_common::types::{LogicalType, NodeId, Value};
1195
1196 use super::*;
1197
1198 fn make_flat_chunk() -> DataChunk {
1199 let mut col = ValueVector::with_type(LogicalType::Int64);
1200 col.push_int64(1);
1201 col.push_int64(2);
1202 DataChunk::new(vec![col])
1203 }
1204
1205 fn create_multi_level_chunk() -> FactorizedChunk {
1206 let mut sources = ValueVector::with_type(LogicalType::Int64);
1208 sources.push_int64(10);
1209 sources.push_int64(20);
1210
1211 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1212
1213 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1214 neighbors.push_int64(1);
1215 neighbors.push_int64(2);
1216 neighbors.push_int64(3);
1217 neighbors.push_int64(4);
1218
1219 let offsets = vec![0, 2, 4];
1220 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1221 chunk
1222 }
1223
/// A flat chunk converts into a single level with identical row counts.
#[test]
fn test_from_flat() {
    let source = make_flat_chunk();
    let chunk = FactorizedChunk::from_flat(&source, vec![String::from("col1")]);

    assert_eq!(chunk.level_count(), 1);
    assert_eq!(chunk.logical_row_count(), 2);
    assert_eq!(chunk.physical_size(), 2);
}
1233
/// Adding a neighbor level expands the logical rows without duplicating the
/// source values.
#[test]
fn test_add_level() {
    let mut source_nodes = ValueVector::with_type(LogicalType::Node);
    for id in [100, 200] {
        source_nodes.push_node_id(NodeId::new(id));
    }

    let mut chunk =
        FactorizedChunk::with_flat_level(vec![source_nodes], vec![String::from("source")]);

    assert_eq!(chunk.level_count(), 1);
    assert_eq!(chunk.logical_row_count(), 2);

    let mut neighbor_nodes = ValueVector::with_type(LogicalType::Node);
    for id in [10, 11, 12, 20, 21] {
        neighbor_nodes.push_node_id(NodeId::new(id));
    }

    // First source owns three neighbors, second source owns two.
    let offsets = vec![0, 3, 5];
    chunk.add_level(
        vec![neighbor_nodes],
        vec![String::from("neighbor")],
        &offsets,
    );

    assert_eq!(chunk.level_count(), 2);
    assert_eq!(chunk.logical_row_count(), 5);
    assert_eq!(chunk.physical_size(), 2 + 5);
}
1261
/// Flattening a single-level chunk reproduces the original values.
#[test]
fn test_flatten_single_level() {
    let source = make_flat_chunk();
    let chunk = FactorizedChunk::from_flat(&source, vec![String::from("col1")]);

    let flattened = chunk.flatten();
    assert_eq!(flattened.row_count(), 2);

    let column = flattened.column(0).unwrap();
    assert_eq!(column.get_int64(0), Some(1));
    assert_eq!(column.get_int64(1), Some(2));
}
1272
/// Flattening two levels repeats each source value once per neighbor.
#[test]
fn test_flatten_multi_level() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    for v in [1, 2] {
        src.push_int64(v);
    }

    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("src")]);

    let mut nbr = ValueVector::with_type(LogicalType::Int64);
    for v in [10, 11, 20, 21] {
        nbr.push_int64(v);
    }

    chunk.add_level(vec![nbr], vec![String::from("nbr")], &[0, 2, 4]);

    let flat = chunk.flatten();
    assert_eq!(flat.row_count(), 4);
    assert_eq!(flat.column_count(), 2);

    let src_col = flat.column(0).unwrap();
    for (row, expected) in [1, 1, 2, 2].into_iter().enumerate() {
        assert_eq!(src_col.get_int64(row), Some(expected));
    }

    let nbr_col = flat.column(1).unwrap();
    for (row, expected) in [10, 11, 20, 21].into_iter().enumerate() {
        assert_eq!(nbr_col.get_int64(row), Some(expected));
    }
}
1306
/// A single-level chunk yields one index vector per row.
#[test]
fn test_logical_row_iter_single_level() {
    let source = make_flat_chunk();
    let chunk = FactorizedChunk::from_flat(&source, vec![String::from("col1")]);

    let rows: Vec<Vec<usize>> = chunk.logical_row_iter().collect();
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0], [0]);
    assert_eq!(rows[1], [1]);
}
1317
/// The flat variant reports itself as flat and unwraps unchanged.
#[test]
fn test_chunk_variant() {
    let source = make_flat_chunk();
    let wrapped = ChunkVariant::flat(source.clone());

    assert!(wrapped.is_flat());
    assert!(!wrapped.is_factorized());
    assert_eq!(wrapped.logical_row_count(), 2);

    let unwrapped = wrapped.ensure_flat();
    assert_eq!(unwrapped.row_count(), 2);
}
1330
/// The factorized variant reports its logical rows and flattens on demand.
#[test]
fn test_chunk_variant_factorized() {
    let wrapped = ChunkVariant::factorized(create_multi_level_chunk());

    assert!(wrapped.is_factorized());
    assert!(!wrapped.is_flat());
    assert_eq!(wrapped.logical_row_count(), 4);

    let flattened = wrapped.ensure_flat();
    assert_eq!(flattened.row_count(), 4);
}
1343
/// `From` conversions pick the variant that matches the input type.
#[test]
fn test_chunk_variant_from() {
    let from_flat: ChunkVariant = make_flat_chunk().into();
    assert!(from_flat.is_flat());

    let from_factorized: ChunkVariant = create_multi_level_chunk().into();
    assert!(from_factorized.is_factorized());
}
1354
/// `is_empty` follows the logical row count.
#[test]
fn test_chunk_variant_is_empty() {
    let without_rows = ChunkVariant::flat(DataChunk::empty());
    assert!(without_rows.is_empty());

    let with_rows = ChunkVariant::flat(make_flat_chunk());
    assert!(!with_rows.is_empty());
}
1365
/// An empty chunk has no levels, rows or values and flattens to an empty
/// chunk.
#[test]
fn test_empty_chunk() {
    let empty = FactorizedChunk::empty();
    assert_eq!(empty.level_count(), 0);
    assert_eq!(empty.logical_row_count(), 0);
    assert_eq!(empty.physical_size(), 0);

    assert!(empty.flatten().is_empty());
}
1376
/// Column names are reported level by level, outermost first.
#[test]
fn test_all_column_names() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("source")]);

    let mut nbr = ValueVector::with_type(LogicalType::Int64);
    nbr.push_int64(10);

    chunk.add_level(vec![nbr], vec![String::from("neighbor")], &[0, 1]);

    assert_eq!(chunk.all_column_names(), vec!["source", "neighbor"]);
}
1392
/// `level_mut` returns existing levels and `None` past the end.
#[test]
fn test_level_mut() {
    let mut chunk = create_multi_level_chunk();

    let first_level = chunk.level_mut(0).unwrap();
    assert_eq!(first_level.column_count(), 1);

    let missing = chunk.level_mut(10);
    assert!(missing.is_none());
}
1404
/// `column_mut` returns existing columns and `None` past the end.
#[test]
fn test_factorization_level_column_mut() {
    let mut chunk = create_multi_level_chunk();
    let first_level = chunk.level_mut(0).unwrap();

    assert!(first_level.column_mut(0).is_some());
    assert!(first_level.column_mut(10).is_none());
}
1416
/// Each level reports the number of values it physically stores.
#[test]
fn test_factorization_level_physical_value_count() {
    let chunk = create_multi_level_chunk();

    let sources = chunk.level(0).unwrap();
    assert_eq!(sources.physical_value_count(), 2);

    let neighbors = chunk.level(1).unwrap();
    assert_eq!(neighbors.physical_value_count(), 4);
}
1427
/// `count_rows` reports the logical row count without flattening.
#[test]
fn test_count_rows() {
    let populated = create_multi_level_chunk();
    assert_eq!(populated.count_rows(), 4);

    let nothing = FactorizedChunk::empty();
    assert_eq!(nothing.count_rows(), 0);
}
1436
/// Every deepest-level value of the two-level fixture has multiplicity one.
#[test]
fn test_compute_path_multiplicities() {
    let chunk = create_multi_level_chunk();

    let path_mults = chunk.compute_path_multiplicities();
    assert_eq!(path_mults.len(), 4);
    for mult in &path_mults {
        assert!(*mult == 1);
    }
}
1446
/// A single flat level yields one multiplicity of one per row.
#[test]
fn test_compute_path_multiplicities_single_level() {
    let mut values = ValueVector::with_type(LogicalType::Int64);
    for v in 1..=3 {
        values.push_int64(v);
    }

    let chunk = FactorizedChunk::with_flat_level(vec![values], vec![String::from("val")]);
    let path_mults = chunk.compute_path_multiplicities();

    assert_eq!(path_mults.len(), 3);
    for mult in &path_mults {
        assert!(*mult == 1);
    }
}
1461
/// An empty chunk has no path multiplicities.
#[test]
fn test_compute_path_multiplicities_empty() {
    let path_mults = FactorizedChunk::empty().compute_path_multiplicities();
    assert!(path_mults.is_empty());
}
1468
/// Repeated cached lookups return results of the same length.
#[test]
fn test_path_multiplicities_cached() {
    let mut chunk = create_multi_level_chunk();

    let first = chunk.path_multiplicities_cached();
    assert_eq!(first.len(), 4);

    let second = chunk.path_multiplicities_cached();
    assert_eq!(first.len(), second.len());
}
1481
/// Summing the neighbor column gives 1 + 2 + 3 + 4.
#[test]
fn test_sum_deepest() {
    let chunk = create_multi_level_chunk();

    let total = chunk.sum_deepest(0);
    assert_eq!(total, Some(10.0));
}
1490
/// Summing over an empty chunk yields no result.
#[test]
fn test_sum_deepest_empty() {
    let total = FactorizedChunk::empty().sum_deepest(0);
    assert!(total.is_none());
}
1496
/// Summing a column that does not exist yields no result.
#[test]
fn test_sum_deepest_invalid_column() {
    let total = create_multi_level_chunk().sum_deepest(10);
    assert!(total.is_none());
}
1502
/// Averaging the neighbor column gives 10 / 4.
#[test]
fn test_avg_deepest() {
    let chunk = create_multi_level_chunk();

    let mean = chunk.avg_deepest(0);
    assert_eq!(mean, Some(2.5));
}
1511
/// Averaging over an empty chunk yields no result.
#[test]
fn test_avg_deepest_empty() {
    let mean = FactorizedChunk::empty().avg_deepest(0);
    assert!(mean.is_none());
}
1517
/// The smallest neighbor value is 1.
#[test]
fn test_min_deepest() {
    let chunk = create_multi_level_chunk();

    let smallest = chunk.min_deepest(0);
    assert_eq!(smallest, Some(Value::Int64(1)));
}
1525
/// The minimum of an empty chunk is undefined.
#[test]
fn test_min_deepest_empty() {
    let smallest = FactorizedChunk::empty().min_deepest(0);
    assert!(smallest.is_none());
}
1531
/// The minimum of a missing column is undefined.
#[test]
fn test_min_deepest_invalid_column() {
    let smallest = create_multi_level_chunk().min_deepest(10);
    assert!(smallest.is_none());
}
1537
/// The largest neighbor value is 4.
#[test]
fn test_max_deepest() {
    let chunk = create_multi_level_chunk();

    let largest = chunk.max_deepest(0);
    assert_eq!(largest, Some(Value::Int64(4)));
}
1545
/// The maximum of an empty chunk is undefined.
#[test]
fn test_max_deepest_empty() {
    let largest = FactorizedChunk::empty().max_deepest(0);
    assert!(largest.is_none());
}
1551
/// Exercises the ordering rules of `value_less_than` for nulls, numbers,
/// strings, booleans and mismatched types.
#[test]
fn test_value_less_than() {
    let less = FactorizedChunk::value_less_than;

    // Null sorts before any non-null value and is not less than itself.
    assert!(less(&Value::Null, &Value::Int64(1)));
    assert!(!less(&Value::Int64(1), &Value::Null));
    assert!(!less(&Value::Null, &Value::Null));

    // Integers.
    assert!(less(&Value::Int64(1), &Value::Int64(2)));
    assert!(!less(&Value::Int64(2), &Value::Int64(1)));

    // Floats.
    assert!(less(&Value::Float64(1.5), &Value::Float64(2.5)));

    // Mixed integer/float comparisons.
    assert!(less(&Value::Int64(1), &Value::Float64(1.5)));
    assert!(less(&Value::Float64(0.5), &Value::Int64(1)));

    // Strings.
    assert!(less(
        &Value::String("apple".into()),
        &Value::String("banana".into())
    ));

    // Booleans: false sorts before true.
    assert!(less(&Value::Bool(false), &Value::Bool(true)));
    assert!(!less(&Value::Bool(true), &Value::Bool(false)));

    // Values of unrelated types are never less than one another.
    assert!(!less(&Value::Int64(1), &Value::String("hello".into())));
}
1616
/// Keeping only `Int64` values greater than 2 in column 0 of the deepest level
/// must leave the multi-level fixture with 2 logical rows.
#[test]
fn test_filter_deepest() {
    let fixture = create_multi_level_chunk();

    // Non-Int64 values never satisfy the predicate.
    let kept = fixture
        .filter_deepest(0, |value| matches!(value, Value::Int64(n) if *n > 2))
        .unwrap();

    assert_eq!(kept.logical_row_count(), 2);
}
1633
/// `filter_deepest` on an empty chunk must report `None`, even with an accept-all predicate.
#[test]
fn test_filter_deepest_empty() {
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest(0, |_| true);
    assert!(outcome.is_none());
}
1639
/// A reject-all predicate must still produce a chunk, and that chunk must
/// report zero logical rows.
#[test]
fn test_filter_deepest_all_filtered() {
    let fixture = create_multi_level_chunk();

    let survivors = fixture.filter_deepest(0, |_| false).unwrap();

    assert_eq!(survivors.logical_row_count(), 0);
}
1650
/// Filtering on column index 10 of the multi-level fixture must report `None`.
#[test]
fn test_filter_deepest_invalid_column() {
    let fixture = create_multi_level_chunk();
    let outcome = fixture.filter_deepest(10, |_| true);
    assert!(outcome.is_none());
}
1656
/// Builds one source row with a two-column child level of three entries and
/// checks that a predicate over both columns (`a + b > 15`) keeps 2 logical rows.
#[test]
fn test_filter_deepest_multi() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec![String::from("src")]);

    let mut first = ValueVector::with_type(LogicalType::Int64);
    for value in [10, 20, 30] {
        first.push_int64(value);
    }

    let mut second = ValueVector::with_type(LogicalType::Int64);
    for value in [1, 2, 3] {
        second.push_int64(value);
    }

    chunk.add_level(
        vec![first, second],
        vec![String::from("a"), String::from("b")],
        &[0, 3],
    );

    let outcome = chunk.filter_deepest_multi(|row| {
        // Anything other than exactly two Int64 values is rejected.
        if row.len() != 2 {
            return false;
        }
        match (&row[0], &row[1]) {
            (Value::Int64(a), Value::Int64(b)) => *a + *b > 15,
            _ => false,
        }
    });

    assert!(outcome.is_some());
    let kept = outcome.unwrap();
    // Pairs (20, 2) and (30, 3) pass; (10, 1) does not.
    assert_eq!(kept.logical_row_count(), 2);
}
1696
/// `filter_deepest_multi` on an empty chunk must report `None`.
#[test]
fn test_filter_deepest_multi_empty() {
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1702
/// When the deepest level has no columns, `filter_deepest_multi` must report `None`.
#[test]
fn test_filter_deepest_multi_no_columns() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec![String::from("src")]);

    // Append a level that carries a multiplicity entry but no columns or names.
    chunk.add_factorized_level(FactorizationLevel::unflat(Vec::new(), Vec::new(), vec![0]));

    let outcome = chunk.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1717
/// Projecting a single column out of a two-column flat chunk must yield one
/// column carrying the requested output name.
#[test]
fn test_project() {
    let mut numbers = ValueVector::with_type(LogicalType::Int64);
    for value in [1, 2] {
        numbers.push_int64(value);
    }

    let mut labels = ValueVector::with_type(LogicalType::String);
    for text in ["a", "b"] {
        labels.push_string(text);
    }

    let chunk = FactorizedChunk::with_flat_level(
        vec![numbers, labels],
        vec![String::from("num"), String::from("str")],
    );

    // Spec tuple: (level index, column index, output name).
    let projected = chunk.project(&[(0, 0, String::from("projected_num"))]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1740
/// Projecting from an empty chunk must produce a chunk with zero levels.
#[test]
fn test_project_empty() {
    let empty = FactorizedChunk::empty();
    let result = empty.project(&[(0, 0, String::from("col"))]);
    assert_eq!(result.level_count(), 0);
}
1747
/// Projecting with an empty spec list must produce a chunk with zero levels.
#[test]
fn test_project_empty_specs() {
    let fixture = create_multi_level_chunk();
    let level_total = fixture.project(&[]).level_count();
    assert_eq!(level_total, 0);
}
1754
/// A spec that names level index 10 on the multi-level fixture must produce
/// a chunk with zero levels.
#[test]
fn test_project_invalid_level() {
    let fixture = create_multi_level_chunk();

    let result = fixture.project(&[(10, 0, String::from("col"))]);

    assert_eq!(result.level_count(), 0);
}
1763
/// Projecting one column from each of levels 0 and 1 must keep two levels
/// and two columns in total.
#[test]
fn test_project_multi_level() {
    let fixture = create_multi_level_chunk();

    let result = fixture.project(&[
        (0, 0, String::from("source")),
        (1, 0, String::from("neighbor")),
    ]);

    assert_eq!(result.level_count(), 2);
    assert_eq!(result.total_column_count(), 2);
}
1775
/// The multi-level fixture must report two columns across all of its levels.
#[test]
fn test_total_column_count() {
    let fixture = create_multi_level_chunk();
    let column_total = fixture.total_column_count();
    assert_eq!(column_total, 2);
}
1781
/// Checks both state accessors: the shared one must report a factorized
/// state, and the mutable one must allow invalidating the cache.
#[test]
fn test_chunk_state_access() {
    let mut fixture = create_multi_level_chunk();

    assert!(fixture.chunk_state().is_factorized());

    fixture.chunk_state_mut().invalidate_cache();
}
1792
/// The logical row iterator over the multi-level fixture must yield four
/// index vectors, in the order listed in `expected`.
#[test]
fn test_logical_row_iter_multi_level() {
    let fixture = create_multi_level_chunk();

    let rows: Vec<_> = fixture.logical_row_iter().collect();
    assert_eq!(rows.len(), 4);

    let expected = [vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]];
    for (position, want) in expected.iter().enumerate() {
        assert_eq!(&rows[position], want);
    }
}
1806
/// `sum_deepest` over a `Float64` column holding 1.5, 2.5 and 3.0 must yield `Some(7.0)`.
#[test]
fn test_sum_deepest_with_float() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec![String::from("src")]);

    let mut values = ValueVector::with_type(LogicalType::Float64);
    for value in [1.5, 2.5, 3.0] {
        values.push_float64(value);
    }

    chunk.add_level(vec![values], vec![String::from("val")], &[0, 3]);

    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1824
/// On a string column holding "banana", "apple" and "cherry", `min_deepest`
/// must return "apple" and `max_deepest` must return "cherry".
#[test]
fn test_min_max_with_strings() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec![String::from("src")]);

    let mut fruits = ValueVector::with_type(LogicalType::String);
    for fruit in ["banana", "apple", "cherry"] {
        fruits.push_string(fruit);
    }

    chunk.add_level(vec![fruits], vec![String::from("fruit")], &[0, 3]);

    let smallest = chunk.min_deepest(0);
    assert_eq!(smallest, Some(Value::String("apple".into())));

    let largest = chunk.max_deepest(0);
    assert_eq!(largest, Some(Value::String("cherry".into())));
}
1845
/// Recomputing the logical row count of an empty chunk must leave it at zero.
#[test]
fn test_recompute_logical_row_count_empty() {
    let mut empty = FactorizedChunk::empty();
    empty.recompute_logical_row_count();

    let rows = empty.logical_row_count();
    assert_eq!(rows, 0);
}
1852
/// Level 0 of the multi-level fixture must report 2 groups and level 1 must report 4.
#[test]
fn test_factorization_level_group_count() {
    let fixture = create_multi_level_chunk();

    // (level index, expected group count), checked in level order.
    for (depth, expected_groups) in [(0, 2), (1, 4)] {
        let level = fixture.level(depth).unwrap();
        assert_eq!(level.group_count(), expected_groups);
    }
}
1863
/// Level 1 of the multi-level fixture must expose multiplicities `[2, 2]`.
#[test]
fn test_factorization_level_multiplicities() {
    let fixture = create_multi_level_chunk();

    let deepest = fixture.level(1).unwrap();
    assert_eq!(deepest.multiplicities(), &[2, 2]);
}
1872
/// Level 0 of the multi-level fixture must name its column "src" and level 1 "nbr".
#[test]
fn test_factorization_level_column_names() {
    let fixture = create_multi_level_chunk();

    // (level index, expected sole column name), checked in level order.
    for (depth, expected_name) in [(0, "src"), (1, "nbr")] {
        let level = fixture.level(depth).unwrap();
        assert_eq!(level.column_names(), &[expected_name]);
    }
}
1883}