1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
29#[derive(Debug, Clone)]
44pub struct FactorizedChunk {
45 levels: Vec<FactorizationLevel>,
47 logical_row_count: usize,
49 state: ChunkState,
51}
52
53#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56 columns: Vec<FactorizedVector>,
58 column_names: Vec<String>,
60 group_count: usize,
62 multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242 if let Some(cached) = self.state.cached_multiplicities() {
244 return Arc::clone(cached);
245 }
246
247 let mults = self.compute_path_multiplicities();
249 let arc_mults: Arc<[usize]> = mults.into();
250 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251 arc_mults
252 }
253
254 #[must_use]
256 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257 self.levels.get(index)
258 }
259
260 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262 self.levels.get_mut(index)
263 }
264
265 pub fn add_level(
276 &mut self,
277 columns: Vec<ValueVector>,
278 column_names: Vec<String>,
279 offsets: &[u32],
280 ) {
281 let parent_count = offsets.len().saturating_sub(1);
282
283 let multiplicities: Vec<usize> = (0..parent_count)
285 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286 .collect();
287
288 let factorized_columns: Vec<FactorizedVector> = columns
290 .into_iter()
291 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292 .collect();
293
294 let level =
295 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296 self.levels.push(level);
297
298 if self.levels.len() == 1 {
302 self.logical_row_count = multiplicities.iter().sum();
304 } else {
305 self.recompute_logical_row_count();
307 }
308
309 self.update_state();
311 }
312
313 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315 self.levels.push(level);
316 self.recompute_logical_row_count();
317 self.update_state();
318 }
319
320 fn update_state(&mut self) {
322 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323 }
324
325 fn recompute_logical_row_count(&mut self) {
327 if self.levels.is_empty() {
328 self.logical_row_count = 0;
329 return;
330 }
331
332 let level0_count = self.levels[0].group_count;
334 if self.levels.len() == 1 {
335 self.logical_row_count = level0_count;
336 return;
337 }
338
339 let mut counts = vec![1usize; level0_count];
342
343 for level_idx in 1..self.levels.len() {
344 let level = &self.levels[level_idx];
345 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
348 if parent_idx < level.multiplicities.len() {
350 let child_mult = level.multiplicities[parent_idx];
351 for _ in 0..child_mult {
352 new_counts.push(parent_count);
353 }
354 }
355 }
356
357 counts = new_counts;
358 }
359
360 self.logical_row_count = counts.len();
361 }
362
363 #[must_use]
367 pub fn flatten(&self) -> DataChunk {
368 if self.levels.is_empty() {
369 return DataChunk::empty();
370 }
371
372 let mut all_columns: Vec<ValueVector> = Vec::new();
374
375 if self.levels.len() == 1 {
377 let level = &self.levels[0];
378 for col in &level.columns {
379 all_columns.push(col.flatten(None));
380 }
381 return DataChunk::new(all_columns);
382 }
383
384 let row_iter = self.logical_row_iter();
387 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391 for level in &self.levels {
392 for col in &level.columns {
393 output_columns.push(ValueVector::with_capacity(
394 col.data_type(),
395 self.logical_row_count,
396 ));
397 }
398 }
399
400 for indices in row_iter {
402 let mut col_offset = 0;
403 for (level_idx, level) in self.levels.iter().enumerate() {
404 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405 for (col_idx, col) in level.columns.iter().enumerate() {
406 if let Some(value) = col.get_physical(level_idx_value) {
407 output_columns[col_offset + col_idx].push_value(value);
408 }
409 }
410 col_offset += level.column_count();
411 }
412 }
413
414 DataChunk::new(output_columns)
415 }
416
417 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421 FactorizedRowIterator::new(self)
422 }
423
424 #[must_use]
426 pub fn total_column_count(&self) -> usize {
427 self.levels.iter().map(|l| l.column_count()).sum()
428 }
429
430 #[must_use]
432 pub fn all_column_names(&self) -> Vec<String> {
433 self.levels
434 .iter()
435 .flat_map(|l| l.column_names.iter().cloned())
436 .collect()
437 }
438
439 #[must_use]
457 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
458 where
459 F: Fn(&grafeo_common::types::Value) -> bool,
460 {
461 if self.levels.is_empty() {
462 return None;
463 }
464
465 let deepest_idx = self.levels.len() - 1;
466 let deepest = &self.levels[deepest_idx];
467
468 let filter_col = deepest.column(column_idx)?;
470
471 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
473 .map(|i| {
474 ValueVector::with_type(
475 deepest
476 .column(i)
477 .expect("column exists: i < column_count")
478 .data_type(),
479 )
480 })
481 .collect();
482
483 let parent_count = filter_col.parent_count();
485 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
486 let mut new_offsets: Vec<u32> = vec![0];
487
488 for parent_idx in 0..parent_count {
490 let (start, end) = filter_col.range_for_parent(parent_idx);
491
492 for phys_idx in start..end {
493 if let Some(value) = filter_col.get_physical(phys_idx)
495 && predicate(&value)
496 {
497 for col_idx in 0..deepest.column_count() {
499 if let Some(col) = deepest.column(col_idx)
500 && let Some(v) = col.get_physical(phys_idx)
501 {
502 new_columns[col_idx].push_value(v);
503 }
504 }
505 new_multiplicities[parent_idx] += 1;
506 }
507 }
508
509 new_offsets.push(new_columns[0].len() as u32);
510 }
511
512 let total_remaining: usize = new_multiplicities.iter().sum();
514 if total_remaining == 0 {
515 return Some(Self::empty());
516 }
517
518 let new_factorized_cols: Vec<FactorizedVector> = new_columns
520 .into_iter()
521 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
522 .collect();
523
524 let new_level = FactorizationLevel::unflat(
525 new_factorized_cols,
526 deepest.column_names().to_vec(),
527 new_multiplicities,
528 );
529
530 let mut result = Self {
532 levels: self.levels[..deepest_idx].to_vec(),
533 logical_row_count: 0,
534 state: ChunkState::flat(0),
535 };
536 result.levels.push(new_level);
537 result.recompute_logical_row_count();
538 result.update_state();
539
540 Some(result)
541 }
542
543 #[must_use]
556 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
557 where
558 F: Fn(&[grafeo_common::types::Value]) -> bool,
559 {
560 if self.levels.is_empty() {
561 return None;
562 }
563
564 let deepest_idx = self.levels.len() - 1;
565 let deepest = &self.levels[deepest_idx];
566 let col_count = deepest.column_count();
567
568 if col_count == 0 {
569 return None;
570 }
571
572 let first_col = deepest.column(0)?;
573 let parent_count = first_col.parent_count();
574
575 let mut new_columns: Vec<ValueVector> = (0..col_count)
577 .map(|i| {
578 ValueVector::with_type(
579 deepest
580 .column(i)
581 .expect("column exists: i < column_count")
582 .data_type(),
583 )
584 })
585 .collect();
586
587 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
588 let mut new_offsets: Vec<u32> = vec![0];
589 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
590
591 for parent_idx in 0..parent_count {
592 let (start, end) = first_col.range_for_parent(parent_idx);
593
594 for phys_idx in start..end {
595 row_values.clear();
597 for col_idx in 0..col_count {
598 if let Some(col) = deepest.column(col_idx)
599 && let Some(v) = col.get_physical(phys_idx)
600 {
601 row_values.push(v);
602 }
603 }
604
605 if predicate(&row_values) {
607 for (col_idx, v) in row_values.iter().enumerate() {
608 new_columns[col_idx].push_value(v.clone());
609 }
610 new_multiplicities[parent_idx] += 1;
611 }
612 }
613
614 new_offsets.push(new_columns[0].len() as u32);
615 }
616
617 let total: usize = new_multiplicities.iter().sum();
619 if total == 0 {
620 return Some(Self::empty());
621 }
622
623 let new_factorized_cols: Vec<FactorizedVector> = new_columns
625 .into_iter()
626 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
627 .collect();
628
629 let new_level = FactorizationLevel::unflat(
630 new_factorized_cols,
631 deepest.column_names().to_vec(),
632 new_multiplicities,
633 );
634
635 let mut result = Self {
636 levels: self.levels[..deepest_idx].to_vec(),
637 logical_row_count: 0,
638 state: ChunkState::flat(0),
639 };
640 result.levels.push(new_level);
641 result.recompute_logical_row_count();
642 result.update_state();
643
644 Some(result)
645 }
646
647 #[must_use]
667 pub fn count_rows(&self) -> usize {
668 self.logical_row_count()
669 }
670
671 #[must_use]
688 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
689 if self.levels.is_empty() {
690 return Vec::new();
691 }
692
693 if self.levels.len() == 1 {
695 return vec![1; self.levels[0].group_count];
696 }
697
698 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
700
701 for level_idx in 1..self.levels.len() {
703 let level = &self.levels[level_idx];
704 let mut child_multiplicities = Vec::with_capacity(level.group_count);
705
706 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
708 let child_count = if parent_idx < level.multiplicities.len() {
709 level.multiplicities[parent_idx]
710 } else {
711 0
712 };
713
714 for _ in 0..child_count {
716 child_multiplicities.push(parent_mult);
717 }
718 }
719
720 parent_multiplicities = child_multiplicities;
721 }
722
723 parent_multiplicities
724 }
725
726 #[must_use]
739 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
740 if self.levels.is_empty() {
741 return None;
742 }
743
744 let deepest_idx = self.levels.len() - 1;
745 let deepest = &self.levels[deepest_idx];
746 let col = deepest.column(column_idx)?;
747
748 let multiplicities = self.compute_path_multiplicities();
750
751 let mut sum = 0.0;
752 for (phys_idx, mult) in multiplicities.iter().enumerate() {
753 if let Some(value) = col.get_physical(phys_idx) {
754 let num_value = match &value {
756 grafeo_common::types::Value::Int64(v) => *v as f64,
757 grafeo_common::types::Value::Float64(v) => *v,
758 _ => continue, };
760 sum += num_value * (*mult as f64);
761 }
762 }
763 Some(sum)
764 }
765
766 #[must_use]
778 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
779 let count = self.logical_row_count();
780 if count == 0 {
781 return None;
782 }
783
784 let sum = self.sum_deepest(column_idx)?;
785 Some(sum / count as f64)
786 }
787
788 #[must_use]
801 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
802 if self.levels.is_empty() {
803 return None;
804 }
805
806 let deepest_idx = self.levels.len() - 1;
807 let deepest = &self.levels[deepest_idx];
808 let col = deepest.column(column_idx)?;
809
810 let mut min_value: Option<grafeo_common::types::Value> = None;
811
812 for phys_idx in 0..col.physical_len() {
813 if let Some(value) = col.get_physical(phys_idx) {
814 min_value = Some(match min_value {
815 None => value,
816 Some(current) => {
817 if Self::value_less_than(&value, ¤t) {
818 value
819 } else {
820 current
821 }
822 }
823 });
824 }
825 }
826
827 min_value
828 }
829
830 #[must_use]
843 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
844 if self.levels.is_empty() {
845 return None;
846 }
847
848 let deepest_idx = self.levels.len() - 1;
849 let deepest = &self.levels[deepest_idx];
850 let col = deepest.column(column_idx)?;
851
852 let mut max_value: Option<grafeo_common::types::Value> = None;
853
854 for phys_idx in 0..col.physical_len() {
855 if let Some(value) = col.get_physical(phys_idx) {
856 max_value = Some(match max_value {
857 None => value,
858 Some(current) => {
859 if Self::value_less_than(¤t, &value) {
860 value
861 } else {
862 current
863 }
864 }
865 });
866 }
867 }
868
869 max_value
870 }
871
872 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
880 use grafeo_common::types::Value;
881
882 match (a, b) {
883 (Value::Null, Value::Null) => false,
885 (Value::Null, _) => true,
886 (_, Value::Null) => false,
887
888 (Value::Int64(x), Value::Int64(y)) => x < y,
890 (Value::Float64(x), Value::Float64(y)) => x < y,
891 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
892 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
893
894 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
896
897 (Value::Bool(x), Value::Bool(y)) => !x && *y,
899
900 _ => false,
903 }
904 }
905
906 #[must_use]
920 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
921 if self.levels.is_empty() || column_specs.is_empty() {
922 return Self::empty();
923 }
924
925 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
927 for (level_idx, col_idx, name) in column_specs {
928 if *level_idx < self.levels.len() {
929 level_specs[*level_idx].push((*col_idx, name.clone()));
930 }
931 }
932
933 let mut new_levels = Vec::new();
935
936 for (level_idx, specs) in level_specs.iter().enumerate() {
937 if specs.is_empty() {
938 continue;
939 }
940
941 let src_level = &self.levels[level_idx];
942
943 let columns: Vec<FactorizedVector> = specs
944 .iter()
945 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
946 .collect();
947
948 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
949
950 if level_idx == 0 {
951 new_levels.push(FactorizationLevel::flat(columns, names));
952 } else {
953 let mults = src_level.multiplicities().to_vec();
954 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
955 }
956 }
957
958 if new_levels.is_empty() {
959 return Self::empty();
960 }
961
962 let mut result = Self {
963 levels: new_levels,
964 logical_row_count: 0,
965 state: ChunkState::flat(0),
966 };
967 result.recompute_logical_row_count();
968 result.update_state();
969 result
970 }
971}
972
973pub struct FactorizedRowIterator<'a> {
988 chunk: &'a FactorizedChunk,
989 indices: Vec<usize>,
991 exhausted: bool,
994}
995
996impl<'a> FactorizedRowIterator<'a> {
997 fn new(chunk: &'a FactorizedChunk) -> Self {
998 let indices = vec![0; chunk.level_count()];
999 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
1000
1001 let mut iter = Self {
1002 chunk,
1003 indices,
1004 exhausted,
1005 };
1006
1007 if !exhausted && !iter.has_valid_deepest_range() {
1009 if !iter.advance() {
1010 exhausted = true;
1011 }
1012 iter.exhausted = exhausted;
1013 }
1014
1015 iter
1016 }
1017
1018 fn advance(&mut self) -> bool {
1020 if self.exhausted || self.chunk.levels.is_empty() {
1021 return false;
1022 }
1023
1024 for level_idx in (0..self.chunk.levels.len()).rev() {
1026 let level = &self.chunk.levels[level_idx];
1027
1028 let parent_idx = if level_idx == 0 {
1030 self.indices[0] + 1
1032 } else {
1033 self.indices[level_idx - 1]
1035 };
1036
1037 let (_start, end) = if level_idx == 0 {
1039 (0, level.group_count)
1040 } else {
1041 if let Some(col) = level.columns.first() {
1043 col.range_for_parent(parent_idx)
1044 } else {
1045 (0, 0)
1046 }
1047 };
1048
1049 let current = self.indices[level_idx];
1050 if current + 1 < end {
1051 self.indices[level_idx] = current + 1;
1053 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1055 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1056 let (deeper_start, _) =
1057 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1058 self.indices[deeper_idx] = deeper_start;
1059 }
1060 }
1061
1062 if self.has_valid_deepest_range() {
1065 return true;
1066 }
1067 return self.advance();
1070 }
1071 }
1073
1074 self.exhausted = true;
1076 false
1077 }
1078
1079 fn has_valid_deepest_range(&self) -> bool {
1085 if self.chunk.levels.len() <= 1 {
1086 return true; }
1088
1089 for level_idx in 1..self.chunk.levels.len() {
1091 let parent_idx = self.indices[level_idx - 1];
1092 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1093 let (start, end) = col.range_for_parent(parent_idx);
1094 if start >= end {
1095 return false;
1096 }
1097 } else {
1098 return false;
1099 }
1100 }
1101
1102 true
1103 }
1104}
1105
1106impl Iterator for FactorizedRowIterator<'_> {
1107 type Item = Vec<usize>;
1108
1109 fn next(&mut self) -> Option<Self::Item> {
1110 if self.exhausted {
1111 return None;
1112 }
1113
1114 let result = self.indices.clone();
1116 self.advance();
1117 Some(result)
1118 }
1119}
1120
1121#[derive(Debug, Clone)]
1123#[non_exhaustive]
1124pub enum ChunkVariant {
1125 Flat(DataChunk),
1127 Factorized(FactorizedChunk),
1129}
1130
1131impl ChunkVariant {
1132 #[must_use]
1134 pub fn flat(chunk: DataChunk) -> Self {
1135 Self::Flat(chunk)
1136 }
1137
1138 #[must_use]
1140 pub fn factorized(chunk: FactorizedChunk) -> Self {
1141 Self::Factorized(chunk)
1142 }
1143
1144 #[must_use]
1146 pub fn ensure_flat(self) -> DataChunk {
1147 match self {
1148 Self::Flat(chunk) => chunk,
1149 Self::Factorized(chunk) => chunk.flatten(),
1150 }
1151 }
1152
1153 #[must_use]
1155 pub fn logical_row_count(&self) -> usize {
1156 match self {
1157 Self::Flat(chunk) => chunk.row_count(),
1158 Self::Factorized(chunk) => chunk.logical_row_count(),
1159 }
1160 }
1161
1162 #[must_use]
1164 pub fn is_factorized(&self) -> bool {
1165 matches!(self, Self::Factorized(_))
1166 }
1167
1168 #[must_use]
1170 pub fn is_flat(&self) -> bool {
1171 matches!(self, Self::Flat(_))
1172 }
1173
1174 #[must_use]
1176 pub fn is_empty(&self) -> bool {
1177 self.logical_row_count() == 0
1178 }
1179}
1180
1181impl From<DataChunk> for ChunkVariant {
1182 fn from(chunk: DataChunk) -> Self {
1183 Self::Flat(chunk)
1184 }
1185}
1186
1187impl From<FactorizedChunk> for ChunkVariant {
1188 fn from(chunk: FactorizedChunk) -> Self {
1189 Self::Factorized(chunk)
1190 }
1191}
1192
1193#[cfg(test)]
1194mod tests {
1195 use grafeo_common::types::{LogicalType, NodeId, Value};
1196
1197 use super::*;
1198
1199 fn make_flat_chunk() -> DataChunk {
1200 let mut col = ValueVector::with_type(LogicalType::Int64);
1201 col.push_int64(1);
1202 col.push_int64(2);
1203 DataChunk::new(vec![col])
1204 }
1205
1206 fn create_multi_level_chunk() -> FactorizedChunk {
1207 let mut sources = ValueVector::with_type(LogicalType::Int64);
1209 sources.push_int64(10);
1210 sources.push_int64(20);
1211
1212 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1213
1214 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1215 neighbors.push_int64(1);
1216 neighbors.push_int64(2);
1217 neighbors.push_int64(3);
1218 neighbors.push_int64(4);
1219
1220 let offsets = vec![0, 2, 4];
1221 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1222 chunk
1223 }
1224
1225 #[test]
1226 fn test_from_flat() {
1227 let flat = make_flat_chunk();
1228 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1229
1230 assert_eq!(factorized.level_count(), 1);
1231 assert_eq!(factorized.logical_row_count(), 2);
1232 assert_eq!(factorized.physical_size(), 2);
1233 }
1234
1235 #[test]
1236 fn test_add_level() {
1237 let mut col0 = ValueVector::with_type(LogicalType::Node);
1239 col0.push_node_id(NodeId::new(100));
1240 col0.push_node_id(NodeId::new(200));
1241
1242 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1243
1244 assert_eq!(chunk.level_count(), 1);
1245 assert_eq!(chunk.logical_row_count(), 2);
1246
1247 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1249 neighbors.push_node_id(NodeId::new(10));
1250 neighbors.push_node_id(NodeId::new(11));
1251 neighbors.push_node_id(NodeId::new(12));
1252 neighbors.push_node_id(NodeId::new(20));
1253 neighbors.push_node_id(NodeId::new(21));
1254
1255 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1257
1258 assert_eq!(chunk.level_count(), 2);
1259 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1262
1263 #[test]
1264 fn test_flatten_single_level() {
1265 let flat = make_flat_chunk();
1266 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1267
1268 let flattened = factorized.flatten();
1269 assert_eq!(flattened.row_count(), 2);
1270 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1271 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1272 }
1273
1274 #[test]
1275 fn test_flatten_multi_level() {
1276 let mut sources = ValueVector::with_type(LogicalType::Int64);
1278 sources.push_int64(1);
1279 sources.push_int64(2);
1280
1281 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1282
1283 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1284 neighbors.push_int64(10);
1285 neighbors.push_int64(11);
1286 neighbors.push_int64(20);
1287 neighbors.push_int64(21);
1288
1289 let offsets = vec![0, 2, 4];
1290 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1291
1292 let flat = chunk.flatten();
1293 assert_eq!(flat.row_count(), 4);
1294 assert_eq!(flat.column_count(), 2);
1295
1296 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1299 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1300 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1301 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1302 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1303 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1304 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1305 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1306 }
1307
1308 #[test]
1309 fn test_logical_row_iter_single_level() {
1310 let flat = make_flat_chunk();
1311 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1312
1313 let indices: Vec<_> = factorized.logical_row_iter().collect();
1314 assert_eq!(indices.len(), 2);
1315 assert_eq!(indices[0], vec![0]);
1316 assert_eq!(indices[1], vec![1]);
1317 }
1318
1319 #[test]
1320 fn test_chunk_variant() {
1321 let flat = make_flat_chunk();
1322 let variant = ChunkVariant::flat(flat.clone());
1323
1324 assert!(variant.is_flat());
1325 assert!(!variant.is_factorized());
1326 assert_eq!(variant.logical_row_count(), 2);
1327
1328 let ensured = variant.ensure_flat();
1329 assert_eq!(ensured.row_count(), 2);
1330 }
1331
1332 #[test]
1333 fn test_chunk_variant_factorized() {
1334 let chunk = create_multi_level_chunk();
1335 let variant = ChunkVariant::factorized(chunk);
1336
1337 assert!(variant.is_factorized());
1338 assert!(!variant.is_flat());
1339 assert_eq!(variant.logical_row_count(), 4);
1340
1341 let flat = variant.ensure_flat();
1342 assert_eq!(flat.row_count(), 4);
1343 }
1344
1345 #[test]
1346 fn test_chunk_variant_from() {
1347 let flat = make_flat_chunk();
1348 let variant: ChunkVariant = flat.into();
1349 assert!(variant.is_flat());
1350
1351 let factorized = create_multi_level_chunk();
1352 let variant2: ChunkVariant = factorized.into();
1353 assert!(variant2.is_factorized());
1354 }
1355
1356 #[test]
1357 fn test_chunk_variant_is_empty() {
1358 let empty_flat = DataChunk::empty();
1359 let variant = ChunkVariant::flat(empty_flat);
1360 assert!(variant.is_empty());
1361
1362 let non_empty = make_flat_chunk();
1363 let variant2 = ChunkVariant::flat(non_empty);
1364 assert!(!variant2.is_empty());
1365 }
1366
1367 #[test]
1368 fn test_empty_chunk() {
1369 let chunk = FactorizedChunk::empty();
1370 assert_eq!(chunk.level_count(), 0);
1371 assert_eq!(chunk.logical_row_count(), 0);
1372 assert_eq!(chunk.physical_size(), 0);
1373
1374 let flat = chunk.flatten();
1375 assert!(flat.is_empty());
1376 }
1377
1378 #[test]
1379 fn test_all_column_names() {
1380 let mut sources = ValueVector::with_type(LogicalType::Int64);
1381 sources.push_int64(1);
1382
1383 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1384
1385 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1386 neighbors.push_int64(10);
1387
1388 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1389
1390 let names = chunk.all_column_names();
1391 assert_eq!(names, vec!["source", "neighbor"]);
1392 }
1393
1394 #[test]
1395 fn test_level_mut() {
1396 let mut chunk = create_multi_level_chunk();
1397
1398 let level = chunk.level_mut(0).unwrap();
1400 assert_eq!(level.column_count(), 1);
1401
1402 assert!(chunk.level_mut(10).is_none());
1404 }
1405
1406 #[test]
1407 fn test_factorization_level_column_mut() {
1408 let mut chunk = create_multi_level_chunk();
1409
1410 let level = chunk.level_mut(0).unwrap();
1411 let col = level.column_mut(0);
1412 assert!(col.is_some());
1413
1414 assert!(level.column_mut(10).is_none());
1416 }
1417
1418 #[test]
1419 fn test_factorization_level_physical_value_count() {
1420 let chunk = create_multi_level_chunk();
1421
1422 let level0 = chunk.level(0).unwrap();
1423 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1426 assert_eq!(level1.physical_value_count(), 4); }
1428
1429 #[test]
1430 fn test_count_rows() {
1431 let chunk = create_multi_level_chunk();
1432 assert_eq!(chunk.count_rows(), 4);
1433
1434 let empty = FactorizedChunk::empty();
1435 assert_eq!(empty.count_rows(), 0);
1436 }
1437
1438 #[test]
1439 fn test_compute_path_multiplicities() {
1440 let chunk = create_multi_level_chunk();
1441
1442 let mults = chunk.compute_path_multiplicities();
1443 assert_eq!(mults.len(), 4);
1445 assert!(mults.iter().all(|&m| m == 1));
1446 }
1447
1448 #[test]
1449 fn test_compute_path_multiplicities_single_level() {
1450 let mut col = ValueVector::with_type(LogicalType::Int64);
1451 col.push_int64(1);
1452 col.push_int64(2);
1453 col.push_int64(3);
1454
1455 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1456 let mults = chunk.compute_path_multiplicities();
1457
1458 assert_eq!(mults.len(), 3);
1460 assert!(mults.iter().all(|&m| m == 1));
1461 }
1462
1463 #[test]
1464 fn test_compute_path_multiplicities_empty() {
1465 let chunk = FactorizedChunk::empty();
1466 let mults = chunk.compute_path_multiplicities();
1467 assert!(mults.is_empty());
1468 }
1469
1470 #[test]
1471 fn test_path_multiplicities_cached() {
1472 let mut chunk = create_multi_level_chunk();
1473
1474 let mults1 = chunk.path_multiplicities_cached();
1476 assert_eq!(mults1.len(), 4);
1477
1478 let mults2 = chunk.path_multiplicities_cached();
1480 assert_eq!(mults1.len(), mults2.len());
1481 }
1482
1483 #[test]
1484 fn test_sum_deepest() {
1485 let chunk = create_multi_level_chunk();
1486
1487 let sum = chunk.sum_deepest(0);
1489 assert_eq!(sum, Some(10.0)); }
1491
1492 #[test]
1493 fn test_sum_deepest_empty() {
1494 let chunk = FactorizedChunk::empty();
1495 assert!(chunk.sum_deepest(0).is_none());
1496 }
1497
1498 #[test]
1499 fn test_sum_deepest_invalid_column() {
1500 let chunk = create_multi_level_chunk();
1501 assert!(chunk.sum_deepest(10).is_none());
1502 }
1503
1504 #[test]
1505 fn test_avg_deepest() {
1506 let chunk = create_multi_level_chunk();
1507
1508 let avg = chunk.avg_deepest(0);
1510 assert_eq!(avg, Some(2.5));
1511 }
1512
1513 #[test]
1514 fn test_avg_deepest_empty() {
1515 let chunk = FactorizedChunk::empty();
1516 assert!(chunk.avg_deepest(0).is_none());
1517 }
1518
1519 #[test]
1520 fn test_min_deepest() {
1521 let chunk = create_multi_level_chunk();
1522
1523 let min = chunk.min_deepest(0);
1524 assert_eq!(min, Some(Value::Int64(1)));
1525 }
1526
1527 #[test]
1528 fn test_min_deepest_empty() {
1529 let chunk = FactorizedChunk::empty();
1530 assert!(chunk.min_deepest(0).is_none());
1531 }
1532
1533 #[test]
1534 fn test_min_deepest_invalid_column() {
1535 let chunk = create_multi_level_chunk();
1536 assert!(chunk.min_deepest(10).is_none());
1537 }
1538
1539 #[test]
1540 fn test_max_deepest() {
1541 let chunk = create_multi_level_chunk();
1542
1543 let max = chunk.max_deepest(0);
1544 assert_eq!(max, Some(Value::Int64(4)));
1545 }
1546
1547 #[test]
1548 fn test_max_deepest_empty() {
1549 let chunk = FactorizedChunk::empty();
1550 assert!(chunk.max_deepest(0).is_none());
1551 }
1552
#[test]
fn test_value_less_than() {
    // Short alias so each case below reads as `lt(lhs, rhs)`.
    let lt = |lhs: &Value, rhs: &Value| FactorizedChunk::value_less_than(lhs, rhs);

    // Null is less than a concrete value, but never less than itself,
    // and a concrete value is never less than null.
    let null = Value::Null;
    let one = Value::Int64(1);
    assert!(lt(&null, &one));
    assert!(!lt(&one, &null));
    assert!(!lt(&null, &null));

    // Integer against integer.
    let two = Value::Int64(2);
    assert!(lt(&one, &two));
    assert!(!lt(&two, &one));

    // Float against float.
    assert!(lt(&Value::Float64(1.5), &Value::Float64(2.5)));

    // Mixed integer / float operands, in both orders.
    assert!(lt(&one, &Value::Float64(1.5)));
    assert!(lt(&Value::Float64(0.5), &one));

    // String against string.
    assert!(lt(
        &Value::String("apple".into()),
        &Value::String("banana".into())
    ));

    // Boolean against boolean: false is less than true, not the reverse.
    let (no, yes) = (Value::Bool(false), Value::Bool(true));
    assert!(lt(&no, &yes));
    assert!(!lt(&yes, &no));

    // An integer is not less than a string.
    assert!(!lt(&one, &Value::String("hello".into())));
}
1617
#[test]
fn test_filter_deepest() {
    let fixture = create_multi_level_chunk();

    // Keep only Int64 values strictly greater than 2; anything else is rejected.
    let kept = fixture
        .filter_deepest(0, |v| matches!(v, Value::Int64(n) if *n > 2))
        .unwrap();

    assert_eq!(kept.logical_row_count(), 2);
}
1634
#[test]
fn test_filter_deepest_empty() {
    // Filtering an empty chunk must produce nothing, even with an accept-all predicate.
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest(0, |_| true);
    assert!(outcome.is_none());
}
1640
#[test]
fn test_filter_deepest_all_filtered() {
    let fixture = create_multi_level_chunk();

    // A reject-all predicate still yields a chunk, but one with zero logical rows.
    let remaining_rows = fixture
        .filter_deepest(0, |_| false)
        .unwrap()
        .logical_row_count();

    assert_eq!(remaining_rows, 0);
}
1651
#[test]
fn test_filter_deepest_invalid_column() {
    // Filtering on column 10 of the fixture must produce nothing.
    let fixture = create_multi_level_chunk();
    let outcome = fixture.filter_deepest(10, |_| true);
    assert!(outcome.is_none());
}
1657
#[test]
fn test_filter_deepest_multi() {
    // Flat level holding a single source value.
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("src")]);

    // Second level with two parallel columns: a = [10, 20, 30], b = [1, 2, 3].
    let mut a_col = ValueVector::with_type(LogicalType::Int64);
    let mut b_col = ValueVector::with_type(LogicalType::Int64);
    for (a, b) in [(10, 1), (20, 2), (30, 3)] {
        a_col.push_int64(a);
        b_col.push_int64(b);
    }
    chunk.add_level(
        vec![a_col, b_col],
        vec![String::from("a"), String::from("b")],
        &[0, 3],
    );

    // Accept a row only when it has exactly two Int64 values whose sum exceeds 15.
    let outcome = chunk.filter_deepest_multi(|values| {
        if values.len() != 2 {
            return false;
        }
        match (&values[0], &values[1]) {
            (Value::Int64(a), Value::Int64(b)) => *a + *b > 15,
            _ => false,
        }
    });

    assert!(outcome.is_some());
    assert_eq!(outcome.unwrap().logical_row_count(), 2);
}
1697
#[test]
fn test_filter_deepest_multi_empty() {
    // Multi-column filtering of an empty chunk must produce nothing.
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1703
#[test]
fn test_filter_deepest_multi_no_columns() {
    // Flat level holding a single source value.
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("src")]);

    // Append a level that has no columns and no names; its only multiplicity is 0.
    chunk.add_factorized_level(FactorizationLevel::unflat(
        Vec::new(),
        Vec::new(),
        vec![0],
    ));

    let outcome = chunk.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1718
