1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
/// A chunk whose columns are stored as a hierarchy of factorization levels.
///
/// Level 0 holds the outermost values; each deeper level's values are grouped
/// under values of the level directly above it. The number of rows the chunk
/// represents once flattened is cached in `logical_row_count`.
#[derive(Debug, Clone)]
pub struct FactorizedChunk {
    /// Factorization levels, outermost (level 0) first.
    levels: Vec<FactorizationLevel>,
    /// Number of rows produced by flattening this chunk.
    logical_row_count: usize,
    /// Chunk state; rebuilt from the level count and logical row count after
    /// structural changes.
    state: ChunkState,
}
52
/// One level of a [`FactorizedChunk`]: a group of columns sharing one grouping.
#[derive(Debug, Clone)]
pub struct FactorizationLevel {
    /// Column data of this level.
    columns: Vec<FactorizedVector>,
    /// Column names, positionally matching `columns`.
    column_names: Vec<String>,
    /// For a flat level, the physical length of the first column; for an unflat
    /// level, the sum of `multiplicities` (the total number of values it holds).
    group_count: usize,
    /// `multiplicities[p]` is the number of values belonging to parent `p`
    /// (all ones for a flat level).
    multiplicities: Vec<usize>,
}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
240 if let Some(cached) = self.state.cached_multiplicities() {
242 return Arc::clone(cached);
243 }
244
245 let mults = self.compute_path_multiplicities();
247 let arc_mults: Arc<[usize]> = mults.into();
248 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
249 arc_mults
250 }
251
252 #[must_use]
254 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
255 self.levels.get(index)
256 }
257
258 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
260 self.levels.get_mut(index)
261 }
262
263 pub fn add_level(
274 &mut self,
275 columns: Vec<ValueVector>,
276 column_names: Vec<String>,
277 offsets: &[u32],
278 ) {
279 let parent_count = offsets.len().saturating_sub(1);
280
281 let multiplicities: Vec<usize> = (0..parent_count)
283 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
284 .collect();
285
286 let factorized_columns: Vec<FactorizedVector> = columns
288 .into_iter()
289 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
290 .collect();
291
292 let level =
293 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
294 self.levels.push(level);
295
296 if self.levels.len() == 1 {
300 self.logical_row_count = multiplicities.iter().sum();
302 } else {
303 self.recompute_logical_row_count();
305 }
306
307 self.update_state();
309 }
310
311 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
313 self.levels.push(level);
314 self.recompute_logical_row_count();
315 self.update_state();
316 }
317
318 fn update_state(&mut self) {
320 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
321 }
322
323 fn recompute_logical_row_count(&mut self) {
325 if self.levels.is_empty() {
326 self.logical_row_count = 0;
327 return;
328 }
329
330 let level0_count = self.levels[0].group_count;
332 if self.levels.len() == 1 {
333 self.logical_row_count = level0_count;
334 return;
335 }
336
337 let mut counts = vec![1usize; level0_count];
340
341 for level_idx in 1..self.levels.len() {
342 let level = &self.levels[level_idx];
343 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
346 if parent_idx < level.multiplicities.len() {
348 let child_mult = level.multiplicities[parent_idx];
349 for _ in 0..child_mult {
350 new_counts.push(parent_count);
351 }
352 }
353 }
354
355 counts = new_counts;
356 }
357
358 self.logical_row_count = counts.len();
359 }
360
361 #[must_use]
365 pub fn flatten(&self) -> DataChunk {
366 if self.levels.is_empty() {
367 return DataChunk::empty();
368 }
369
370 let mut all_columns: Vec<ValueVector> = Vec::new();
372
373 if self.levels.len() == 1 {
375 let level = &self.levels[0];
376 for col in &level.columns {
377 all_columns.push(col.flatten(None));
378 }
379 return DataChunk::new(all_columns);
380 }
381
382 let row_iter = self.logical_row_iter();
385 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
386
387 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
389 for level in &self.levels {
390 for col in &level.columns {
391 output_columns.push(ValueVector::with_capacity(
392 col.data_type(),
393 self.logical_row_count,
394 ));
395 }
396 }
397
398 for indices in row_iter {
400 let mut col_offset = 0;
401 for (level_idx, level) in self.levels.iter().enumerate() {
402 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
403 for (col_idx, col) in level.columns.iter().enumerate() {
404 if let Some(value) = col.get_physical(level_idx_value) {
405 output_columns[col_offset + col_idx].push_value(value);
406 }
407 }
408 col_offset += level.column_count();
409 }
410 }
411
412 DataChunk::new(output_columns)
413 }
414
415 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
419 FactorizedRowIterator::new(self)
420 }
421
422 #[must_use]
424 pub fn total_column_count(&self) -> usize {
425 self.levels.iter().map(|l| l.column_count()).sum()
426 }
427
428 #[must_use]
430 pub fn all_column_names(&self) -> Vec<String> {
431 self.levels
432 .iter()
433 .flat_map(|l| l.column_names.iter().cloned())
434 .collect()
435 }
436
437 #[must_use]
451 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
452 where
453 F: Fn(&grafeo_common::types::Value) -> bool,
454 {
455 if self.levels.is_empty() {
456 return None;
457 }
458
459 let deepest_idx = self.levels.len() - 1;
460 let deepest = &self.levels[deepest_idx];
461
462 let filter_col = deepest.column(column_idx)?;
464
465 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
467 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
468 .collect();
469
470 let parent_count = filter_col.parent_count();
472 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
473 let mut new_offsets: Vec<u32> = vec![0];
474
475 for parent_idx in 0..parent_count {
477 let (start, end) = filter_col.range_for_parent(parent_idx);
478
479 for phys_idx in start..end {
480 if let Some(value) = filter_col.get_physical(phys_idx)
482 && predicate(&value)
483 {
484 for col_idx in 0..deepest.column_count() {
486 if let Some(col) = deepest.column(col_idx)
487 && let Some(v) = col.get_physical(phys_idx)
488 {
489 new_columns[col_idx].push_value(v);
490 }
491 }
492 new_multiplicities[parent_idx] += 1;
493 }
494 }
495
496 new_offsets.push(new_columns[0].len() as u32);
497 }
498
499 let total_remaining: usize = new_multiplicities.iter().sum();
501 if total_remaining == 0 {
502 return Some(Self::empty());
503 }
504
505 let new_factorized_cols: Vec<FactorizedVector> = new_columns
507 .into_iter()
508 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
509 .collect();
510
511 let new_level = FactorizationLevel::unflat(
512 new_factorized_cols,
513 deepest.column_names().to_vec(),
514 new_multiplicities,
515 );
516
517 let mut result = Self {
519 levels: self.levels[..deepest_idx].to_vec(),
520 logical_row_count: 0,
521 state: ChunkState::flat(0),
522 };
523 result.levels.push(new_level);
524 result.recompute_logical_row_count();
525 result.update_state();
526
527 Some(result)
528 }
529
530 #[must_use]
538 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
539 where
540 F: Fn(&[grafeo_common::types::Value]) -> bool,
541 {
542 if self.levels.is_empty() {
543 return None;
544 }
545
546 let deepest_idx = self.levels.len() - 1;
547 let deepest = &self.levels[deepest_idx];
548 let col_count = deepest.column_count();
549
550 if col_count == 0 {
551 return None;
552 }
553
554 let first_col = deepest.column(0)?;
555 let parent_count = first_col.parent_count();
556
557 let mut new_columns: Vec<ValueVector> = (0..col_count)
559 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
560 .collect();
561
562 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
563 let mut new_offsets: Vec<u32> = vec![0];
564 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
565
566 for parent_idx in 0..parent_count {
567 let (start, end) = first_col.range_for_parent(parent_idx);
568
569 for phys_idx in start..end {
570 row_values.clear();
572 for col_idx in 0..col_count {
573 if let Some(col) = deepest.column(col_idx)
574 && let Some(v) = col.get_physical(phys_idx)
575 {
576 row_values.push(v);
577 }
578 }
579
580 if predicate(&row_values) {
582 for (col_idx, v) in row_values.iter().enumerate() {
583 new_columns[col_idx].push_value(v.clone());
584 }
585 new_multiplicities[parent_idx] += 1;
586 }
587 }
588
589 new_offsets.push(new_columns[0].len() as u32);
590 }
591
592 let total: usize = new_multiplicities.iter().sum();
594 if total == 0 {
595 return Some(Self::empty());
596 }
597
598 let new_factorized_cols: Vec<FactorizedVector> = new_columns
600 .into_iter()
601 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
602 .collect();
603
604 let new_level = FactorizationLevel::unflat(
605 new_factorized_cols,
606 deepest.column_names().to_vec(),
607 new_multiplicities,
608 );
609
610 let mut result = Self {
611 levels: self.levels[..deepest_idx].to_vec(),
612 logical_row_count: 0,
613 state: ChunkState::flat(0),
614 };
615 result.levels.push(new_level);
616 result.recompute_logical_row_count();
617 result.update_state();
618
619 Some(result)
620 }
621
622 #[must_use]
642 pub fn count_rows(&self) -> usize {
643 self.logical_row_count()
644 }
645
646 #[must_use]
663 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
664 if self.levels.is_empty() {
665 return Vec::new();
666 }
667
668 if self.levels.len() == 1 {
670 return vec![1; self.levels[0].group_count];
671 }
672
673 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
675
676 for level_idx in 1..self.levels.len() {
678 let level = &self.levels[level_idx];
679 let mut child_multiplicities = Vec::with_capacity(level.group_count);
680
681 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
683 let child_count = if parent_idx < level.multiplicities.len() {
684 level.multiplicities[parent_idx]
685 } else {
686 0
687 };
688
689 for _ in 0..child_count {
691 child_multiplicities.push(parent_mult);
692 }
693 }
694
695 parent_multiplicities = child_multiplicities;
696 }
697
698 parent_multiplicities
699 }
700
701 #[must_use]
714 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
715 if self.levels.is_empty() {
716 return None;
717 }
718
719 let deepest_idx = self.levels.len() - 1;
720 let deepest = &self.levels[deepest_idx];
721 let col = deepest.column(column_idx)?;
722
723 let multiplicities = self.compute_path_multiplicities();
725
726 let mut sum = 0.0;
727 for (phys_idx, mult) in multiplicities.iter().enumerate() {
728 if let Some(value) = col.get_physical(phys_idx) {
729 let num_value = match &value {
731 grafeo_common::types::Value::Int64(v) => *v as f64,
732 grafeo_common::types::Value::Float64(v) => *v,
733 _ => continue, };
735 sum += num_value * (*mult as f64);
736 }
737 }
738 Some(sum)
739 }
740
741 #[must_use]
753 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
754 let count = self.logical_row_count();
755 if count == 0 {
756 return None;
757 }
758
759 let sum = self.sum_deepest(column_idx)?;
760 Some(sum / count as f64)
761 }
762
763 #[must_use]
776 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
777 if self.levels.is_empty() {
778 return None;
779 }
780
781 let deepest_idx = self.levels.len() - 1;
782 let deepest = &self.levels[deepest_idx];
783 let col = deepest.column(column_idx)?;
784
785 let mut min_value: Option<grafeo_common::types::Value> = None;
786
787 for phys_idx in 0..col.physical_len() {
788 if let Some(value) = col.get_physical(phys_idx) {
789 min_value = Some(match min_value {
790 None => value,
791 Some(current) => {
792 if Self::value_less_than(&value, ¤t) {
793 value
794 } else {
795 current
796 }
797 }
798 });
799 }
800 }
801
802 min_value
803 }
804
805 #[must_use]
818 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
819 if self.levels.is_empty() {
820 return None;
821 }
822
823 let deepest_idx = self.levels.len() - 1;
824 let deepest = &self.levels[deepest_idx];
825 let col = deepest.column(column_idx)?;
826
827 let mut max_value: Option<grafeo_common::types::Value> = None;
828
829 for phys_idx in 0..col.physical_len() {
830 if let Some(value) = col.get_physical(phys_idx) {
831 max_value = Some(match max_value {
832 None => value,
833 Some(current) => {
834 if Self::value_less_than(¤t, &value) {
835 value
836 } else {
837 current
838 }
839 }
840 });
841 }
842 }
843
844 max_value
845 }
846
847 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
855 use grafeo_common::types::Value;
856
857 match (a, b) {
858 (Value::Null, Value::Null) => false,
860 (Value::Null, _) => true,
861 (_, Value::Null) => false,
862
863 (Value::Int64(x), Value::Int64(y)) => x < y,
865 (Value::Float64(x), Value::Float64(y)) => x < y,
866 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
867 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
868
869 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
871
872 (Value::Bool(x), Value::Bool(y)) => !x && *y,
874
875 _ => false,
878 }
879 }
880
881 #[must_use]
895 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
896 if self.levels.is_empty() || column_specs.is_empty() {
897 return Self::empty();
898 }
899
900 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
902 for (level_idx, col_idx, name) in column_specs {
903 if *level_idx < self.levels.len() {
904 level_specs[*level_idx].push((*col_idx, name.clone()));
905 }
906 }
907
908 let mut new_levels = Vec::new();
910
911 for (level_idx, specs) in level_specs.iter().enumerate() {
912 if specs.is_empty() {
913 continue;
914 }
915
916 let src_level = &self.levels[level_idx];
917
918 let columns: Vec<FactorizedVector> = specs
919 .iter()
920 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
921 .collect();
922
923 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
924
925 if level_idx == 0 {
926 new_levels.push(FactorizationLevel::flat(columns, names));
927 } else {
928 let mults = src_level.multiplicities().to_vec();
929 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
930 }
931 }
932
933 if new_levels.is_empty() {
934 return Self::empty();
935 }
936
937 let mut result = Self {
938 levels: new_levels,
939 logical_row_count: 0,
940 state: ChunkState::flat(0),
941 };
942 result.recompute_logical_row_count();
943 result.update_state();
944 result
945 }
946}
947
948pub struct FactorizedRowIterator<'a> {
963 chunk: &'a FactorizedChunk,
964 indices: Vec<usize>,
966 exhausted: bool,
969}
970
971impl<'a> FactorizedRowIterator<'a> {
972 fn new(chunk: &'a FactorizedChunk) -> Self {
973 let indices = vec![0; chunk.level_count()];
974 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
975
976 let mut iter = Self {
977 chunk,
978 indices,
979 exhausted,
980 };
981
982 if !exhausted && !iter.has_valid_deepest_range() {
984 if !iter.advance() {
985 exhausted = true;
986 }
987 iter.exhausted = exhausted;
988 }
989
990 iter
991 }
992
993 fn advance(&mut self) -> bool {
995 if self.exhausted || self.chunk.levels.is_empty() {
996 return false;
997 }
998
999 for level_idx in (0..self.chunk.levels.len()).rev() {
1001 let level = &self.chunk.levels[level_idx];
1002
1003 let parent_idx = if level_idx == 0 {
1005 self.indices[0] + 1
1007 } else {
1008 self.indices[level_idx - 1]
1010 };
1011
1012 let (_start, end) = if level_idx == 0 {
1014 (0, level.group_count)
1015 } else {
1016 if let Some(col) = level.columns.first() {
1018 col.range_for_parent(parent_idx)
1019 } else {
1020 (0, 0)
1021 }
1022 };
1023
1024 let current = self.indices[level_idx];
1025 if current + 1 < end {
1026 self.indices[level_idx] = current + 1;
1028 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1030 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1031 let (deeper_start, _) =
1032 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1033 self.indices[deeper_idx] = deeper_start;
1034 }
1035 }
1036
1037 if self.has_valid_deepest_range() {
1040 return true;
1041 }
1042 return self.advance();
1045 }
1046 }
1048
1049 self.exhausted = true;
1051 false
1052 }
1053
1054 fn has_valid_deepest_range(&self) -> bool {
1060 if self.chunk.levels.len() <= 1 {
1061 return true; }
1063
1064 for level_idx in 1..self.chunk.levels.len() {
1066 let parent_idx = self.indices[level_idx - 1];
1067 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1068 let (start, end) = col.range_for_parent(parent_idx);
1069 if start >= end {
1070 return false;
1071 }
1072 } else {
1073 return false;
1074 }
1075 }
1076
1077 true
1078 }
1079}
1080
1081impl Iterator for FactorizedRowIterator<'_> {
1082 type Item = Vec<usize>;
1083
1084 fn next(&mut self) -> Option<Self::Item> {
1085 if self.exhausted {
1086 return None;
1087 }
1088
1089 let result = self.indices.clone();
1091 self.advance();
1092 Some(result)
1093 }
1094}
1095
/// A chunk in either flat or factorized representation.
#[derive(Debug, Clone)]
pub enum ChunkVariant {
    /// A flat [`DataChunk`].
    Flat(DataChunk),
    /// A [`FactorizedChunk`]; `ChunkVariant::ensure_flat` flattens it on demand.
    Factorized(FactorizedChunk),
}
1104
1105impl ChunkVariant {
1106 #[must_use]
1108 pub fn flat(chunk: DataChunk) -> Self {
1109 Self::Flat(chunk)
1110 }
1111
1112 #[must_use]
1114 pub fn factorized(chunk: FactorizedChunk) -> Self {
1115 Self::Factorized(chunk)
1116 }
1117
1118 #[must_use]
1120 pub fn ensure_flat(self) -> DataChunk {
1121 match self {
1122 Self::Flat(chunk) => chunk,
1123 Self::Factorized(chunk) => chunk.flatten(),
1124 }
1125 }
1126
1127 #[must_use]
1129 pub fn logical_row_count(&self) -> usize {
1130 match self {
1131 Self::Flat(chunk) => chunk.row_count(),
1132 Self::Factorized(chunk) => chunk.logical_row_count(),
1133 }
1134 }
1135
1136 #[must_use]
1138 pub fn is_factorized(&self) -> bool {
1139 matches!(self, Self::Factorized(_))
1140 }
1141
1142 #[must_use]
1144 pub fn is_flat(&self) -> bool {
1145 matches!(self, Self::Flat(_))
1146 }
1147
1148 #[must_use]
1150 pub fn is_empty(&self) -> bool {
1151 self.logical_row_count() == 0
1152 }
1153}
1154
1155impl From<DataChunk> for ChunkVariant {
1156 fn from(chunk: DataChunk) -> Self {
1157 Self::Flat(chunk)
1158 }
1159}
1160
1161impl From<FactorizedChunk> for ChunkVariant {
1162 fn from(chunk: FactorizedChunk) -> Self {
1163 Self::Factorized(chunk)
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use grafeo_common::types::{LogicalType, NodeId, Value};
1170
1171 use super::*;
1172
1173 fn make_flat_chunk() -> DataChunk {
1174 let mut col = ValueVector::with_type(LogicalType::Int64);
1175 col.push_int64(1);
1176 col.push_int64(2);
1177 DataChunk::new(vec![col])
1178 }
1179
1180 fn create_multi_level_chunk() -> FactorizedChunk {
1181 let mut sources = ValueVector::with_type(LogicalType::Int64);
1183 sources.push_int64(10);
1184 sources.push_int64(20);
1185
1186 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1187
1188 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1189 neighbors.push_int64(1);
1190 neighbors.push_int64(2);
1191 neighbors.push_int64(3);
1192 neighbors.push_int64(4);
1193
1194 let offsets = vec![0, 2, 4];
1195 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1196 chunk
1197 }
1198
1199 #[test]
1200 fn test_from_flat() {
1201 let flat = make_flat_chunk();
1202 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1203
1204 assert_eq!(factorized.level_count(), 1);
1205 assert_eq!(factorized.logical_row_count(), 2);
1206 assert_eq!(factorized.physical_size(), 2);
1207 }
1208
1209 #[test]
1210 fn test_add_level() {
1211 let mut col0 = ValueVector::with_type(LogicalType::Node);
1213 col0.push_node_id(NodeId::new(100));
1214 col0.push_node_id(NodeId::new(200));
1215
1216 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1217
1218 assert_eq!(chunk.level_count(), 1);
1219 assert_eq!(chunk.logical_row_count(), 2);
1220
1221 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1223 neighbors.push_node_id(NodeId::new(10));
1224 neighbors.push_node_id(NodeId::new(11));
1225 neighbors.push_node_id(NodeId::new(12));
1226 neighbors.push_node_id(NodeId::new(20));
1227 neighbors.push_node_id(NodeId::new(21));
1228
1229 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1231
1232 assert_eq!(chunk.level_count(), 2);
1233 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1236
1237 #[test]
1238 fn test_flatten_single_level() {
1239 let flat = make_flat_chunk();
1240 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1241
1242 let flattened = factorized.flatten();
1243 assert_eq!(flattened.row_count(), 2);
1244 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1245 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1246 }
1247
1248 #[test]
1249 fn test_flatten_multi_level() {
1250 let mut sources = ValueVector::with_type(LogicalType::Int64);
1252 sources.push_int64(1);
1253 sources.push_int64(2);
1254
1255 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1256
1257 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1258 neighbors.push_int64(10);
1259 neighbors.push_int64(11);
1260 neighbors.push_int64(20);
1261 neighbors.push_int64(21);
1262
1263 let offsets = vec![0, 2, 4];
1264 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1265
1266 let flat = chunk.flatten();
1267 assert_eq!(flat.row_count(), 4);
1268 assert_eq!(flat.column_count(), 2);
1269
1270 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1273 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1274 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1275 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1276 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1277 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1278 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1279 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1280 }
1281
1282 #[test]
1283 fn test_logical_row_iter_single_level() {
1284 let flat = make_flat_chunk();
1285 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1286
1287 let indices: Vec<_> = factorized.logical_row_iter().collect();
1288 assert_eq!(indices.len(), 2);
1289 assert_eq!(indices[0], vec![0]);
1290 assert_eq!(indices[1], vec![1]);
1291 }
1292
1293 #[test]
1294 fn test_chunk_variant() {
1295 let flat = make_flat_chunk();
1296 let variant = ChunkVariant::flat(flat.clone());
1297
1298 assert!(variant.is_flat());
1299 assert!(!variant.is_factorized());
1300 assert_eq!(variant.logical_row_count(), 2);
1301
1302 let ensured = variant.ensure_flat();
1303 assert_eq!(ensured.row_count(), 2);
1304 }
1305
1306 #[test]
1307 fn test_chunk_variant_factorized() {
1308 let chunk = create_multi_level_chunk();
1309 let variant = ChunkVariant::factorized(chunk);
1310
1311 assert!(variant.is_factorized());
1312 assert!(!variant.is_flat());
1313 assert_eq!(variant.logical_row_count(), 4);
1314
1315 let flat = variant.ensure_flat();
1316 assert_eq!(flat.row_count(), 4);
1317 }
1318
1319 #[test]
1320 fn test_chunk_variant_from() {
1321 let flat = make_flat_chunk();
1322 let variant: ChunkVariant = flat.into();
1323 assert!(variant.is_flat());
1324
1325 let factorized = create_multi_level_chunk();
1326 let variant2: ChunkVariant = factorized.into();
1327 assert!(variant2.is_factorized());
1328 }
1329
1330 #[test]
1331 fn test_chunk_variant_is_empty() {
1332 let empty_flat = DataChunk::empty();
1333 let variant = ChunkVariant::flat(empty_flat);
1334 assert!(variant.is_empty());
1335
1336 let non_empty = make_flat_chunk();
1337 let variant2 = ChunkVariant::flat(non_empty);
1338 assert!(!variant2.is_empty());
1339 }
1340
1341 #[test]
1342 fn test_empty_chunk() {
1343 let chunk = FactorizedChunk::empty();
1344 assert_eq!(chunk.level_count(), 0);
1345 assert_eq!(chunk.logical_row_count(), 0);
1346 assert_eq!(chunk.physical_size(), 0);
1347
1348 let flat = chunk.flatten();
1349 assert!(flat.is_empty());
1350 }
1351
1352 #[test]
1353 fn test_all_column_names() {
1354 let mut sources = ValueVector::with_type(LogicalType::Int64);
1355 sources.push_int64(1);
1356
1357 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1358
1359 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1360 neighbors.push_int64(10);
1361
1362 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1363
1364 let names = chunk.all_column_names();
1365 assert_eq!(names, vec!["source", "neighbor"]);
1366 }
1367
1368 #[test]
1369 fn test_level_mut() {
1370 let mut chunk = create_multi_level_chunk();
1371
1372 let level = chunk.level_mut(0).unwrap();
1374 assert_eq!(level.column_count(), 1);
1375
1376 assert!(chunk.level_mut(10).is_none());
1378 }
1379
1380 #[test]
1381 fn test_factorization_level_column_mut() {
1382 let mut chunk = create_multi_level_chunk();
1383
1384 let level = chunk.level_mut(0).unwrap();
1385 let col = level.column_mut(0);
1386 assert!(col.is_some());
1387
1388 assert!(level.column_mut(10).is_none());
1390 }
1391
1392 #[test]
1393 fn test_factorization_level_physical_value_count() {
1394 let chunk = create_multi_level_chunk();
1395
1396 let level0 = chunk.level(0).unwrap();
1397 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1400 assert_eq!(level1.physical_value_count(), 4); }
1402
1403 #[test]
1404 fn test_count_rows() {
1405 let chunk = create_multi_level_chunk();
1406 assert_eq!(chunk.count_rows(), 4);
1407
1408 let empty = FactorizedChunk::empty();
1409 assert_eq!(empty.count_rows(), 0);
1410 }
1411
1412 #[test]
1413 fn test_compute_path_multiplicities() {
1414 let chunk = create_multi_level_chunk();
1415
1416 let mults = chunk.compute_path_multiplicities();
1417 assert_eq!(mults.len(), 4);
1419 assert!(mults.iter().all(|&m| m == 1));
1420 }
1421
1422 #[test]
1423 fn test_compute_path_multiplicities_single_level() {
1424 let mut col = ValueVector::with_type(LogicalType::Int64);
1425 col.push_int64(1);
1426 col.push_int64(2);
1427 col.push_int64(3);
1428
1429 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1430 let mults = chunk.compute_path_multiplicities();
1431
1432 assert_eq!(mults.len(), 3);
1434 assert!(mults.iter().all(|&m| m == 1));
1435 }
1436
1437 #[test]
1438 fn test_compute_path_multiplicities_empty() {
1439 let chunk = FactorizedChunk::empty();
1440 let mults = chunk.compute_path_multiplicities();
1441 assert!(mults.is_empty());
1442 }
1443
1444 #[test]
1445 fn test_path_multiplicities_cached() {
1446 let mut chunk = create_multi_level_chunk();
1447
1448 let mults1 = chunk.path_multiplicities_cached();
1450 assert_eq!(mults1.len(), 4);
1451
1452 let mults2 = chunk.path_multiplicities_cached();
1454 assert_eq!(mults1.len(), mults2.len());
1455 }
1456
1457 #[test]
1458 fn test_sum_deepest() {
1459 let chunk = create_multi_level_chunk();
1460
1461 let sum = chunk.sum_deepest(0);
1463 assert_eq!(sum, Some(10.0)); }
1465
1466 #[test]
1467 fn test_sum_deepest_empty() {
1468 let chunk = FactorizedChunk::empty();
1469 assert!(chunk.sum_deepest(0).is_none());
1470 }
1471
1472 #[test]
1473 fn test_sum_deepest_invalid_column() {
1474 let chunk = create_multi_level_chunk();
1475 assert!(chunk.sum_deepest(10).is_none());
1476 }
1477
1478 #[test]
1479 fn test_avg_deepest() {
1480 let chunk = create_multi_level_chunk();
1481
1482 let avg = chunk.avg_deepest(0);
1484 assert_eq!(avg, Some(2.5));
1485 }
1486
1487 #[test]
1488 fn test_avg_deepest_empty() {
1489 let chunk = FactorizedChunk::empty();
1490 assert!(chunk.avg_deepest(0).is_none());
1491 }
1492
1493 #[test]
1494 fn test_min_deepest() {
1495 let chunk = create_multi_level_chunk();
1496
1497 let min = chunk.min_deepest(0);
1498 assert_eq!(min, Some(Value::Int64(1)));
1499 }
1500
1501 #[test]
1502 fn test_min_deepest_empty() {
1503 let chunk = FactorizedChunk::empty();
1504 assert!(chunk.min_deepest(0).is_none());
1505 }
1506
1507 #[test]
1508 fn test_min_deepest_invalid_column() {
1509 let chunk = create_multi_level_chunk();
1510 assert!(chunk.min_deepest(10).is_none());
1511 }
1512
1513 #[test]
1514 fn test_max_deepest() {
1515 let chunk = create_multi_level_chunk();
1516
1517 let max = chunk.max_deepest(0);
1518 assert_eq!(max, Some(Value::Int64(4)));
1519 }
1520
1521 #[test]
1522 fn test_max_deepest_empty() {
1523 let chunk = FactorizedChunk::empty();
1524 assert!(chunk.max_deepest(0).is_none());
1525 }
1526
#[test]
fn test_value_less_than() {
    // Each case is (lhs, rhs, expected result of `lhs < rhs`), checked in order.
    let cases = [
        // Null is less than a non-null value, and is not less than itself.
        (Value::Null, Value::Int64(1), true),
        (Value::Int64(1), Value::Null, false),
        (Value::Null, Value::Null, false),
        // Same-type integers.
        (Value::Int64(1), Value::Int64(2), true),
        (Value::Int64(2), Value::Int64(1), false),
        // Same-type floats.
        (Value::Float64(1.5), Value::Float64(2.5), true),
        // Mixed integer / float comparisons in both directions.
        (Value::Int64(1), Value::Float64(1.5), true),
        (Value::Float64(0.5), Value::Int64(1), true),
        // Strings compare lexicographically.
        (
            Value::String("apple".into()),
            Value::String("banana".into()),
            true,
        ),
        // Booleans: false < true.
        (Value::Bool(false), Value::Bool(true), true),
        (Value::Bool(true), Value::Bool(false), false),
        // An integer is never less than a string.
        (Value::Int64(1), Value::String("hello".into()), false),
    ];

    for (lhs, rhs, expected) in &cases {
        assert_eq!(
            FactorizedChunk::value_less_than(lhs, rhs),
            *expected,
            "value_less_than({lhs:?}, {rhs:?})"
        );
    }
}
1591
#[test]
fn test_filter_deepest() {
    let chunk = create_multi_level_chunk();

    // Keep only deepest-level integers strictly greater than 2; non-integers are rejected.
    let kept = chunk
        .filter_deepest(0, |value| matches!(value, Value::Int64(n) if *n > 2))
        .unwrap();

    assert_eq!(kept.logical_row_count(), 2);
}
1608
#[test]
fn test_filter_deepest_empty() {
    // Even an accept-everything predicate yields nothing on a level-less chunk.
    let chunk = FactorizedChunk::empty();
    let result = chunk.filter_deepest(0, |_| true);
    assert!(result.is_none());
}
1614
#[test]
fn test_filter_deepest_all_filtered() {
    let chunk = create_multi_level_chunk();

    // Rejecting every row still produces a chunk, just one with zero logical rows.
    let rejected_all = chunk.filter_deepest(0, |_| false).unwrap();
    assert_eq!(rejected_all.logical_row_count(), 0);
}
1625
#[test]
fn test_filter_deepest_invalid_column() {
    // Column 10 does not exist, so filtering on it yields nothing.
    let out_of_range = 10;
    let chunk = create_multi_level_chunk();
    assert!(chunk.filter_deepest(out_of_range, |_| true).is_none());
}
1631
#[test]
fn test_filter_deepest_multi() {
    // A single source row at level 0.
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    // Deepest level with two parallel columns: rows (10, 1), (20, 2), (30, 3).
    let mut col1 = ValueVector::with_type(LogicalType::Int64);
    col1.push_int64(10);
    col1.push_int64(20);
    col1.push_int64(30);

    let mut col2 = ValueVector::with_type(LogicalType::Int64);
    col2.push_int64(1);
    col2.push_int64(2);
    col2.push_int64(3);

    // Offsets [0, 3] attach all three deepest rows to the single source row.
    let offsets = vec![0, 3];
    chunk.add_level(
        vec![col1, col2],
        vec!["a".to_string(), "b".to_string()],
        &offsets,
    );

    // Keep rows where a + b > 15: (20, 2) and (30, 3) pass, (10, 1) does not.
    // Any row that is not exactly two Int64 values is rejected.
    let filtered = chunk
        .filter_deepest_multi(|values| match &values[..] {
            [Value::Int64(a), Value::Int64(b)] => *a + *b > 15,
            _ => false,
        })
        .expect("filter_deepest_multi should return a chunk when the deepest level has columns");

    assert_eq!(filtered.logical_row_count(), 2);
}
1671
#[test]
fn test_filter_deepest_multi_empty() {
    // No levels: the multi-column filter has nothing to work on.
    let chunk = FactorizedChunk::empty();
    let result = chunk.filter_deepest_multi(|_| true);
    assert!(result.is_none());
}
1677
#[test]
fn test_filter_deepest_multi_no_columns() {
    let mut src = ValueVector::with_type(LogicalType::Int64);
    src.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![src], vec!["src".to_string()]);

    // Append a deepest level that carries no columns (multiplicities [0]).
    chunk.add_factorized_level(FactorizationLevel::unflat(vec![], vec![], vec![0]));

    // With no deepest-level columns, the filter returns None.
    assert!(chunk.filter_deepest_multi(|_| true).is_none());
}
1692
#[test]
fn test_project() {
    let mut nums = ValueVector::with_type(LogicalType::Int64);
    for n in [1, 2] {
        nums.push_int64(n);
    }

    let mut strs = ValueVector::with_type(LogicalType::String);
    for s in ["a", "b"] {
        strs.push_string(s);
    }

    let chunk = FactorizedChunk::with_flat_level(
        vec![nums, strs],
        vec!["num".to_string(), "str".to_string()],
    );

    // Spec (level 0, column 0, new name): keep only the numeric column, renamed.
    let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1715
#[test]
fn test_project_empty() {
    let chunk = FactorizedChunk::empty();
    // Projecting from a level-less chunk yields another level-less chunk.
    let result = chunk.project(&[(0, 0, "col".to_string())]);
    assert_eq!(result.level_count(), 0);
}
1722
#[test]
fn test_project_empty_specs() {
    let chunk = create_multi_level_chunk();
    // No projection specs select nothing, so no levels survive.
    let result = chunk.project(&[]);
    assert_eq!(result.level_count(), 0);
}
1729
#[test]
fn test_project_invalid_level() {
    let chunk = create_multi_level_chunk();
    // Level 10 does not exist in the two-level fixture; the result has no levels.
    let missing_level = 10;
    let result = chunk.project(&[(missing_level, 0, "col".to_string())]);
    assert_eq!(result.level_count(), 0);
}
1738
#[test]
fn test_project_multi_level() {
    let chunk = create_multi_level_chunk();

    // Pick column 0 from each of the two levels, renaming both.
    let specs = [
        (0, 0, "source".to_string()),
        (1, 0, "neighbor".to_string()),
    ];
    let projected = chunk.project(&specs);

    assert_eq!(projected.level_count(), 2);
    assert_eq!(projected.total_column_count(), 2);
}
1750
#[test]
fn test_total_column_count() {
    // The fixture has one column per level ("src" and "nbr").
    let total = create_multi_level_chunk().total_column_count();
    assert_eq!(total, 2);
}
1756
#[test]
fn test_chunk_state_access() {
    let mut chunk = create_multi_level_chunk();

    // Shared accessor: a multi-level chunk reports itself as factorized.
    assert!(chunk.chunk_state().is_factorized());

    // Mutable accessor: only checks that it is reachable and callable.
    chunk.chunk_state_mut().invalidate_cache();
}
1767
#[test]
fn test_logical_row_iter_multi_level() {
    let chunk = create_multi_level_chunk();

    let rows: Vec<_> = chunk.logical_row_iter().collect();

    // Expected per-row index vectors, in iteration order (this also pins the count at 4).
    assert_eq!(
        rows,
        vec![vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]]
    );
}
1781
#[test]
fn test_sum_deepest_with_float() {
    let mut root = ValueVector::with_type(LogicalType::Int64);
    root.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![root], vec!["src".to_string()]);

    // All three addends are exactly representable, so exact equality is safe.
    let mut floats = ValueVector::with_type(LogicalType::Float64);
    for x in [1.5, 2.5, 3.0] {
        floats.push_float64(x);
    }
    chunk.add_level(vec![floats], vec!["val".to_string()], &[0, 3]);

    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1799
#[test]
fn test_min_max_with_strings() {
    let mut root = ValueVector::with_type(LogicalType::Int64);
    root.push_int64(1);
    let mut chunk = FactorizedChunk::with_flat_level(vec![root], vec!["src".to_string()]);

    // Input is unsorted, so the minimum is not simply the first element.
    let mut fruits = ValueVector::with_type(LogicalType::String);
    for name in ["banana", "apple", "cherry"] {
        fruits.push_string(name);
    }
    chunk.add_level(vec![fruits], vec!["fruit".to_string()], &[0, 3]);

    assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
    assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
}
1820
#[test]
fn test_recompute_logical_row_count_empty() {
    let mut chunk = FactorizedChunk::empty();
    // Recomputing on a level-less chunk leaves the count at zero.
    chunk.recompute_logical_row_count();
    let rows = chunk.logical_row_count();
    assert_eq!(rows, 0);
}
1827
#[test]
fn test_factorization_level_group_count() {
    let chunk = create_multi_level_chunk();

    // (level index, expected group count) for the two-level fixture.
    for (depth, expected) in [(0, 2), (1, 4)] {
        assert_eq!(chunk.level(depth).unwrap().group_count(), expected);
    }
}
1838
#[test]
fn test_factorization_level_multiplicities() {
    let chunk = create_multi_level_chunk();
    let deepest = chunk.level(1).unwrap();

    // Multiplicities [2, 2] sum to the level's group count of 4.
    assert_eq!(deepest.multiplicities(), &[2, 2]);
}
1847
#[test]
fn test_factorization_level_column_names() {
    let chunk = create_multi_level_chunk();

    // (level index, expected column names) for the two-level fixture.
    for (depth, names) in [(0, ["src"]), (1, ["nbr"])] {
        assert_eq!(chunk.level(depth).unwrap().column_names(), &names);
    }
}
1858}