1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
29#[derive(Debug, Clone)]
44pub struct FactorizedChunk {
45 levels: Vec<FactorizationLevel>,
47 logical_row_count: usize,
49 state: ChunkState,
51}
52
53#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56 columns: Vec<FactorizedVector>,
58 column_names: Vec<String>,
60 group_count: usize,
62 multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
240 if let Some(cached) = self.state.cached_multiplicities() {
242 return Arc::clone(cached);
243 }
244
245 let mults = self.compute_path_multiplicities();
247 let arc_mults: Arc<[usize]> = mults.into();
248 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
249 arc_mults
250 }
251
252 #[must_use]
254 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
255 self.levels.get(index)
256 }
257
258 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
260 self.levels.get_mut(index)
261 }
262
263 pub fn add_level(
274 &mut self,
275 columns: Vec<ValueVector>,
276 column_names: Vec<String>,
277 offsets: &[u32],
278 ) {
279 let parent_count = offsets.len().saturating_sub(1);
280
281 let multiplicities: Vec<usize> = (0..parent_count)
283 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
284 .collect();
285
286 let factorized_columns: Vec<FactorizedVector> = columns
288 .into_iter()
289 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
290 .collect();
291
292 let level =
293 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
294 self.levels.push(level);
295
296 if self.levels.len() == 1 {
300 self.logical_row_count = multiplicities.iter().sum();
302 } else {
303 self.recompute_logical_row_count();
305 }
306
307 self.update_state();
309 }
310
311 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
313 self.levels.push(level);
314 self.recompute_logical_row_count();
315 self.update_state();
316 }
317
318 fn update_state(&mut self) {
320 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
321 }
322
323 fn recompute_logical_row_count(&mut self) {
325 if self.levels.is_empty() {
326 self.logical_row_count = 0;
327 return;
328 }
329
330 let level0_count = self.levels[0].group_count;
332 if self.levels.len() == 1 {
333 self.logical_row_count = level0_count;
334 return;
335 }
336
337 let mut counts = vec![1usize; level0_count];
340
341 for level_idx in 1..self.levels.len() {
342 let level = &self.levels[level_idx];
343 let mut new_counts = Vec::new();
344
345 for (parent_idx, &parent_count) in counts.iter().enumerate() {
346 if parent_idx < level.multiplicities.len() {
348 let child_mult = level.multiplicities[parent_idx];
349 for _ in 0..child_mult {
350 new_counts.push(parent_count);
351 }
352 }
353 }
354
355 counts = new_counts;
356 }
357
358 self.logical_row_count = counts.len();
359 }
360
361 #[must_use]
365 pub fn flatten(&self) -> DataChunk {
366 if self.levels.is_empty() {
367 return DataChunk::empty();
368 }
369
370 let mut all_columns: Vec<ValueVector> = Vec::new();
372
373 if self.levels.len() == 1 {
375 let level = &self.levels[0];
376 for col in &level.columns {
377 all_columns.push(col.flatten(None));
378 }
379 return DataChunk::new(all_columns);
380 }
381
382 let row_iter = self.logical_row_iter();
385 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
386
387 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
389 for level in &self.levels {
390 for col in &level.columns {
391 output_columns.push(ValueVector::with_capacity(
392 col.data_type(),
393 self.logical_row_count,
394 ));
395 }
396 }
397
398 for indices in row_iter {
400 let mut col_offset = 0;
401 for (level_idx, level) in self.levels.iter().enumerate() {
402 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
403 for (col_idx, col) in level.columns.iter().enumerate() {
404 if let Some(value) = col.get_physical(level_idx_value) {
405 output_columns[col_offset + col_idx].push_value(value);
406 }
407 }
408 col_offset += level.column_count();
409 }
410 }
411
412 DataChunk::new(output_columns)
413 }
414
415 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
419 FactorizedRowIterator::new(self)
420 }
421
422 #[must_use]
424 pub fn total_column_count(&self) -> usize {
425 self.levels.iter().map(|l| l.column_count()).sum()
426 }
427
428 #[must_use]
430 pub fn all_column_names(&self) -> Vec<String> {
431 self.levels
432 .iter()
433 .flat_map(|l| l.column_names.iter().cloned())
434 .collect()
435 }
436
437 #[must_use]
451 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
452 where
453 F: Fn(&grafeo_common::types::Value) -> bool,
454 {
455 if self.levels.is_empty() {
456 return None;
457 }
458
459 let deepest_idx = self.levels.len() - 1;
460 let deepest = &self.levels[deepest_idx];
461
462 let filter_col = deepest.column(column_idx)?;
464
465 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
467 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
468 .collect();
469
470 let parent_count = filter_col.parent_count();
472 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
473 let mut new_offsets: Vec<u32> = vec![0];
474
475 for parent_idx in 0..parent_count {
477 let (start, end) = filter_col.range_for_parent(parent_idx);
478
479 for phys_idx in start..end {
480 if let Some(value) = filter_col.get_physical(phys_idx) {
482 if predicate(&value) {
483 for col_idx in 0..deepest.column_count() {
485 if let Some(col) = deepest.column(col_idx) {
486 if let Some(v) = col.get_physical(phys_idx) {
487 new_columns[col_idx].push_value(v);
488 }
489 }
490 }
491 new_multiplicities[parent_idx] += 1;
492 }
493 }
494 }
495
496 new_offsets.push(new_columns[0].len() as u32);
497 }
498
499 let total_remaining: usize = new_multiplicities.iter().sum();
501 if total_remaining == 0 {
502 return Some(Self::empty());
503 }
504
505 let new_factorized_cols: Vec<FactorizedVector> = new_columns
507 .into_iter()
508 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
509 .collect();
510
511 let new_level = FactorizationLevel::unflat(
512 new_factorized_cols,
513 deepest.column_names().to_vec(),
514 new_multiplicities,
515 );
516
517 let mut result = Self {
519 levels: self.levels[..deepest_idx].to_vec(),
520 logical_row_count: 0,
521 state: ChunkState::flat(0),
522 };
523 result.levels.push(new_level);
524 result.recompute_logical_row_count();
525 result.update_state();
526
527 Some(result)
528 }
529
530 #[must_use]
538 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
539 where
540 F: Fn(&[grafeo_common::types::Value]) -> bool,
541 {
542 if self.levels.is_empty() {
543 return None;
544 }
545
546 let deepest_idx = self.levels.len() - 1;
547 let deepest = &self.levels[deepest_idx];
548 let col_count = deepest.column_count();
549
550 if col_count == 0 {
551 return None;
552 }
553
554 let first_col = deepest.column(0)?;
555 let parent_count = first_col.parent_count();
556
557 let mut new_columns: Vec<ValueVector> = (0..col_count)
559 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
560 .collect();
561
562 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
563 let mut new_offsets: Vec<u32> = vec![0];
564 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
565
566 for parent_idx in 0..parent_count {
567 let (start, end) = first_col.range_for_parent(parent_idx);
568
569 for phys_idx in start..end {
570 row_values.clear();
572 for col_idx in 0..col_count {
573 if let Some(col) = deepest.column(col_idx) {
574 if let Some(v) = col.get_physical(phys_idx) {
575 row_values.push(v);
576 }
577 }
578 }
579
580 if predicate(&row_values) {
582 for (col_idx, v) in row_values.iter().enumerate() {
583 new_columns[col_idx].push_value(v.clone());
584 }
585 new_multiplicities[parent_idx] += 1;
586 }
587 }
588
589 new_offsets.push(new_columns[0].len() as u32);
590 }
591
592 let total: usize = new_multiplicities.iter().sum();
594 if total == 0 {
595 return Some(Self::empty());
596 }
597
598 let new_factorized_cols: Vec<FactorizedVector> = new_columns
600 .into_iter()
601 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
602 .collect();
603
604 let new_level = FactorizationLevel::unflat(
605 new_factorized_cols,
606 deepest.column_names().to_vec(),
607 new_multiplicities,
608 );
609
610 let mut result = Self {
611 levels: self.levels[..deepest_idx].to_vec(),
612 logical_row_count: 0,
613 state: ChunkState::flat(0),
614 };
615 result.levels.push(new_level);
616 result.recompute_logical_row_count();
617 result.update_state();
618
619 Some(result)
620 }
621
622 #[must_use]
642 pub fn count_rows(&self) -> usize {
643 self.logical_row_count()
644 }
645
646 #[must_use]
663 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
664 if self.levels.is_empty() {
665 return Vec::new();
666 }
667
668 if self.levels.len() == 1 {
670 return vec![1; self.levels[0].group_count];
671 }
672
673 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
675
676 for level_idx in 1..self.levels.len() {
678 let level = &self.levels[level_idx];
679 let mut child_multiplicities = Vec::with_capacity(level.group_count);
680
681 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
683 let child_count = if parent_idx < level.multiplicities.len() {
684 level.multiplicities[parent_idx]
685 } else {
686 0
687 };
688
689 for _ in 0..child_count {
691 child_multiplicities.push(parent_mult);
692 }
693 }
694
695 parent_multiplicities = child_multiplicities;
696 }
697
698 parent_multiplicities
699 }
700
701 #[must_use]
714 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
715 if self.levels.is_empty() {
716 return None;
717 }
718
719 let deepest_idx = self.levels.len() - 1;
720 let deepest = &self.levels[deepest_idx];
721 let col = deepest.column(column_idx)?;
722
723 let multiplicities = self.compute_path_multiplicities();
725
726 let mut sum = 0.0;
727 for (phys_idx, mult) in multiplicities.iter().enumerate() {
728 if let Some(value) = col.get_physical(phys_idx) {
729 let num_value = match &value {
731 grafeo_common::types::Value::Int64(v) => *v as f64,
732 grafeo_common::types::Value::Float64(v) => *v,
733 _ => continue, };
735 sum += num_value * (*mult as f64);
736 }
737 }
738 Some(sum)
739 }
740
741 #[must_use]
753 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
754 let count = self.logical_row_count();
755 if count == 0 {
756 return None;
757 }
758
759 let sum = self.sum_deepest(column_idx)?;
760 Some(sum / count as f64)
761 }
762
763 #[must_use]
776 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
777 if self.levels.is_empty() {
778 return None;
779 }
780
781 let deepest_idx = self.levels.len() - 1;
782 let deepest = &self.levels[deepest_idx];
783 let col = deepest.column(column_idx)?;
784
785 let mut min_value: Option<grafeo_common::types::Value> = None;
786
787 for phys_idx in 0..col.physical_len() {
788 if let Some(value) = col.get_physical(phys_idx) {
789 min_value = Some(match min_value {
790 None => value,
791 Some(current) => {
792 if Self::value_less_than(&value, ¤t) {
793 value
794 } else {
795 current
796 }
797 }
798 });
799 }
800 }
801
802 min_value
803 }
804
805 #[must_use]
818 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
819 if self.levels.is_empty() {
820 return None;
821 }
822
823 let deepest_idx = self.levels.len() - 1;
824 let deepest = &self.levels[deepest_idx];
825 let col = deepest.column(column_idx)?;
826
827 let mut max_value: Option<grafeo_common::types::Value> = None;
828
829 for phys_idx in 0..col.physical_len() {
830 if let Some(value) = col.get_physical(phys_idx) {
831 max_value = Some(match max_value {
832 None => value,
833 Some(current) => {
834 if Self::value_less_than(¤t, &value) {
835 value
836 } else {
837 current
838 }
839 }
840 });
841 }
842 }
843
844 max_value
845 }
846
847 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
855 use grafeo_common::types::Value;
856
857 match (a, b) {
858 (Value::Null, Value::Null) => false,
860 (Value::Null, _) => true,
861 (_, Value::Null) => false,
862
863 (Value::Int64(x), Value::Int64(y)) => x < y,
865 (Value::Float64(x), Value::Float64(y)) => x < y,
866 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
867 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
868
869 (Value::String(x), Value::String(y)) => x.as_ref() < y.as_ref(),
871
872 (Value::Bool(x), Value::Bool(y)) => !x && *y,
874
875 _ => false,
878 }
879 }
880
881 #[must_use]
895 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
896 if self.levels.is_empty() || column_specs.is_empty() {
897 return Self::empty();
898 }
899
900 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
902 for (level_idx, col_idx, name) in column_specs {
903 if *level_idx < self.levels.len() {
904 level_specs[*level_idx].push((*col_idx, name.clone()));
905 }
906 }
907
908 let mut new_levels = Vec::new();
910
911 for (level_idx, specs) in level_specs.iter().enumerate() {
912 if specs.is_empty() {
913 continue;
914 }
915
916 let src_level = &self.levels[level_idx];
917
918 let columns: Vec<FactorizedVector> = specs
919 .iter()
920 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
921 .collect();
922
923 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
924
925 if level_idx == 0 {
926 new_levels.push(FactorizationLevel::flat(columns, names));
927 } else {
928 let mults = src_level.multiplicities().to_vec();
929 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
930 }
931 }
932
933 if new_levels.is_empty() {
934 return Self::empty();
935 }
936
937 let mut result = Self {
938 levels: new_levels,
939 logical_row_count: 0,
940 state: ChunkState::flat(0),
941 };
942 result.recompute_logical_row_count();
943 result.update_state();
944 result
945 }
946}
947
948pub struct FactorizedRowIterator<'a> {
963 chunk: &'a FactorizedChunk,
964 indices: Vec<usize>,
966 exhausted: bool,
969}
970
971impl<'a> FactorizedRowIterator<'a> {
972 fn new(chunk: &'a FactorizedChunk) -> Self {
973 let indices = vec![0; chunk.level_count()];
974 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
975
976 let mut iter = Self {
977 chunk,
978 indices,
979 exhausted,
980 };
981
982 if !exhausted && !iter.has_valid_deepest_range() {
984 if !iter.advance() {
985 exhausted = true;
986 }
987 iter.exhausted = exhausted;
988 }
989
990 iter
991 }
992
993 fn advance(&mut self) -> bool {
995 if self.exhausted || self.chunk.levels.is_empty() {
996 return false;
997 }
998
999 for level_idx in (0..self.chunk.levels.len()).rev() {
1001 let level = &self.chunk.levels[level_idx];
1002
1003 let parent_idx = if level_idx == 0 {
1005 self.indices[0] + 1
1007 } else {
1008 self.indices[level_idx - 1]
1010 };
1011
1012 let (_start, end) = if level_idx == 0 {
1014 (0, level.group_count)
1015 } else {
1016 if let Some(col) = level.columns.first() {
1018 col.range_for_parent(parent_idx)
1019 } else {
1020 (0, 0)
1021 }
1022 };
1023
1024 let current = self.indices[level_idx];
1025 if current + 1 < end {
1026 self.indices[level_idx] = current + 1;
1028 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1030 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1031 let (deeper_start, _) =
1032 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1033 self.indices[deeper_idx] = deeper_start;
1034 }
1035 }
1036
1037 if self.has_valid_deepest_range() {
1040 return true;
1041 }
1042 return self.advance();
1045 }
1046 }
1048
1049 self.exhausted = true;
1051 false
1052 }
1053
1054 fn has_valid_deepest_range(&self) -> bool {
1056 if self.chunk.levels.len() <= 1 {
1057 return true; }
1059
1060 let deepest_idx = self.chunk.levels.len() - 1;
1061 let parent_idx = self.indices[deepest_idx - 1];
1062
1063 if let Some(col) = self.chunk.levels[deepest_idx].columns.first() {
1064 let (start, end) = col.range_for_parent(parent_idx);
1065 start < end } else {
1067 false
1068 }
1069 }
1070}
1071
1072impl Iterator for FactorizedRowIterator<'_> {
1073 type Item = Vec<usize>;
1074
1075 fn next(&mut self) -> Option<Self::Item> {
1076 if self.exhausted {
1077 return None;
1078 }
1079
1080 let result = self.indices.clone();
1082 self.advance();
1083 Some(result)
1084 }
1085}
1086
1087#[derive(Debug, Clone)]
1089pub enum ChunkVariant {
1090 Flat(DataChunk),
1092 Factorized(FactorizedChunk),
1094}
1095
1096impl ChunkVariant {
1097 #[must_use]
1099 pub fn flat(chunk: DataChunk) -> Self {
1100 Self::Flat(chunk)
1101 }
1102
1103 #[must_use]
1105 pub fn factorized(chunk: FactorizedChunk) -> Self {
1106 Self::Factorized(chunk)
1107 }
1108
1109 #[must_use]
1111 pub fn ensure_flat(self) -> DataChunk {
1112 match self {
1113 Self::Flat(chunk) => chunk,
1114 Self::Factorized(chunk) => chunk.flatten(),
1115 }
1116 }
1117
1118 #[must_use]
1120 pub fn logical_row_count(&self) -> usize {
1121 match self {
1122 Self::Flat(chunk) => chunk.row_count(),
1123 Self::Factorized(chunk) => chunk.logical_row_count(),
1124 }
1125 }
1126
1127 #[must_use]
1129 pub fn is_factorized(&self) -> bool {
1130 matches!(self, Self::Factorized(_))
1131 }
1132
1133 #[must_use]
1135 pub fn is_flat(&self) -> bool {
1136 matches!(self, Self::Flat(_))
1137 }
1138
1139 #[must_use]
1141 pub fn is_empty(&self) -> bool {
1142 self.logical_row_count() == 0
1143 }
1144}
1145
1146impl From<DataChunk> for ChunkVariant {
1147 fn from(chunk: DataChunk) -> Self {
1148 Self::Flat(chunk)
1149 }
1150}
1151
1152impl From<FactorizedChunk> for ChunkVariant {
1153 fn from(chunk: FactorizedChunk) -> Self {
1154 Self::Factorized(chunk)
1155 }
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160 use grafeo_common::types::{LogicalType, NodeId, Value};
1161
1162 use super::*;
1163
1164 fn make_flat_chunk() -> DataChunk {
1165 let mut col = ValueVector::with_type(LogicalType::Int64);
1166 col.push_int64(1);
1167 col.push_int64(2);
1168 DataChunk::new(vec![col])
1169 }
1170
1171 fn create_multi_level_chunk() -> FactorizedChunk {
1172 let mut sources = ValueVector::with_type(LogicalType::Int64);
1174 sources.push_int64(10);
1175 sources.push_int64(20);
1176
1177 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1178
1179 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1180 neighbors.push_int64(1);
1181 neighbors.push_int64(2);
1182 neighbors.push_int64(3);
1183 neighbors.push_int64(4);
1184
1185 let offsets = vec![0, 2, 4];
1186 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1187 chunk
1188 }
1189
1190 #[test]
1191 fn test_from_flat() {
1192 let flat = make_flat_chunk();
1193 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1194
1195 assert_eq!(factorized.level_count(), 1);
1196 assert_eq!(factorized.logical_row_count(), 2);
1197 assert_eq!(factorized.physical_size(), 2);
1198 }
1199
1200 #[test]
1201 fn test_add_level() {
1202 let mut col0 = ValueVector::with_type(LogicalType::Node);
1204 col0.push_node_id(NodeId::new(100));
1205 col0.push_node_id(NodeId::new(200));
1206
1207 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1208
1209 assert_eq!(chunk.level_count(), 1);
1210 assert_eq!(chunk.logical_row_count(), 2);
1211
1212 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1214 neighbors.push_node_id(NodeId::new(10));
1215 neighbors.push_node_id(NodeId::new(11));
1216 neighbors.push_node_id(NodeId::new(12));
1217 neighbors.push_node_id(NodeId::new(20));
1218 neighbors.push_node_id(NodeId::new(21));
1219
1220 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1222
1223 assert_eq!(chunk.level_count(), 2);
1224 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1227
1228 #[test]
1229 fn test_flatten_single_level() {
1230 let flat = make_flat_chunk();
1231 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1232
1233 let flattened = factorized.flatten();
1234 assert_eq!(flattened.row_count(), 2);
1235 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1236 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1237 }
1238
1239 #[test]
1240 fn test_flatten_multi_level() {
1241 let mut sources = ValueVector::with_type(LogicalType::Int64);
1243 sources.push_int64(1);
1244 sources.push_int64(2);
1245
1246 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1247
1248 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1249 neighbors.push_int64(10);
1250 neighbors.push_int64(11);
1251 neighbors.push_int64(20);
1252 neighbors.push_int64(21);
1253
1254 let offsets = vec![0, 2, 4];
1255 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1256
1257 let flat = chunk.flatten();
1258 assert_eq!(flat.row_count(), 4);
1259 assert_eq!(flat.column_count(), 2);
1260
1261 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1264 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1265 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1266 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1267 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1268 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1269 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1270 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1271 }
1272
1273 #[test]
1274 fn test_logical_row_iter_single_level() {
1275 let flat = make_flat_chunk();
1276 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1277
1278 let indices: Vec<_> = factorized.logical_row_iter().collect();
1279 assert_eq!(indices.len(), 2);
1280 assert_eq!(indices[0], vec![0]);
1281 assert_eq!(indices[1], vec![1]);
1282 }
1283
1284 #[test]
1285 fn test_chunk_variant() {
1286 let flat = make_flat_chunk();
1287 let variant = ChunkVariant::flat(flat.clone());
1288
1289 assert!(variant.is_flat());
1290 assert!(!variant.is_factorized());
1291 assert_eq!(variant.logical_row_count(), 2);
1292
1293 let ensured = variant.ensure_flat();
1294 assert_eq!(ensured.row_count(), 2);
1295 }
1296
1297 #[test]
1298 fn test_chunk_variant_factorized() {
1299 let chunk = create_multi_level_chunk();
1300 let variant = ChunkVariant::factorized(chunk);
1301
1302 assert!(variant.is_factorized());
1303 assert!(!variant.is_flat());
1304 assert_eq!(variant.logical_row_count(), 4);
1305
1306 let flat = variant.ensure_flat();
1307 assert_eq!(flat.row_count(), 4);
1308 }
1309
1310 #[test]
1311 fn test_chunk_variant_from() {
1312 let flat = make_flat_chunk();
1313 let variant: ChunkVariant = flat.into();
1314 assert!(variant.is_flat());
1315
1316 let factorized = create_multi_level_chunk();
1317 let variant2: ChunkVariant = factorized.into();
1318 assert!(variant2.is_factorized());
1319 }
1320
1321 #[test]
1322 fn test_chunk_variant_is_empty() {
1323 let empty_flat = DataChunk::empty();
1324 let variant = ChunkVariant::flat(empty_flat);
1325 assert!(variant.is_empty());
1326
1327 let non_empty = make_flat_chunk();
1328 let variant2 = ChunkVariant::flat(non_empty);
1329 assert!(!variant2.is_empty());
1330 }
1331
1332 #[test]
1333 fn test_empty_chunk() {
1334 let chunk = FactorizedChunk::empty();
1335 assert_eq!(chunk.level_count(), 0);
1336 assert_eq!(chunk.logical_row_count(), 0);
1337 assert_eq!(chunk.physical_size(), 0);
1338
1339 let flat = chunk.flatten();
1340 assert!(flat.is_empty());
1341 }
1342
1343 #[test]
1344 fn test_all_column_names() {
1345 let mut sources = ValueVector::with_type(LogicalType::Int64);
1346 sources.push_int64(1);
1347
1348 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1349
1350 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1351 neighbors.push_int64(10);
1352
1353 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1354
1355 let names = chunk.all_column_names();
1356 assert_eq!(names, vec!["source", "neighbor"]);
1357 }
1358
1359 #[test]
1360 fn test_level_mut() {
1361 let mut chunk = create_multi_level_chunk();
1362
1363 let level = chunk.level_mut(0).unwrap();
1365 assert_eq!(level.column_count(), 1);
1366
1367 assert!(chunk.level_mut(10).is_none());
1369 }
1370
1371 #[test]
1372 fn test_factorization_level_column_mut() {
1373 let mut chunk = create_multi_level_chunk();
1374
1375 let level = chunk.level_mut(0).unwrap();
1376 let col = level.column_mut(0);
1377 assert!(col.is_some());
1378
1379 assert!(level.column_mut(10).is_none());
1381 }
1382
1383 #[test]
1384 fn test_factorization_level_physical_value_count() {
1385 let chunk = create_multi_level_chunk();
1386
1387 let level0 = chunk.level(0).unwrap();
1388 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1391 assert_eq!(level1.physical_value_count(), 4); }
1393
1394 #[test]
1395 fn test_count_rows() {
1396 let chunk = create_multi_level_chunk();
1397 assert_eq!(chunk.count_rows(), 4);
1398
1399 let empty = FactorizedChunk::empty();
1400 assert_eq!(empty.count_rows(), 0);
1401 }
1402
1403 #[test]
1404 fn test_compute_path_multiplicities() {
1405 let chunk = create_multi_level_chunk();
1406
1407 let mults = chunk.compute_path_multiplicities();
1408 assert_eq!(mults.len(), 4);
1410 assert!(mults.iter().all(|&m| m == 1));
1411 }
1412
1413 #[test]
1414 fn test_compute_path_multiplicities_single_level() {
1415 let mut col = ValueVector::with_type(LogicalType::Int64);
1416 col.push_int64(1);
1417 col.push_int64(2);
1418 col.push_int64(3);
1419
1420 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1421 let mults = chunk.compute_path_multiplicities();
1422
1423 assert_eq!(mults.len(), 3);
1425 assert!(mults.iter().all(|&m| m == 1));
1426 }
1427
1428 #[test]
1429 fn test_compute_path_multiplicities_empty() {
1430 let chunk = FactorizedChunk::empty();
1431 let mults = chunk.compute_path_multiplicities();
1432 assert!(mults.is_empty());
1433 }
1434
1435 #[test]
1436 fn test_path_multiplicities_cached() {
1437 let mut chunk = create_multi_level_chunk();
1438
1439 let mults1 = chunk.path_multiplicities_cached();
1441 assert_eq!(mults1.len(), 4);
1442
1443 let mults2 = chunk.path_multiplicities_cached();
1445 assert_eq!(mults1.len(), mults2.len());
1446 }
1447
1448 #[test]
1449 fn test_sum_deepest() {
1450 let chunk = create_multi_level_chunk();
1451
1452 let sum = chunk.sum_deepest(0);
1454 assert_eq!(sum, Some(10.0)); }
1456
1457 #[test]
1458 fn test_sum_deepest_empty() {
1459 let chunk = FactorizedChunk::empty();
1460 assert!(chunk.sum_deepest(0).is_none());
1461 }
1462
1463 #[test]
1464 fn test_sum_deepest_invalid_column() {
1465 let chunk = create_multi_level_chunk();
1466 assert!(chunk.sum_deepest(10).is_none());
1467 }
1468
1469 #[test]
1470 fn test_avg_deepest() {
1471 let chunk = create_multi_level_chunk();
1472
1473 let avg = chunk.avg_deepest(0);
1475 assert_eq!(avg, Some(2.5));
1476 }
1477
1478 #[test]
1479 fn test_avg_deepest_empty() {
1480 let chunk = FactorizedChunk::empty();
1481 assert!(chunk.avg_deepest(0).is_none());
1482 }
1483
1484 #[test]
1485 fn test_min_deepest() {
1486 let chunk = create_multi_level_chunk();
1487
1488 let min = chunk.min_deepest(0);
1489 assert_eq!(min, Some(Value::Int64(1)));
1490 }
1491
1492 #[test]
1493 fn test_min_deepest_empty() {
1494 let chunk = FactorizedChunk::empty();
1495 assert!(chunk.min_deepest(0).is_none());
1496 }
1497
1498 #[test]
1499 fn test_min_deepest_invalid_column() {
1500 let chunk = create_multi_level_chunk();
1501 assert!(chunk.min_deepest(10).is_none());
1502 }
1503
1504 #[test]
1505 fn test_max_deepest() {
1506 let chunk = create_multi_level_chunk();
1507
1508 let max = chunk.max_deepest(0);
1509 assert_eq!(max, Some(Value::Int64(4)));
1510 }
1511
1512 #[test]
1513 fn test_max_deepest_empty() {
1514 let chunk = FactorizedChunk::empty();
1515 assert!(chunk.max_deepest(0).is_none());
1516 }
1517
/// Checks `FactorizedChunk::value_less_than` across nulls, integers, floats,
/// mixed numerics, strings, booleans, and a mismatched type pair.
#[test]
fn test_value_less_than() {
    // Short local alias so each case fits on one line.
    let lt = |lhs: &Value, rhs: &Value| FactorizedChunk::value_less_than(lhs, rhs);

    // Null compares below a non-null value, never above it, and not below itself.
    assert!(lt(&Value::Null, &Value::Int64(1)));
    assert!(!lt(&Value::Int64(1), &Value::Null));
    assert!(!lt(&Value::Null, &Value::Null));

    // Integer against integer.
    assert!(lt(&Value::Int64(1), &Value::Int64(2)));
    assert!(!lt(&Value::Int64(2), &Value::Int64(1)));

    // Float against float.
    assert!(lt(&Value::Float64(1.5), &Value::Float64(2.5)));

    // Integer and float mixed, in both argument orders.
    assert!(lt(&Value::Int64(1), &Value::Float64(1.5)));
    assert!(lt(&Value::Float64(0.5), &Value::Int64(1)));

    // String against string.
    assert!(lt(
        &Value::String("apple".into()),
        &Value::String("banana".into())
    ));

    // Boolean against boolean: `false` is below `true`, not the reverse.
    assert!(lt(&Value::Bool(false), &Value::Bool(true)));
    assert!(!lt(&Value::Bool(true), &Value::Bool(false)));

    // An integer is not reported as less than a string.
    assert!(!lt(&Value::Int64(1), &Value::String("hello".into())));
}
1582
/// Filters column 0 of the multi-level fixture's deepest level, keeping only
/// `Int64` values greater than 2, and expects two logical rows to remain.
#[test]
fn test_filter_deepest() {
    let chunk = create_multi_level_chunk();

    // Non-`Int64` values never satisfy the pattern, so they are rejected.
    let kept = chunk
        .filter_deepest(0, |v| matches!(v, Value::Int64(n) if *n > 2))
        .unwrap();

    assert_eq!(kept.logical_row_count(), 2);
}
1599
/// Asserts that `filter_deepest` on `FactorizedChunk::empty()` is `None`,
/// even with a predicate that accepts everything.
#[test]
fn test_filter_deepest_empty() {
    let outcome = FactorizedChunk::empty().filter_deepest(0, |_| true);
    assert!(outcome.is_none());
}
1605
/// Rejects every value in column 0 of the multi-level fixture and expects a
/// returned chunk (`Some`) whose logical row count is zero.
#[test]
fn test_filter_deepest_all_filtered() {
    let chunk = create_multi_level_chunk();

    // `unwrap` doubles as the check that a chunk is still returned.
    let survivors = chunk.filter_deepest(0, |_| false).unwrap();

    assert_eq!(survivors.logical_row_count(), 0);
}
1616
/// Asserts that `filter_deepest` yields `None` for column index 10 on the
/// multi-level fixture.
#[test]
fn test_filter_deepest_invalid_column() {
    let fixture = create_multi_level_chunk();
    let outcome = fixture.filter_deepest(10, |_| true);
    assert!(outcome.is_none());
}
1622
/// Exercises `filter_deepest_multi` with a predicate that reads two
/// deepest-level columns at once.
///
/// A single flat source row is extended with a second level holding columns
/// `a` = [10, 20, 30] and `b` = [1, 2, 3]. The predicate keeps rows where both
/// values are `Int64` and `a + b > 15`; that holds for (20, 2) and (30, 3), so
/// two logical rows are expected.
#[test]
fn test_filter_deepest_multi() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    let mut col1 = ValueVector::with_type(LogicalType::Int64);
    for value in [10, 20, 30] {
        col1.push_int64(value);
    }

    let mut col2 = ValueVector::with_type(LogicalType::Int64);
    for value in [1, 2, 3] {
        col2.push_int64(value);
    }

    // NOTE(review): offsets [0, 3] presumably attach all three child rows to
    // the single source row — confirm against `add_level`.
    chunk.add_level(
        vec![col1, col2],
        vec!["a".to_string(), "b".to_string()],
        &[0, 3],
    );

    // `expect` replaces the former `assert!(is_some())` + `unwrap()` pair so a
    // missing result fails once, with a descriptive message.
    let filtered = chunk
        .filter_deepest_multi(|values| {
            values.len() == 2
                && matches!(
                    (&values[0], &values[1]),
                    (Value::Int64(a), Value::Int64(b)) if *a + *b > 15
                )
        })
        .expect("filter_deepest_multi should return a chunk when the deepest level has columns");

    assert_eq!(filtered.logical_row_count(), 2);
}
1662
/// Asserts that `filter_deepest_multi` on `FactorizedChunk::empty()` is `None`.
#[test]
fn test_filter_deepest_multi_empty() {
    let outcome = FactorizedChunk::empty().filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1668
/// Asserts that `filter_deepest_multi` is `None` when the deepest level was
/// built with no columns at all.
#[test]
fn test_filter_deepest_multi_no_columns() {
    let mut src_col = ValueVector::with_type(LogicalType::Int64);
    src_col.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_col], vec!["src".to_string()]);

    // Column-less unflat level: no vectors, no names, one multiplicity of 0.
    chunk.add_factorized_level(FactorizationLevel::unflat(vec![], vec![], vec![0]));

    let outcome = chunk.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1683
/// Projects one column out of a two-column flat chunk under a new name and
/// checks both the resulting column count and the resulting name list.
#[test]
fn test_project() {
    let mut nums = ValueVector::with_type(LogicalType::Int64);
    for n in [1, 2] {
        nums.push_int64(n);
    }

    let mut strs = ValueVector::with_type(LogicalType::String);
    for s in ["a", "b"] {
        strs.push_string(s);
    }

    let column_names = vec!["num".to_string(), "str".to_string()];
    let chunk = FactorizedChunk::with_flat_level(vec![nums, strs], column_names);

    // Spec tuple: (0, 0, new name), selecting from level 0, column 0.
    let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1706
/// Asserts that projecting from `FactorizedChunk::empty()` produces a chunk
/// with zero levels.
#[test]
fn test_project_empty() {
    let source = FactorizedChunk::empty();
    assert_eq!(source.project(&[(0, 0, "col".to_string())]).level_count(), 0);
}
1713
/// Asserts that projecting the multi-level fixture with an empty spec list
/// produces a chunk with zero levels.
#[test]
fn test_project_empty_specs() {
    let fixture = create_multi_level_chunk();
    assert_eq!(fixture.project(&[]).level_count(), 0);
}
1720
/// Asserts that a projection spec naming level 10 on the multi-level fixture
/// produces a chunk with zero levels.
#[test]
fn test_project_invalid_level() {
    let fixture = create_multi_level_chunk();
    let level_count = fixture.project(&[(10, 0, "col".to_string())]).level_count();
    assert_eq!(level_count, 0);
}
1729
/// Projects column 0 from level 0 and column 0 from level 1 of the multi-level
/// fixture and expects two levels and two columns in the result.
#[test]
fn test_project_multi_level() {
    let fixture = create_multi_level_chunk();

    let projected =
        fixture.project(&[(0, 0, "source".to_string()), (1, 0, "neighbor".to_string())]);

    let level_count = projected.level_count();
    let column_count = projected.total_column_count();
    assert_eq!(level_count, 2);
    assert_eq!(column_count, 2);
}
1741
/// Asserts that the multi-level fixture reports two columns in total.
#[test]
fn test_total_column_count() {
    let column_total = create_multi_level_chunk().total_column_count();
    assert_eq!(column_total, 2);
}
1747
/// Touches both chunk-state accessors: the shared one must report a
/// factorized state, and the mutable one must allow `invalidate_cache`.
#[test]
fn test_chunk_state_access() {
    let mut chunk = create_multi_level_chunk();

    // Read-only access.
    assert!(chunk.chunk_state().is_factorized());

    // Mutable access; only checks that the call goes through.
    chunk.chunk_state_mut().invalidate_cache();
}
1758
/// Collects `logical_row_iter` over the multi-level fixture and checks that it
/// yields exactly four index vectors, in the expected order.
#[test]
fn test_logical_row_iter_multi_level() {
    let chunk = create_multi_level_chunk();

    let rows = chunk.logical_row_iter().collect::<Vec<_>>();
    assert_eq!(rows.len(), 4);

    // Expected index vectors, in iteration order.
    let expected = [vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]];
    for (position, want) in expected.iter().enumerate() {
        assert_eq!(rows[position], *want);
    }
}
1772
/// Sums a `Float64` deepest-level column holding 1.5, 2.5 and 3.0 and expects
/// `Some(7.0)`.
#[test]
fn test_sum_deepest_with_float() {
    let mut src_col = ValueVector::with_type(LogicalType::Int64);
    src_col.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_col], vec!["src".to_string()]);

    let mut float_col = ValueVector::with_type(LogicalType::Float64);
    for x in [1.5, 2.5, 3.0] {
        float_col.push_float64(x);
    }

    chunk.add_level(vec![float_col], vec!["val".to_string()], &[0, 3]);

    // 1.5 + 2.5 + 3.0 is exactly representable, so exact equality is used.
    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1790