#[test]
fn test_project() {
    // One flat level with an Int64 column and a String column, two rows each.
    let mut nums = ValueVector::with_type(LogicalType::Int64);
    for n in [1, 2] {
        nums.push_int64(n);
    }
    let mut strs = ValueVector::with_type(LogicalType::String);
    for s in ["a", "b"] {
        strs.push_string(s);
    }
    let chunk = FactorizedChunk::with_flat_level(
        vec![nums, strs],
        vec![String::from("num"), String::from("str")],
    );

    // Project level 0, column 0 under a new name.
    let specs = [(0, 0, String::from("projected_num"))];
    let projected = chunk.project(&specs);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1741
#[test]
fn test_project_empty() {
    // Projecting an empty chunk is expected to give a chunk with no levels.
    let specs = [(0, 0, String::from("col"))];
    let projected = FactorizedChunk::empty().project(&specs);
    assert_eq!(projected.level_count(), 0);
}
1748
#[test]
fn test_project_empty_specs() {
    // With no projection specs, the result is expected to have no levels.
    let fixture = create_multi_level_chunk();
    let level_count = fixture.project(&[]).level_count();
    assert_eq!(level_count, 0);
}
1755
#[test]
fn test_project_invalid_level() {
    let fixture = create_multi_level_chunk();

    // The only spec asks for level 10; the projection is expected to have no levels.
    let specs = [(10, 0, String::from("col"))];
    let projected = fixture.project(&specs);

    assert_eq!(projected.level_count(), 0);
}
1764
#[test]
fn test_project_multi_level() {
    let fixture = create_multi_level_chunk();

    // Take column 0 from level 0 and column 0 from level 1, renaming both.
    let specs = [
        (0, 0, String::from("source")),
        (1, 0, String::from("neighbor")),
    ];
    let projected = fixture.project(&specs);

    assert_eq!(projected.level_count(), 2);
    assert_eq!(projected.total_column_count(), 2);
}
1776
#[test]
fn test_total_column_count() {
    // The fixture is expected to expose two columns in total.
    let columns = create_multi_level_chunk().total_column_count();
    assert_eq!(columns, 2);
}
1782
#[test]
fn test_chunk_state_access() {
    let mut fixture = create_multi_level_chunk();

    // Shared access: the fixture's state must report itself as factorized.
    assert!(fixture.chunk_state().is_factorized());

    // Exclusive access: invalidating the cache through the mutable handle must be callable.
    fixture.chunk_state_mut().invalidate_cache();
}
1793
#[test]
fn test_logical_row_iter_multi_level() {
    let fixture = create_multi_level_chunk();
    let rows: Vec<_> = fixture.logical_row_iter().collect();

    // Four logical rows are expected, in exactly this order.
    assert_eq!(rows.len(), 4);
    let expected = [vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]];
    for (position, want) in expected.into_iter().enumerate() {
        assert_eq!(rows[position], want);
    }
}
1807
#[test]
fn test_sum_deepest_with_float() {
    // Flat level holding a single source value.
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("src")]);

    // Deepest level: a Float64 column with values 1.5, 2.5 and 3.0.
    let mut vals = ValueVector::with_type(LogicalType::Float64);
    for x in [1.5, 2.5, 3.0] {
        vals.push_float64(x);
    }
    chunk.add_level(vec![vals], vec![String::from("val")], &[0, 3]);

    // 1.5 + 2.5 + 3.0 = 7.0
    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1825
