1use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
/// A chunk whose columns are stored as a chain of factorization levels.
///
/// Level 0 holds the outermost columns; each subsequent level holds columns
/// whose entries are grouped under the entries of the level before it. The
/// number of rows the chunk represents once flattened is cached alongside the
/// levels.
#[derive(Debug, Clone)]
pub struct FactorizedChunk {
    /// Factorization levels, outermost first.
    levels: Vec<FactorizationLevel>,
    /// Number of rows after full flattening; kept in sync whenever the level
    /// structure changes.
    logical_row_count: usize,
    /// Chunk state, replaced via `update_state` after structural changes.
    state: ChunkState,
}
52
/// One level of a [`FactorizedChunk`]: a set of columns plus the number of
/// entries each parent entry (of the previous level) owns.
#[derive(Debug, Clone)]
pub struct FactorizationLevel {
    /// Column data for this level.
    columns: Vec<FactorizedVector>,
    /// Column names; expected to be parallel to `columns`.
    column_names: Vec<String>,
    /// Number of value entries at this level: the first column's physical
    /// length for a flat level, or the sum of `multiplicities` for an unflat one.
    group_count: usize,
    /// Entries owned by each parent entry of the previous level
    /// (all ones for a flat level).
    multiplicities: Vec<usize>,
}
67
68impl FactorizationLevel {
69 #[must_use]
71 pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72 let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73 let multiplicities = vec![1; group_count];
74 Self {
75 columns,
76 column_names,
77 group_count,
78 multiplicities,
79 }
80 }
81
82 #[must_use]
87 pub fn unflat(
88 columns: Vec<FactorizedVector>,
89 column_names: Vec<String>,
90 multiplicities: Vec<usize>,
91 ) -> Self {
92 let group_count = multiplicities.iter().sum();
94 Self {
95 columns,
96 column_names,
97 group_count,
98 multiplicities,
99 }
100 }
101
102 #[must_use]
104 pub fn column_count(&self) -> usize {
105 self.columns.len()
106 }
107
108 #[must_use]
110 pub fn group_count(&self) -> usize {
111 self.group_count
112 }
113
114 #[must_use]
116 pub fn physical_value_count(&self) -> usize {
117 self.columns
118 .iter()
119 .map(FactorizedVector::physical_len)
120 .sum()
121 }
122
123 #[must_use]
125 pub fn multiplicities(&self) -> &[usize] {
126 &self.multiplicities
127 }
128
129 #[must_use]
131 pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132 self.columns.get(index)
133 }
134
135 pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137 self.columns.get_mut(index)
138 }
139
140 #[must_use]
142 pub fn column_names(&self) -> &[String] {
143 &self.column_names
144 }
145}
146
147impl FactorizedChunk {
148 #[must_use]
150 pub fn empty() -> Self {
151 Self {
152 levels: Vec::new(),
153 logical_row_count: 0,
154 state: ChunkState::flat(0),
155 }
156 }
157
158 #[must_use]
162 pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163 let columns: Vec<FactorizedVector> = chunk
164 .columns()
165 .iter()
166 .map(|c| FactorizedVector::flat(c.clone()))
167 .collect();
168
169 let row_count = chunk.row_count();
170 let level = FactorizationLevel::flat(columns, column_names);
171
172 Self {
173 levels: vec![level],
174 logical_row_count: row_count,
175 state: ChunkState::unflat(1, row_count),
176 }
177 }
178
179 #[must_use]
181 pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182 let row_count = columns.first().map_or(0, ValueVector::len);
183 let factorized_columns: Vec<FactorizedVector> =
184 columns.into_iter().map(FactorizedVector::flat).collect();
185
186 let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188 Self {
189 levels: vec![level],
190 logical_row_count: row_count,
191 state: ChunkState::unflat(1, row_count),
192 }
193 }
194
195 #[must_use]
197 pub fn level_count(&self) -> usize {
198 self.levels.len()
199 }
200
201 #[must_use]
203 pub fn logical_row_count(&self) -> usize {
204 self.logical_row_count
205 }
206
207 #[must_use]
209 pub fn physical_size(&self) -> usize {
210 self.levels
211 .iter()
212 .map(FactorizationLevel::physical_value_count)
213 .sum()
214 }
215
216 #[must_use]
218 pub fn chunk_state(&self) -> &ChunkState {
219 &self.state
220 }
221
222 pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224 &mut self.state
225 }
226
227 pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242 if let Some(cached) = self.state.cached_multiplicities() {
244 return Arc::clone(cached);
245 }
246
247 let mults = self.compute_path_multiplicities();
249 let arc_mults: Arc<[usize]> = mults.into();
250 self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251 arc_mults
252 }
253
254 #[must_use]
256 pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257 self.levels.get(index)
258 }
259
260 pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262 self.levels.get_mut(index)
263 }
264
265 pub fn add_level(
276 &mut self,
277 columns: Vec<ValueVector>,
278 column_names: Vec<String>,
279 offsets: &[u32],
280 ) {
281 let parent_count = offsets.len().saturating_sub(1);
282
283 let multiplicities: Vec<usize> = (0..parent_count)
285 .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286 .collect();
287
288 let factorized_columns: Vec<FactorizedVector> = columns
290 .into_iter()
291 .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292 .collect();
293
294 let level =
295 FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296 self.levels.push(level);
297
298 if self.levels.len() == 1 {
302 self.logical_row_count = multiplicities.iter().sum();
304 } else {
305 self.recompute_logical_row_count();
307 }
308
309 self.update_state();
311 }
312
313 pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315 self.levels.push(level);
316 self.recompute_logical_row_count();
317 self.update_state();
318 }
319
320 fn update_state(&mut self) {
322 self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323 }
324
325 fn recompute_logical_row_count(&mut self) {
327 if self.levels.is_empty() {
328 self.logical_row_count = 0;
329 return;
330 }
331
332 let level0_count = self.levels[0].group_count;
334 if self.levels.len() == 1 {
335 self.logical_row_count = level0_count;
336 return;
337 }
338
339 let mut counts = vec![1usize; level0_count];
342
343 for level_idx in 1..self.levels.len() {
344 let level = &self.levels[level_idx];
345 let mut new_counts = Vec::with_capacity(counts.len() * 2); for (parent_idx, &parent_count) in counts.iter().enumerate() {
348 if parent_idx < level.multiplicities.len() {
350 let child_mult = level.multiplicities[parent_idx];
351 for _ in 0..child_mult {
352 new_counts.push(parent_count);
353 }
354 }
355 }
356
357 counts = new_counts;
358 }
359
360 self.logical_row_count = counts.len();
361 }
362
363 #[must_use]
367 pub fn flatten(&self) -> DataChunk {
368 if self.levels.is_empty() {
369 return DataChunk::empty();
370 }
371
372 let mut all_columns: Vec<ValueVector> = Vec::new();
374
375 if self.levels.len() == 1 {
377 let level = &self.levels[0];
378 for col in &level.columns {
379 all_columns.push(col.flatten(None));
380 }
381 return DataChunk::new(all_columns);
382 }
383
384 let row_iter = self.logical_row_iter();
387 let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389 let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391 for level in &self.levels {
392 for col in &level.columns {
393 output_columns.push(ValueVector::with_capacity(
394 col.data_type(),
395 self.logical_row_count,
396 ));
397 }
398 }
399
400 for indices in row_iter {
402 let mut col_offset = 0;
403 for (level_idx, level) in self.levels.iter().enumerate() {
404 let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405 for (col_idx, col) in level.columns.iter().enumerate() {
406 if let Some(value) = col.get_physical(level_idx_value) {
407 output_columns[col_offset + col_idx].push_value(value);
408 }
409 }
410 col_offset += level.column_count();
411 }
412 }
413
414 DataChunk::new(output_columns)
415 }
416
417 pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421 FactorizedRowIterator::new(self)
422 }
423
424 #[must_use]
426 pub fn total_column_count(&self) -> usize {
427 self.levels.iter().map(|l| l.column_count()).sum()
428 }
429
430 #[must_use]
432 pub fn all_column_names(&self) -> Vec<String> {
433 self.levels
434 .iter()
435 .flat_map(|l| l.column_names.iter().cloned())
436 .collect()
437 }
438
439 #[must_use]
453 pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
454 where
455 F: Fn(&grafeo_common::types::Value) -> bool,
456 {
457 if self.levels.is_empty() {
458 return None;
459 }
460
461 let deepest_idx = self.levels.len() - 1;
462 let deepest = &self.levels[deepest_idx];
463
464 let filter_col = deepest.column(column_idx)?;
466
467 let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
469 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
470 .collect();
471
472 let parent_count = filter_col.parent_count();
474 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
475 let mut new_offsets: Vec<u32> = vec![0];
476
477 for parent_idx in 0..parent_count {
479 let (start, end) = filter_col.range_for_parent(parent_idx);
480
481 for phys_idx in start..end {
482 if let Some(value) = filter_col.get_physical(phys_idx)
484 && predicate(&value)
485 {
486 for col_idx in 0..deepest.column_count() {
488 if let Some(col) = deepest.column(col_idx)
489 && let Some(v) = col.get_physical(phys_idx)
490 {
491 new_columns[col_idx].push_value(v);
492 }
493 }
494 new_multiplicities[parent_idx] += 1;
495 }
496 }
497
498 new_offsets.push(new_columns[0].len() as u32);
499 }
500
501 let total_remaining: usize = new_multiplicities.iter().sum();
503 if total_remaining == 0 {
504 return Some(Self::empty());
505 }
506
507 let new_factorized_cols: Vec<FactorizedVector> = new_columns
509 .into_iter()
510 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
511 .collect();
512
513 let new_level = FactorizationLevel::unflat(
514 new_factorized_cols,
515 deepest.column_names().to_vec(),
516 new_multiplicities,
517 );
518
519 let mut result = Self {
521 levels: self.levels[..deepest_idx].to_vec(),
522 logical_row_count: 0,
523 state: ChunkState::flat(0),
524 };
525 result.levels.push(new_level);
526 result.recompute_logical_row_count();
527 result.update_state();
528
529 Some(result)
530 }
531
532 #[must_use]
540 pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
541 where
542 F: Fn(&[grafeo_common::types::Value]) -> bool,
543 {
544 if self.levels.is_empty() {
545 return None;
546 }
547
548 let deepest_idx = self.levels.len() - 1;
549 let deepest = &self.levels[deepest_idx];
550 let col_count = deepest.column_count();
551
552 if col_count == 0 {
553 return None;
554 }
555
556 let first_col = deepest.column(0)?;
557 let parent_count = first_col.parent_count();
558
559 let mut new_columns: Vec<ValueVector> = (0..col_count)
561 .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
562 .collect();
563
564 let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
565 let mut new_offsets: Vec<u32> = vec![0];
566 let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
567
568 for parent_idx in 0..parent_count {
569 let (start, end) = first_col.range_for_parent(parent_idx);
570
571 for phys_idx in start..end {
572 row_values.clear();
574 for col_idx in 0..col_count {
575 if let Some(col) = deepest.column(col_idx)
576 && let Some(v) = col.get_physical(phys_idx)
577 {
578 row_values.push(v);
579 }
580 }
581
582 if predicate(&row_values) {
584 for (col_idx, v) in row_values.iter().enumerate() {
585 new_columns[col_idx].push_value(v.clone());
586 }
587 new_multiplicities[parent_idx] += 1;
588 }
589 }
590
591 new_offsets.push(new_columns[0].len() as u32);
592 }
593
594 let total: usize = new_multiplicities.iter().sum();
596 if total == 0 {
597 return Some(Self::empty());
598 }
599
600 let new_factorized_cols: Vec<FactorizedVector> = new_columns
602 .into_iter()
603 .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
604 .collect();
605
606 let new_level = FactorizationLevel::unflat(
607 new_factorized_cols,
608 deepest.column_names().to_vec(),
609 new_multiplicities,
610 );
611
612 let mut result = Self {
613 levels: self.levels[..deepest_idx].to_vec(),
614 logical_row_count: 0,
615 state: ChunkState::flat(0),
616 };
617 result.levels.push(new_level);
618 result.recompute_logical_row_count();
619 result.update_state();
620
621 Some(result)
622 }
623
624 #[must_use]
644 pub fn count_rows(&self) -> usize {
645 self.logical_row_count()
646 }
647
648 #[must_use]
665 pub fn compute_path_multiplicities(&self) -> Vec<usize> {
666 if self.levels.is_empty() {
667 return Vec::new();
668 }
669
670 if self.levels.len() == 1 {
672 return vec![1; self.levels[0].group_count];
673 }
674
675 let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
677
678 for level_idx in 1..self.levels.len() {
680 let level = &self.levels[level_idx];
681 let mut child_multiplicities = Vec::with_capacity(level.group_count);
682
683 for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
685 let child_count = if parent_idx < level.multiplicities.len() {
686 level.multiplicities[parent_idx]
687 } else {
688 0
689 };
690
691 for _ in 0..child_count {
693 child_multiplicities.push(parent_mult);
694 }
695 }
696
697 parent_multiplicities = child_multiplicities;
698 }
699
700 parent_multiplicities
701 }
702
703 #[must_use]
716 pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
717 if self.levels.is_empty() {
718 return None;
719 }
720
721 let deepest_idx = self.levels.len() - 1;
722 let deepest = &self.levels[deepest_idx];
723 let col = deepest.column(column_idx)?;
724
725 let multiplicities = self.compute_path_multiplicities();
727
728 let mut sum = 0.0;
729 for (phys_idx, mult) in multiplicities.iter().enumerate() {
730 if let Some(value) = col.get_physical(phys_idx) {
731 let num_value = match &value {
733 grafeo_common::types::Value::Int64(v) => *v as f64,
734 grafeo_common::types::Value::Float64(v) => *v,
735 _ => continue, };
737 sum += num_value * (*mult as f64);
738 }
739 }
740 Some(sum)
741 }
742
743 #[must_use]
755 pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
756 let count = self.logical_row_count();
757 if count == 0 {
758 return None;
759 }
760
761 let sum = self.sum_deepest(column_idx)?;
762 Some(sum / count as f64)
763 }
764
765 #[must_use]
778 pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
779 if self.levels.is_empty() {
780 return None;
781 }
782
783 let deepest_idx = self.levels.len() - 1;
784 let deepest = &self.levels[deepest_idx];
785 let col = deepest.column(column_idx)?;
786
787 let mut min_value: Option<grafeo_common::types::Value> = None;
788
789 for phys_idx in 0..col.physical_len() {
790 if let Some(value) = col.get_physical(phys_idx) {
791 min_value = Some(match min_value {
792 None => value,
793 Some(current) => {
794 if Self::value_less_than(&value, ¤t) {
795 value
796 } else {
797 current
798 }
799 }
800 });
801 }
802 }
803
804 min_value
805 }
806
807 #[must_use]
820 pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
821 if self.levels.is_empty() {
822 return None;
823 }
824
825 let deepest_idx = self.levels.len() - 1;
826 let deepest = &self.levels[deepest_idx];
827 let col = deepest.column(column_idx)?;
828
829 let mut max_value: Option<grafeo_common::types::Value> = None;
830
831 for phys_idx in 0..col.physical_len() {
832 if let Some(value) = col.get_physical(phys_idx) {
833 max_value = Some(match max_value {
834 None => value,
835 Some(current) => {
836 if Self::value_less_than(¤t, &value) {
837 value
838 } else {
839 current
840 }
841 }
842 });
843 }
844 }
845
846 max_value
847 }
848
849 fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
857 use grafeo_common::types::Value;
858
859 match (a, b) {
860 (Value::Null, Value::Null) => false,
862 (Value::Null, _) => true,
863 (_, Value::Null) => false,
864
865 (Value::Int64(x), Value::Int64(y)) => x < y,
867 (Value::Float64(x), Value::Float64(y)) => x < y,
868 (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
869 (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
870
871 (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
873
874 (Value::Bool(x), Value::Bool(y)) => !x && *y,
876
877 _ => false,
880 }
881 }
882
883 #[must_use]
897 pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
898 if self.levels.is_empty() || column_specs.is_empty() {
899 return Self::empty();
900 }
901
902 let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
904 for (level_idx, col_idx, name) in column_specs {
905 if *level_idx < self.levels.len() {
906 level_specs[*level_idx].push((*col_idx, name.clone()));
907 }
908 }
909
910 let mut new_levels = Vec::new();
912
913 for (level_idx, specs) in level_specs.iter().enumerate() {
914 if specs.is_empty() {
915 continue;
916 }
917
918 let src_level = &self.levels[level_idx];
919
920 let columns: Vec<FactorizedVector> = specs
921 .iter()
922 .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
923 .collect();
924
925 let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
926
927 if level_idx == 0 {
928 new_levels.push(FactorizationLevel::flat(columns, names));
929 } else {
930 let mults = src_level.multiplicities().to_vec();
931 new_levels.push(FactorizationLevel::unflat(columns, names, mults));
932 }
933 }
934
935 if new_levels.is_empty() {
936 return Self::empty();
937 }
938
939 let mut result = Self {
940 levels: new_levels,
941 logical_row_count: 0,
942 state: ChunkState::flat(0),
943 };
944 result.recompute_logical_row_count();
945 result.update_state();
946 result
947 }
948}
949
/// Iterator over the logical rows of a [`FactorizedChunk`].
///
/// Each item is a vector with one index per level (level 0 first): the level-0
/// entry and, for every deeper level, the physical position of the current
/// child within that level's columns.
pub struct FactorizedRowIterator<'a> {
    /// Chunk being iterated.
    chunk: &'a FactorizedChunk,
    /// Current position, one index per level.
    indices: Vec<usize>,
    /// Set once no further rows remain.
    exhausted: bool,
}
972
973impl<'a> FactorizedRowIterator<'a> {
974 fn new(chunk: &'a FactorizedChunk) -> Self {
975 let indices = vec![0; chunk.level_count()];
976 let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
977
978 let mut iter = Self {
979 chunk,
980 indices,
981 exhausted,
982 };
983
984 if !exhausted && !iter.has_valid_deepest_range() {
986 if !iter.advance() {
987 exhausted = true;
988 }
989 iter.exhausted = exhausted;
990 }
991
992 iter
993 }
994
995 fn advance(&mut self) -> bool {
997 if self.exhausted || self.chunk.levels.is_empty() {
998 return false;
999 }
1000
1001 for level_idx in (0..self.chunk.levels.len()).rev() {
1003 let level = &self.chunk.levels[level_idx];
1004
1005 let parent_idx = if level_idx == 0 {
1007 self.indices[0] + 1
1009 } else {
1010 self.indices[level_idx - 1]
1012 };
1013
1014 let (_start, end) = if level_idx == 0 {
1016 (0, level.group_count)
1017 } else {
1018 if let Some(col) = level.columns.first() {
1020 col.range_for_parent(parent_idx)
1021 } else {
1022 (0, 0)
1023 }
1024 };
1025
1026 let current = self.indices[level_idx];
1027 if current + 1 < end {
1028 self.indices[level_idx] = current + 1;
1030 for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1032 if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1033 let (deeper_start, _) =
1034 deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1035 self.indices[deeper_idx] = deeper_start;
1036 }
1037 }
1038
1039 if self.has_valid_deepest_range() {
1042 return true;
1043 }
1044 return self.advance();
1047 }
1048 }
1050
1051 self.exhausted = true;
1053 false
1054 }
1055
1056 fn has_valid_deepest_range(&self) -> bool {
1062 if self.chunk.levels.len() <= 1 {
1063 return true; }
1065
1066 for level_idx in 1..self.chunk.levels.len() {
1068 let parent_idx = self.indices[level_idx - 1];
1069 if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1070 let (start, end) = col.range_for_parent(parent_idx);
1071 if start >= end {
1072 return false;
1073 }
1074 } else {
1075 return false;
1076 }
1077 }
1078
1079 true
1080 }
1081}
1082
1083impl Iterator for FactorizedRowIterator<'_> {
1084 type Item = Vec<usize>;
1085
1086 fn next(&mut self) -> Option<Self::Item> {
1087 if self.exhausted {
1088 return None;
1089 }
1090
1091 let result = self.indices.clone();
1093 self.advance();
1094 Some(result)
1095 }
1096}
1097
/// A chunk that is either flat or factorized.
#[derive(Debug, Clone)]
pub enum ChunkVariant {
    /// An ordinary flat [`DataChunk`].
    Flat(DataChunk),
    /// A [`FactorizedChunk`] that has not been flattened.
    Factorized(FactorizedChunk),
}
1106
1107impl ChunkVariant {
1108 #[must_use]
1110 pub fn flat(chunk: DataChunk) -> Self {
1111 Self::Flat(chunk)
1112 }
1113
1114 #[must_use]
1116 pub fn factorized(chunk: FactorizedChunk) -> Self {
1117 Self::Factorized(chunk)
1118 }
1119
1120 #[must_use]
1122 pub fn ensure_flat(self) -> DataChunk {
1123 match self {
1124 Self::Flat(chunk) => chunk,
1125 Self::Factorized(chunk) => chunk.flatten(),
1126 }
1127 }
1128
1129 #[must_use]
1131 pub fn logical_row_count(&self) -> usize {
1132 match self {
1133 Self::Flat(chunk) => chunk.row_count(),
1134 Self::Factorized(chunk) => chunk.logical_row_count(),
1135 }
1136 }
1137
1138 #[must_use]
1140 pub fn is_factorized(&self) -> bool {
1141 matches!(self, Self::Factorized(_))
1142 }
1143
1144 #[must_use]
1146 pub fn is_flat(&self) -> bool {
1147 matches!(self, Self::Flat(_))
1148 }
1149
1150 #[must_use]
1152 pub fn is_empty(&self) -> bool {
1153 self.logical_row_count() == 0
1154 }
1155}
1156
1157impl From<DataChunk> for ChunkVariant {
1158 fn from(chunk: DataChunk) -> Self {
1159 Self::Flat(chunk)
1160 }
1161}
1162
1163impl From<FactorizedChunk> for ChunkVariant {
1164 fn from(chunk: FactorizedChunk) -> Self {
1165 Self::Factorized(chunk)
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use grafeo_common::types::{LogicalType, NodeId, Value};
1172
1173 use super::*;
1174
1175 fn make_flat_chunk() -> DataChunk {
1176 let mut col = ValueVector::with_type(LogicalType::Int64);
1177 col.push_int64(1);
1178 col.push_int64(2);
1179 DataChunk::new(vec![col])
1180 }
1181
1182 fn create_multi_level_chunk() -> FactorizedChunk {
1183 let mut sources = ValueVector::with_type(LogicalType::Int64);
1185 sources.push_int64(10);
1186 sources.push_int64(20);
1187
1188 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1189
1190 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1191 neighbors.push_int64(1);
1192 neighbors.push_int64(2);
1193 neighbors.push_int64(3);
1194 neighbors.push_int64(4);
1195
1196 let offsets = vec![0, 2, 4];
1197 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1198 chunk
1199 }
1200
1201 #[test]
1202 fn test_from_flat() {
1203 let flat = make_flat_chunk();
1204 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1205
1206 assert_eq!(factorized.level_count(), 1);
1207 assert_eq!(factorized.logical_row_count(), 2);
1208 assert_eq!(factorized.physical_size(), 2);
1209 }
1210
1211 #[test]
1212 fn test_add_level() {
1213 let mut col0 = ValueVector::with_type(LogicalType::Node);
1215 col0.push_node_id(NodeId::new(100));
1216 col0.push_node_id(NodeId::new(200));
1217
1218 let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1219
1220 assert_eq!(chunk.level_count(), 1);
1221 assert_eq!(chunk.logical_row_count(), 2);
1222
1223 let mut neighbors = ValueVector::with_type(LogicalType::Node);
1225 neighbors.push_node_id(NodeId::new(10));
1226 neighbors.push_node_id(NodeId::new(11));
1227 neighbors.push_node_id(NodeId::new(12));
1228 neighbors.push_node_id(NodeId::new(20));
1229 neighbors.push_node_id(NodeId::new(21));
1230
1231 let offsets = vec![0, 3, 5]; chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1233
1234 assert_eq!(chunk.level_count(), 2);
1235 assert_eq!(chunk.logical_row_count(), 5); assert_eq!(chunk.physical_size(), 2 + 5); }
1238
1239 #[test]
1240 fn test_flatten_single_level() {
1241 let flat = make_flat_chunk();
1242 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1243
1244 let flattened = factorized.flatten();
1245 assert_eq!(flattened.row_count(), 2);
1246 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1247 assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1248 }
1249
1250 #[test]
1251 fn test_flatten_multi_level() {
1252 let mut sources = ValueVector::with_type(LogicalType::Int64);
1254 sources.push_int64(1);
1255 sources.push_int64(2);
1256
1257 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1258
1259 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1260 neighbors.push_int64(10);
1261 neighbors.push_int64(11);
1262 neighbors.push_int64(20);
1263 neighbors.push_int64(21);
1264
1265 let offsets = vec![0, 2, 4];
1266 chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1267
1268 let flat = chunk.flatten();
1269 assert_eq!(flat.row_count(), 4);
1270 assert_eq!(flat.column_count(), 2);
1271
1272 assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1275 assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1276 assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1277 assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1278 assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1279 assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1280 assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1281 assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1282 }
1283
1284 #[test]
1285 fn test_logical_row_iter_single_level() {
1286 let flat = make_flat_chunk();
1287 let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1288
1289 let indices: Vec<_> = factorized.logical_row_iter().collect();
1290 assert_eq!(indices.len(), 2);
1291 assert_eq!(indices[0], vec![0]);
1292 assert_eq!(indices[1], vec![1]);
1293 }
1294
1295 #[test]
1296 fn test_chunk_variant() {
1297 let flat = make_flat_chunk();
1298 let variant = ChunkVariant::flat(flat.clone());
1299
1300 assert!(variant.is_flat());
1301 assert!(!variant.is_factorized());
1302 assert_eq!(variant.logical_row_count(), 2);
1303
1304 let ensured = variant.ensure_flat();
1305 assert_eq!(ensured.row_count(), 2);
1306 }
1307
1308 #[test]
1309 fn test_chunk_variant_factorized() {
1310 let chunk = create_multi_level_chunk();
1311 let variant = ChunkVariant::factorized(chunk);
1312
1313 assert!(variant.is_factorized());
1314 assert!(!variant.is_flat());
1315 assert_eq!(variant.logical_row_count(), 4);
1316
1317 let flat = variant.ensure_flat();
1318 assert_eq!(flat.row_count(), 4);
1319 }
1320
1321 #[test]
1322 fn test_chunk_variant_from() {
1323 let flat = make_flat_chunk();
1324 let variant: ChunkVariant = flat.into();
1325 assert!(variant.is_flat());
1326
1327 let factorized = create_multi_level_chunk();
1328 let variant2: ChunkVariant = factorized.into();
1329 assert!(variant2.is_factorized());
1330 }
1331
1332 #[test]
1333 fn test_chunk_variant_is_empty() {
1334 let empty_flat = DataChunk::empty();
1335 let variant = ChunkVariant::flat(empty_flat);
1336 assert!(variant.is_empty());
1337
1338 let non_empty = make_flat_chunk();
1339 let variant2 = ChunkVariant::flat(non_empty);
1340 assert!(!variant2.is_empty());
1341 }
1342
1343 #[test]
1344 fn test_empty_chunk() {
1345 let chunk = FactorizedChunk::empty();
1346 assert_eq!(chunk.level_count(), 0);
1347 assert_eq!(chunk.logical_row_count(), 0);
1348 assert_eq!(chunk.physical_size(), 0);
1349
1350 let flat = chunk.flatten();
1351 assert!(flat.is_empty());
1352 }
1353
1354 #[test]
1355 fn test_all_column_names() {
1356 let mut sources = ValueVector::with_type(LogicalType::Int64);
1357 sources.push_int64(1);
1358
1359 let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1360
1361 let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1362 neighbors.push_int64(10);
1363
1364 chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1365
1366 let names = chunk.all_column_names();
1367 assert_eq!(names, vec!["source", "neighbor"]);
1368 }
1369
1370 #[test]
1371 fn test_level_mut() {
1372 let mut chunk = create_multi_level_chunk();
1373
1374 let level = chunk.level_mut(0).unwrap();
1376 assert_eq!(level.column_count(), 1);
1377
1378 assert!(chunk.level_mut(10).is_none());
1380 }
1381
1382 #[test]
1383 fn test_factorization_level_column_mut() {
1384 let mut chunk = create_multi_level_chunk();
1385
1386 let level = chunk.level_mut(0).unwrap();
1387 let col = level.column_mut(0);
1388 assert!(col.is_some());
1389
1390 assert!(level.column_mut(10).is_none());
1392 }
1393
1394 #[test]
1395 fn test_factorization_level_physical_value_count() {
1396 let chunk = create_multi_level_chunk();
1397
1398 let level0 = chunk.level(0).unwrap();
1399 assert_eq!(level0.physical_value_count(), 2); let level1 = chunk.level(1).unwrap();
1402 assert_eq!(level1.physical_value_count(), 4); }
1404
1405 #[test]
1406 fn test_count_rows() {
1407 let chunk = create_multi_level_chunk();
1408 assert_eq!(chunk.count_rows(), 4);
1409
1410 let empty = FactorizedChunk::empty();
1411 assert_eq!(empty.count_rows(), 0);
1412 }
1413
1414 #[test]
1415 fn test_compute_path_multiplicities() {
1416 let chunk = create_multi_level_chunk();
1417
1418 let mults = chunk.compute_path_multiplicities();
1419 assert_eq!(mults.len(), 4);
1421 assert!(mults.iter().all(|&m| m == 1));
1422 }
1423
1424 #[test]
1425 fn test_compute_path_multiplicities_single_level() {
1426 let mut col = ValueVector::with_type(LogicalType::Int64);
1427 col.push_int64(1);
1428 col.push_int64(2);
1429 col.push_int64(3);
1430
1431 let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1432 let mults = chunk.compute_path_multiplicities();
1433
1434 assert_eq!(mults.len(), 3);
1436 assert!(mults.iter().all(|&m| m == 1));
1437 }
1438
1439 #[test]
1440 fn test_compute_path_multiplicities_empty() {
1441 let chunk = FactorizedChunk::empty();
1442 let mults = chunk.compute_path_multiplicities();
1443 assert!(mults.is_empty());
1444 }
1445
1446 #[test]
1447 fn test_path_multiplicities_cached() {
1448 let mut chunk = create_multi_level_chunk();
1449
1450 let mults1 = chunk.path_multiplicities_cached();
1452 assert_eq!(mults1.len(), 4);
1453
1454 let mults2 = chunk.path_multiplicities_cached();
1456 assert_eq!(mults1.len(), mults2.len());
1457 }
1458
1459 #[test]
1460 fn test_sum_deepest() {
1461 let chunk = create_multi_level_chunk();
1462
1463 let sum = chunk.sum_deepest(0);
1465 assert_eq!(sum, Some(10.0)); }
1467
1468 #[test]
1469 fn test_sum_deepest_empty() {
1470 let chunk = FactorizedChunk::empty();
1471 assert!(chunk.sum_deepest(0).is_none());
1472 }
1473
1474 #[test]
1475 fn test_sum_deepest_invalid_column() {
1476 let chunk = create_multi_level_chunk();
1477 assert!(chunk.sum_deepest(10).is_none());
1478 }
1479
1480 #[test]
1481 fn test_avg_deepest() {
1482 let chunk = create_multi_level_chunk();
1483
1484 let avg = chunk.avg_deepest(0);
1486 assert_eq!(avg, Some(2.5));
1487 }
1488
1489 #[test]
1490 fn test_avg_deepest_empty() {
1491 let chunk = FactorizedChunk::empty();
1492 assert!(chunk.avg_deepest(0).is_none());
1493 }
1494
1495 #[test]
1496 fn test_min_deepest() {
1497 let chunk = create_multi_level_chunk();
1498
1499 let min = chunk.min_deepest(0);
1500 assert_eq!(min, Some(Value::Int64(1)));
1501 }
1502
1503 #[test]
1504 fn test_min_deepest_empty() {
1505 let chunk = FactorizedChunk::empty();
1506 assert!(chunk.min_deepest(0).is_none());
1507 }
1508
1509 #[test]
1510 fn test_min_deepest_invalid_column() {
1511 let chunk = create_multi_level_chunk();
1512 assert!(chunk.min_deepest(10).is_none());
1513 }
1514
1515 #[test]
1516 fn test_max_deepest() {
1517 let chunk = create_multi_level_chunk();
1518
1519 let max = chunk.max_deepest(0);
1520 assert_eq!(max, Some(Value::Int64(4)));
1521 }
1522
1523 #[test]
1524 fn test_max_deepest_empty() {
1525 let chunk = FactorizedChunk::empty();
1526 assert!(chunk.max_deepest(0).is_none());
1527 }
1528
#[test]
fn test_value_less_than() {
    // Each case is (lhs, rhs, expected result of `value_less_than(lhs, rhs)`).
    let cases = [
        // Null compares below a non-null value, and never below itself.
        (Value::Null, Value::Int64(1), true),
        (Value::Int64(1), Value::Null, false),
        (Value::Null, Value::Null, false),
        // Integer vs integer.
        (Value::Int64(1), Value::Int64(2), true),
        (Value::Int64(2), Value::Int64(1), false),
        // Float vs float.
        (Value::Float64(1.5), Value::Float64(2.5), true),
        // Mixed integer/float comparisons in both directions.
        (Value::Int64(1), Value::Float64(1.5), true),
        (Value::Float64(0.5), Value::Int64(1), true),
        // Lexicographic string ordering.
        (
            Value::String("apple".into()),
            Value::String("banana".into()),
            true,
        ),
        // Booleans order false before true.
        (Value::Bool(false), Value::Bool(true), true),
        (Value::Bool(true), Value::Bool(false), false),
        // An integer is not considered less than a string.
        (Value::Int64(1), Value::String("hello".into()), false),
    ];

    for (lhs, rhs, expected) in &cases {
        assert_eq!(
            FactorizedChunk::value_less_than(lhs, rhs),
            *expected,
            "value_less_than({lhs:?}, {rhs:?})"
        );
    }
}
1593
#[test]
fn test_filter_deepest() {
    let chunk = create_multi_level_chunk();

    // Keep only deepest-level integers strictly greater than 2.
    let kept = chunk
        .filter_deepest(0, |value| matches!(value, Value::Int64(n) if *n > 2))
        .expect("filtering a valid column should produce a chunk");

    assert_eq!(kept.logical_row_count(), 2);
}
1610
#[test]
fn test_filter_deepest_empty() {
    // Even an accept-everything predicate yields nothing on a level-less chunk.
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest(0, |_| true);
    assert!(outcome.is_none());
}
1616
#[test]
fn test_filter_deepest_all_filtered() {
    let chunk = create_multi_level_chunk();

    // Rejecting every value still returns a chunk; it just has no rows.
    let Some(survivors) = chunk.filter_deepest(0, |_| false) else {
        panic!("filter_deepest should return a chunk even when nothing matches");
    };

    assert_eq!(survivors.logical_row_count(), 0);
}
1627
#[test]
fn test_filter_deepest_invalid_column() {
    // Column 10 is out of range for the fixture's deepest level.
    let fixture = create_multi_level_chunk();
    let outcome = fixture.filter_deepest(10, |_| true);
    assert!(outcome.is_none());
}
1633
#[test]
fn test_filter_deepest_multi() {
    let mut sources = ValueVector::with_type(LogicalType::Int64);
    sources.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);

    // Deepest-level rows (a, b): (10, 1), (20, 2), (30, 3).
    let mut col1 = ValueVector::with_type(LogicalType::Int64);
    col1.push_int64(10);
    col1.push_int64(20);
    col1.push_int64(30);

    let mut col2 = ValueVector::with_type(LogicalType::Int64);
    col2.push_int64(1);
    col2.push_int64(2);
    col2.push_int64(3);

    // The offsets are only borrowed, so a slice literal suffices; allocating
    // a `vec!` just to take a reference is flagged by `clippy::useless_vec`.
    chunk.add_level(
        vec![col1, col2],
        vec!["a".to_string(), "b".to_string()],
        &[0, 3],
    );

    // Row sums are 11, 22 and 33; only the last two exceed 15. The slice
    // pattern checks the arity (exactly two values) and both variants at once.
    let filtered = chunk
        .filter_deepest_multi(|values| match &values[..] {
            [Value::Int64(a), Value::Int64(b)] => a + b > 15,
            _ => false,
        })
        .expect("filter_deepest_multi should return a chunk for a two-column deepest level");

    assert_eq!(filtered.logical_row_count(), 2);
}
1673
#[test]
fn test_filter_deepest_multi_empty() {
    // A level-less chunk cannot be filtered.
    let empty = FactorizedChunk::empty();
    let outcome = empty.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1679
#[test]
fn test_filter_deepest_multi_no_columns() {
    let mut src_values = ValueVector::with_type(LogicalType::Int64);
    src_values.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_values], vec!["src".to_string()]);

    // Attach a deepest level that has no columns at all (multiplicities `[0]`).
    chunk.add_factorized_level(FactorizationLevel::unflat(Vec::new(), Vec::new(), vec![0]));

    // With no deepest columns there is nothing to hand to the predicate.
    let outcome = chunk.filter_deepest_multi(|_| true);
    assert!(outcome.is_none());
}
1694
#[test]
fn test_project() {
    let mut numbers = ValueVector::with_type(LogicalType::Int64);
    for n in [1, 2] {
        numbers.push_int64(n);
    }

    let mut letters = ValueVector::with_type(LogicalType::String);
    for s in ["a", "b"] {
        letters.push_string(s);
    }

    let chunk = FactorizedChunk::with_flat_level(
        vec![numbers, letters],
        vec!["num".to_string(), "str".to_string()],
    );

    // Keep only level 0 / column 0, under a new name.
    let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);

    assert_eq!(projected.total_column_count(), 1);
    assert_eq!(projected.all_column_names(), vec!["projected_num"]);
}
1717
#[test]
fn test_project_empty() {
    // Projecting from a chunk with no levels produces no levels.
    let source = FactorizedChunk::empty();
    let result = source.project(&[(0, 0, "col".to_string())]);
    assert_eq!(result.level_count(), 0);
}
1724
#[test]
fn test_project_empty_specs() {
    // Selecting no columns yields a chunk with no levels.
    let source = create_multi_level_chunk();
    let result = source.project(&[]);
    assert_eq!(result.level_count(), 0);
}
1731
#[test]
fn test_project_invalid_level() {
    let source = create_multi_level_chunk();

    // Level 10 does not exist in the two-level fixture, so nothing is projected.
    let result = source.project(&[(10, 0, "col".to_string())]);
    assert_eq!(result.level_count(), 0);
}
1740
#[test]
fn test_project_multi_level() {
    let source = create_multi_level_chunk();

    // One column from each of the fixture's two levels, renamed.
    let specs = [
        (0, 0, "source".to_string()),
        (1, 0, "neighbor".to_string()),
    ];
    let result = source.project(&specs);

    assert_eq!(result.level_count(), 2);
    assert_eq!(result.total_column_count(), 2);
}
1752
#[test]
fn test_total_column_count() {
    // `src` on level 0 plus `nbr` on level 1.
    let columns = create_multi_level_chunk().total_column_count();
    assert_eq!(columns, 2);
}
1758
#[test]
fn test_chunk_state_access() {
    let mut fixture = create_multi_level_chunk();

    // Shared access: a multi-level chunk reports itself as factorized.
    assert!(fixture.chunk_state().is_factorized());

    // Mutable access: invalidating the cache must be possible and must not panic.
    fixture.chunk_state_mut().invalidate_cache();
}
1769
#[test]
fn test_logical_row_iter_multi_level() {
    let fixture = create_multi_level_chunk();
    let rows: Vec<_> = fixture.logical_row_iter().collect();

    // Each logical row is reported as a pair of indices, one per level of the
    // two-level fixture; comparing the whole vector also pins the row count.
    assert_eq!(
        rows,
        vec![vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]]
    );
}
1783
#[test]
fn test_sum_deepest_with_float() {
    let mut src_values = ValueVector::with_type(LogicalType::Int64);
    src_values.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_values], vec!["src".to_string()]);

    let mut readings = ValueVector::with_type(LogicalType::Float64);
    for x in [1.5, 2.5, 3.0] {
        readings.push_float64(x);
    }

    // Offsets [0, 3] presumably place all three values under the single source.
    chunk.add_level(vec![readings], vec!["val".to_string()], &[0, 3]);

    // 1.5 + 2.5 + 3.0 is exactly representable in f64, so exact equality is safe.
    assert_eq!(chunk.sum_deepest(0), Some(7.0));
}
1801
#[test]
fn test_min_max_with_strings() {
    let mut src_values = ValueVector::with_type(LogicalType::Int64);
    src_values.push_int64(1);

    let mut chunk = FactorizedChunk::with_flat_level(vec![src_values], vec!["src".to_string()]);

    // Inserted out of order so min/max cannot just pick the first/last element.
    let mut fruits = ValueVector::with_type(LogicalType::String);
    for name in ["banana", "apple", "cherry"] {
        fruits.push_string(name);
    }

    chunk.add_level(vec![fruits], vec!["fruit".to_string()], &[0, 3]);

    assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
    assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
}
1822
#[test]
fn test_recompute_logical_row_count_empty() {
    let mut blank = FactorizedChunk::empty();

    // Recomputing on a chunk with no levels must leave the row count at zero.
    blank.recompute_logical_row_count();
    assert_eq!(blank.logical_row_count(), 0);
}
1829
#[test]
fn test_factorization_level_group_count() {
    let fixture = create_multi_level_chunk();
    let group_count_at = |depth| {
        fixture
            .level(depth)
            .expect("fixture should have this level")
            .group_count()
    };

    // Two source groups on level 0; four neighbor groups on level 1.
    assert_eq!(group_count_at(0), 2);
    assert_eq!(group_count_at(1), 4);
}
1840
#[test]
fn test_factorization_level_multiplicities() {
    let fixture = create_multi_level_chunk();
    let deepest = fixture.level(1).expect("fixture should have two levels");

    // Two parent groups with two children each (2 + 2 = 4 groups on level 1).
    assert_eq!(deepest.multiplicities(), &[2, 2]);
}
1849
#[test]
fn test_factorization_level_column_names() {
    let fixture = create_multi_level_chunk();
    let (shallow, deep) = (fixture.level(0).unwrap(), fixture.level(1).unwrap());

    // Level 0 holds the source column; level 1 holds the neighbor column.
    assert_eq!(shallow.column_names(), &["src"]);
    assert_eq!(deep.column_names(), &["nbr"]);
}
1860}