/// Runs `min_deepest` and `max_deepest` over a `String` column holding
/// "banana", "apple" and "cherry"; expects "apple" and "cherry" respectively.
#[test]
fn test_min_max_with_strings() {
    let mut src_col = ValueVector::with_type(LogicalType::Int64);
    src_col.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_col], vec!["src".to_string()]);

    // Inserted out of order on purpose, so neither extreme comes first.
    let mut fruit_col = ValueVector::with_type(LogicalType::String);
    for fruit in ["banana", "apple", "cherry"] {
        fruit_col.push_string(fruit);
    }

    chunk.add_level(vec![fruit_col], vec!["fruit".to_string()], &[0, 3]);

    assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
    assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
}
1811
/// Asserts that recomputing the logical row count on an empty chunk leaves it
/// at zero.
#[test]
fn test_recompute_logical_row_count_empty() {
    let mut empty_chunk = FactorizedChunk::empty();

    empty_chunk.recompute_logical_row_count();

    let row_count = empty_chunk.logical_row_count();
    assert_eq!(row_count, 0);
}
1818
/// Checks `group_count` on both levels of the multi-level fixture: 2 for
/// level 0 and 4 for level 1.
#[test]
fn test_factorization_level_group_count() {
    let chunk = create_multi_level_chunk();

    // (level index, expected group count)
    for (level_idx, expected) in [(0, 2), (1, 4)] {
        let level = chunk.level(level_idx).unwrap();
        assert_eq!(level.group_count(), expected);
    }
}
1829
/// Asserts that level 1 of the multi-level fixture reports multiplicities
/// `[2, 2]`.
#[test]
fn test_factorization_level_multiplicities() {
    let chunk = create_multi_level_chunk();
    let deepest = chunk.level(1).unwrap();
    assert_eq!(deepest.multiplicities(), &[2, 2]);
}
1838
/// Checks the column names on each level of the multi-level fixture: `src`
/// on level 0 and `nbr` on level 1.
#[test]
fn test_factorization_level_column_names() {
    let chunk = create_multi_level_chunk();

    let top_names = chunk.level(0).unwrap().column_names();
    let deep_names = chunk.level(1).unwrap().column_names();

    assert_eq!(top_names, &["src"]);
    assert_eq!(deep_names, &["nbr"]);
}
1849}