#[test]
fn test_min_max_with_strings() {
    // Flat level holding a single source value.
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec![String::from("src")]);

    // Deepest level: a String column, deliberately not in sorted order.
    let mut fruits = ValueVector::with_type(LogicalType::String);
    for fruit in ["banana", "apple", "cherry"] {
        fruits.push_string(fruit);
    }
    chunk.add_level(vec![fruits], vec![String::from("fruit")], &[0, 3]);

    let smallest = chunk.min_deepest(0);
    assert_eq!(smallest, Some(Value::String("apple".into())));

    let largest = chunk.max_deepest(0);
    assert_eq!(largest, Some(Value::String("cherry".into())));
}
1846
#[test]
fn test_recompute_logical_row_count_empty() {
    // Recomputing on an empty chunk must leave the logical row count at zero.
    let rows = {
        let mut empty = FactorizedChunk::empty();
        empty.recompute_logical_row_count();
        empty.logical_row_count()
    };
    assert_eq!(rows, 0);
}
1853
#[test]
fn test_factorization_level_group_count() {
    let fixture = create_multi_level_chunk();

    // (level index, expected group count) for each level of the fixture.
    for (level_idx, expected) in [(0, 2), (1, 4)] {
        let level = fixture.level(level_idx).unwrap();
        assert_eq!(level.group_count(), expected);
    }
}
1864
#[test]
fn test_factorization_level_multiplicities() {
    let fixture = create_multi_level_chunk();

    // Level 1 of the fixture is expected to carry multiplicities [2, 2].
    let deepest = fixture.level(1).unwrap();
    assert_eq!(deepest.multiplicities(), &[2, 2]);
}
1873
#[test]
fn test_factorization_level_column_names() {
    let fixture = create_multi_level_chunk();

    // Level 0 is expected to name its single column "src".
    let first = fixture.level(0).unwrap().column_names();
    assert_eq!(first, &["src"]);

    // Level 1 is expected to name its single column "nbr".
    let second = fixture.level(1).unwrap().column_names();
    assert_eq!(second, &["nbr"]);
}
1884}