Skip to main content

grafeo_core/execution/
factorized_chunk.rs

1//! FactorizedChunk - multi-level factorized data representation.
2//!
3//! A `FactorizedChunk` organizes columns into levels, where each level can have
4//! different factorization (multiplicity). This avoids materializing the full
5//! Cartesian product during multi-hop graph traversals.
6//!
7//! # Example
8//!
9//! For a 2-hop query `MATCH (a)-[e1]->(b)-[e2]->(c)`:
10//!
11//! ```text
12//! Level 0 (flat):   [a1, a2]           (2 source nodes)
13//! Level 1 (unflat): [b1, b2, b3, b4]   (4 first-hop neighbors)
14//!                   offsets: [0, 2, 4]  (a1 has 2 neighbors, a2 has 2)
15//! Level 2 (unflat): [c1, c2, ..., c8]  (8 second-hop neighbors)
16//!                   offsets: [0, 2, 4, 6, 8]
17//!
18//! Logical rows = 2 * 2 * 2 = 8, but physical storage = 2 + 4 + 8 = 14 values
19//! vs flat storage = 8 * 5 columns = 40 values
20//! ```
21
22use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
29/// A chunk that supports factorized representation across multiple levels.
30///
31/// Columns are organized in groups by their factorization level:
32/// - Level 0 (flat): Base columns, one value per logical row
33/// - Level 1 (unflat): First expansion, grouped by level 0
34/// - Level 2 (unflat): Second expansion, grouped by level 1
35/// - And so on...
36///
37/// # State Management
38///
39/// The chunk maintains a [`ChunkState`] that provides:
40/// - Cached multiplicities for O(1) aggregate access
41/// - Selection vector support for lazy filtering
42/// - Generation tracking for cache invalidation
43#[derive(Debug, Clone)]
44pub struct FactorizedChunk {
45    /// Column groups organized by factorization level.
46    levels: Vec<FactorizationLevel>,
47    /// Total logical row count (product of all multiplicities).
48    logical_row_count: usize,
49    /// Unified state tracking (caching, selection, etc.).
50    state: ChunkState,
51}
52
53/// A factorization level containing columns at the same nesting depth.
54#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56    /// Columns at this level.
57    columns: Vec<FactorizedVector>,
58    /// Column names or identifiers (for schema mapping).
59    column_names: Vec<String>,
60    /// Number of groups at this level.
61    group_count: usize,
62    /// Multiplicities for each group (how many children per parent).
63    /// For level 0, this is vec![1; group_count].
64    /// For level N, multiplicities[i] = number of values for parent i.
65    multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69    /// Creates a new flat level (level 0) from columns.
70    #[must_use]
71    pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72        let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73        let multiplicities = vec![1; group_count];
74        Self {
75            columns,
76            column_names,
77            group_count,
78            multiplicities,
79        }
80    }
81
82    /// Creates a new unflat level with the given multiplicities.
83    ///
84    /// Note: `multiplicities[i]` is the number of values for parent i.
85    /// The total number of values (group_count) is the sum of all multiplicities.
86    #[must_use]
87    pub fn unflat(
88        columns: Vec<FactorizedVector>,
89        column_names: Vec<String>,
90        multiplicities: Vec<usize>,
91    ) -> Self {
92        // group_count is the total number of values at this level (sum of multiplicities)
93        let group_count = multiplicities.iter().sum();
94        Self {
95            columns,
96            column_names,
97            group_count,
98            multiplicities,
99        }
100    }
101
102    /// Returns the number of columns at this level.
103    #[must_use]
104    pub fn column_count(&self) -> usize {
105        self.columns.len()
106    }
107
108    /// Returns the number of groups at this level.
109    #[must_use]
110    pub fn group_count(&self) -> usize {
111        self.group_count
112    }
113
114    /// Returns the total physical value count across all columns.
115    #[must_use]
116    pub fn physical_value_count(&self) -> usize {
117        self.columns
118            .iter()
119            .map(FactorizedVector::physical_len)
120            .sum()
121    }
122
123    /// Returns the multiplicities for this level.
124    #[must_use]
125    pub fn multiplicities(&self) -> &[usize] {
126        &self.multiplicities
127    }
128
129    /// Returns a column by index.
130    #[must_use]
131    pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132        self.columns.get(index)
133    }
134
135    /// Returns a mutable column by index.
136    pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137        self.columns.get_mut(index)
138    }
139
140    /// Returns the column names.
141    #[must_use]
142    pub fn column_names(&self) -> &[String] {
143        &self.column_names
144    }
145}
146
147impl FactorizedChunk {
148    /// Creates an empty factorized chunk.
149    #[must_use]
150    pub fn empty() -> Self {
151        Self {
152            levels: Vec::new(),
153            logical_row_count: 0,
154            state: ChunkState::flat(0),
155        }
156    }
157
158    /// Creates a factorized chunk from a flat `DataChunk`.
159    ///
160    /// The resulting chunk has a single level (level 0) with all columns flat.
161    #[must_use]
162    pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163        let columns: Vec<FactorizedVector> = chunk
164            .columns()
165            .iter()
166            .map(|c| FactorizedVector::flat(c.clone()))
167            .collect();
168
169        let row_count = chunk.row_count();
170        let level = FactorizationLevel::flat(columns, column_names);
171
172        Self {
173            levels: vec![level],
174            logical_row_count: row_count,
175            state: ChunkState::unflat(1, row_count),
176        }
177    }
178
179    /// Creates a factorized chunk with a single flat level.
180    #[must_use]
181    pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182        let row_count = columns.first().map_or(0, ValueVector::len);
183        let factorized_columns: Vec<FactorizedVector> =
184            columns.into_iter().map(FactorizedVector::flat).collect();
185
186        let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188        Self {
189            levels: vec![level],
190            logical_row_count: row_count,
191            state: ChunkState::unflat(1, row_count),
192        }
193    }
194
195    /// Returns the number of factorization levels.
196    #[must_use]
197    pub fn level_count(&self) -> usize {
198        self.levels.len()
199    }
200
201    /// Returns the logical row count (full Cartesian product size).
202    #[must_use]
203    pub fn logical_row_count(&self) -> usize {
204        self.logical_row_count
205    }
206
207    /// Returns the physical storage size (actual values stored).
208    #[must_use]
209    pub fn physical_size(&self) -> usize {
210        self.levels
211            .iter()
212            .map(FactorizationLevel::physical_value_count)
213            .sum()
214    }
215
216    /// Returns the chunk state.
217    #[must_use]
218    pub fn chunk_state(&self) -> &ChunkState {
219        &self.state
220    }
221
222    /// Returns mutable access to the chunk state.
223    pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
224        &mut self.state
225    }
226
227    /// Returns path multiplicities, computing once and caching.
228    ///
229    /// This is the key optimization for aggregation: multiplicities are
230    /// computed once and reused for all aggregates (COUNT, SUM, AVG, etc.).
231    ///
232    /// # Example
233    ///
234    /// ```no_run
235    /// # use grafeo_core::execution::factorized_chunk::FactorizedChunk;
236    /// # let mut chunk = FactorizedChunk::empty();
237    /// let mults = chunk.path_multiplicities_cached();
238    /// let sum = chunk.sum_deepest(0);
239    /// let avg = chunk.avg_deepest(0);
240    /// ```
241    pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
242        // Check if already cached
243        if let Some(cached) = self.state.cached_multiplicities() {
244            return Arc::clone(cached);
245        }
246
247        // Compute and cache
248        let mults = self.compute_path_multiplicities();
249        let arc_mults: Arc<[usize]> = mults.into();
250        self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
251        arc_mults
252    }
253
254    /// Returns a level by index.
255    #[must_use]
256    pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
257        self.levels.get(index)
258    }
259
260    /// Returns a mutable level by index.
261    pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
262        self.levels.get_mut(index)
263    }
264
265    /// Adds a new factorization level for expansion results.
266    ///
267    /// The new level's multiplicities determine how many values each parent
268    /// in the previous level expands to.
269    ///
270    /// # Arguments
271    ///
272    /// * `columns` - Columns at the new level
273    /// * `column_names` - Names for the new columns
274    /// * `offsets` - Offset array where `offsets[i]` is the start index for parent `i`
275    pub fn add_level(
276        &mut self,
277        columns: Vec<ValueVector>,
278        column_names: Vec<String>,
279        offsets: &[u32],
280    ) {
281        let parent_count = offsets.len().saturating_sub(1);
282
283        // Compute multiplicities from offsets
284        let multiplicities: Vec<usize> = (0..parent_count)
285            .map(|i| (offsets[i + 1] - offsets[i]) as usize)
286            .collect();
287
288        // Create unflat factorized vectors
289        let factorized_columns: Vec<FactorizedVector> = columns
290            .into_iter()
291            .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
292            .collect();
293
294        let level =
295            FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
296        self.levels.push(level);
297
298        // Update logical row count
299        // New count = previous count * sum of new multiplicities / parent_count
300        // Actually: each parent's contribution is multiplied by its multiplicity
301        if self.levels.len() == 1 {
302            // First level - logical count is just the sum of multiplicities (or total values)
303            self.logical_row_count = multiplicities.iter().sum();
304        } else {
305            // For subsequent levels, we need to compute based on parent multiplicities
306            self.recompute_logical_row_count();
307        }
308
309        // Update state (invalidates cached multiplicities)
310        self.update_state();
311    }
312
313    /// Adds a level with pre-computed factorized vectors.
314    pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
315        self.levels.push(level);
316        self.recompute_logical_row_count();
317        self.update_state();
318    }
319
320    /// Updates the ChunkState to reflect current structure.
321    fn update_state(&mut self) {
322        self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
323    }
324
325    /// Recomputes the logical row count from all levels.
326    fn recompute_logical_row_count(&mut self) {
327        if self.levels.is_empty() {
328            self.logical_row_count = 0;
329            return;
330        }
331
332        // Start with level 0 count
333        let level0_count = self.levels[0].group_count;
334        if self.levels.len() == 1 {
335            self.logical_row_count = level0_count;
336            return;
337        }
338
339        // For multi-level: compute recursively
340        // Each parent at level N-1 contributes its multiplicity to level N
341        let mut counts = vec![1usize; level0_count];
342
343        for level_idx in 1..self.levels.len() {
344            let level = &self.levels[level_idx];
345            let mut new_counts = Vec::with_capacity(counts.len() * 2); // ~2x expansion
346
347            for (parent_idx, &parent_count) in counts.iter().enumerate() {
348                // This parent expands to level.multiplicities[parent_idx] children
349                if parent_idx < level.multiplicities.len() {
350                    let child_mult = level.multiplicities[parent_idx];
351                    for _ in 0..child_mult {
352                        new_counts.push(parent_count);
353                    }
354                }
355            }
356
357            counts = new_counts;
358        }
359
360        self.logical_row_count = counts.len();
361    }
362
363    /// Flattens to a regular `DataChunk` (materializes the Cartesian product).
364    ///
365    /// All levels are expanded into flat rows.
366    #[must_use]
367    pub fn flatten(&self) -> DataChunk {
368        if self.levels.is_empty() {
369            return DataChunk::empty();
370        }
371
372        // Collect all column types across all levels
373        let mut all_columns: Vec<ValueVector> = Vec::new();
374
375        // For a single level, just flatten each column
376        if self.levels.len() == 1 {
377            let level = &self.levels[0];
378            for col in &level.columns {
379                all_columns.push(col.flatten(None));
380            }
381            return DataChunk::new(all_columns);
382        }
383
384        // Multi-level: need to expand according to multiplicities
385        // Build column data by iterating through logical rows
386        let row_iter = self.logical_row_iter();
387        let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
388
389        // Pre-allocate output columns
390        let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
391        for level in &self.levels {
392            for col in &level.columns {
393                output_columns.push(ValueVector::with_capacity(
394                    col.data_type(),
395                    self.logical_row_count,
396                ));
397            }
398        }
399
400        // Iterate through all logical rows
401        for indices in row_iter {
402            let mut col_offset = 0;
403            for (level_idx, level) in self.levels.iter().enumerate() {
404                let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
405                for (col_idx, col) in level.columns.iter().enumerate() {
406                    if let Some(value) = col.get_physical(level_idx_value) {
407                        output_columns[col_offset + col_idx].push_value(value);
408                    }
409                }
410                col_offset += level.column_count();
411            }
412        }
413
414        DataChunk::new(output_columns)
415    }
416
417    /// Returns an iterator over logical rows without materializing.
418    ///
419    /// Each iteration yields a vector of physical indices, one per level.
420    pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
421        FactorizedRowIterator::new(self)
422    }
423
424    /// Gets the total number of columns across all levels.
425    #[must_use]
426    pub fn total_column_count(&self) -> usize {
427        self.levels.iter().map(|l| l.column_count()).sum()
428    }
429
430    /// Gets all column names in order across all levels.
431    #[must_use]
432    pub fn all_column_names(&self) -> Vec<String> {
433        self.levels
434            .iter()
435            .flat_map(|l| l.column_names.iter().cloned())
436            .collect()
437    }
438
439    /// Filters the deepest level in-place using a predicate on column values.
440    ///
441    /// This is the key optimization: instead of flattening and filtering all rows,
442    /// we filter only at the deepest level and update parent multiplicities.
443    ///
444    /// # Arguments
445    ///
446    /// * `column_idx` - Column index within the deepest level to filter on
447    /// * `predicate` - Function that returns true for values to keep
448    ///
449    /// # Returns
450    ///
451    /// A new FactorizedChunk with filtered values, or None if all rows are filtered out.
452    ///
453    /// # Panics
454    ///
455    /// Panics if `column_idx` refers to a non-existent column in the deepest level.
456    #[must_use]
457    pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
458    where
459        F: Fn(&grafeo_common::types::Value) -> bool,
460    {
461        if self.levels.is_empty() {
462            return None;
463        }
464
465        let deepest_idx = self.levels.len() - 1;
466        let deepest = &self.levels[deepest_idx];
467
468        // Get the column to filter on
469        let filter_col = deepest.column(column_idx)?;
470
471        // Build filtered columns for the deepest level
472        let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
473            .map(|i| {
474                ValueVector::with_type(
475                    deepest
476                        .column(i)
477                        .expect("column exists: i < column_count")
478                        .data_type(),
479                )
480            })
481            .collect();
482
483        // Track new multiplicities for each parent
484        let parent_count = filter_col.parent_count();
485        let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
486        let mut new_offsets: Vec<u32> = vec![0];
487
488        // Filter each parent's children
489        for parent_idx in 0..parent_count {
490            let (start, end) = filter_col.range_for_parent(parent_idx);
491
492            for phys_idx in start..end {
493                // Check if this value passes the filter
494                if let Some(value) = filter_col.get_physical(phys_idx)
495                    && predicate(&value)
496                {
497                    // Copy all columns for this row
498                    for col_idx in 0..deepest.column_count() {
499                        if let Some(col) = deepest.column(col_idx)
500                            && let Some(v) = col.get_physical(phys_idx)
501                        {
502                            new_columns[col_idx].push_value(v);
503                        }
504                    }
505                    new_multiplicities[parent_idx] += 1;
506                }
507            }
508
509            // reason: column lengths in factorized chunks are bounded by chunk size, fit u32
510            #[allow(clippy::cast_possible_truncation)]
511            new_offsets.push(new_columns[0].len() as u32);
512        }
513
514        // Check if we have any rows left
515        let total_remaining: usize = new_multiplicities.iter().sum();
516        if total_remaining == 0 {
517            return Some(Self::empty());
518        }
519
520        // Build the new factorized vectors
521        let new_factorized_cols: Vec<FactorizedVector> = new_columns
522            .into_iter()
523            .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
524            .collect();
525
526        let new_level = FactorizationLevel::unflat(
527            new_factorized_cols,
528            deepest.column_names().to_vec(),
529            new_multiplicities,
530        );
531
532        // Build the result chunk
533        let mut result = Self {
534            levels: self.levels[..deepest_idx].to_vec(),
535            logical_row_count: 0,
536            state: ChunkState::flat(0),
537        };
538        result.levels.push(new_level);
539        result.recompute_logical_row_count();
540        result.update_state();
541
542        Some(result)
543    }
544
545    /// Filters the deepest level using a multi-column predicate.
546    ///
547    /// This allows filtering based on values from multiple columns in the deepest level.
548    ///
549    /// # Arguments
550    ///
551    /// * `predicate` - Function that takes a slice of values (one per column) and returns true to keep
552    ///
553    /// # Panics
554    ///
555    /// Panics if the deepest level's internal column storage is inconsistent
556    /// (column count reports more columns than actually exist).
557    #[must_use]
558    pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
559    where
560        F: Fn(&[grafeo_common::types::Value]) -> bool,
561    {
562        if self.levels.is_empty() {
563            return None;
564        }
565
566        let deepest_idx = self.levels.len() - 1;
567        let deepest = &self.levels[deepest_idx];
568        let col_count = deepest.column_count();
569
570        if col_count == 0 {
571            return None;
572        }
573
574        let first_col = deepest.column(0)?;
575        let parent_count = first_col.parent_count();
576
577        // Build filtered columns
578        let mut new_columns: Vec<ValueVector> = (0..col_count)
579            .map(|i| {
580                ValueVector::with_type(
581                    deepest
582                        .column(i)
583                        .expect("column exists: i < column_count")
584                        .data_type(),
585                )
586            })
587            .collect();
588
589        let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
590        let mut new_offsets: Vec<u32> = vec![0];
591        let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
592
593        for parent_idx in 0..parent_count {
594            let (start, end) = first_col.range_for_parent(parent_idx);
595
596            for phys_idx in start..end {
597                // Collect values from all columns
598                row_values.clear();
599                for col_idx in 0..col_count {
600                    if let Some(col) = deepest.column(col_idx)
601                        && let Some(v) = col.get_physical(phys_idx)
602                    {
603                        row_values.push(v);
604                    }
605                }
606
607                // Apply predicate
608                if predicate(&row_values) {
609                    for (col_idx, v) in row_values.iter().enumerate() {
610                        new_columns[col_idx].push_value(v.clone());
611                    }
612                    new_multiplicities[parent_idx] += 1;
613                }
614            }
615
616            // reason: column lengths in factorized chunks are bounded by chunk size, fit u32
617            #[allow(clippy::cast_possible_truncation)]
618            new_offsets.push(new_columns[0].len() as u32);
619        }
620
621        // Check if any rows remain
622        let total: usize = new_multiplicities.iter().sum();
623        if total == 0 {
624            return Some(Self::empty());
625        }
626
627        // Build new level
628        let new_factorized_cols: Vec<FactorizedVector> = new_columns
629            .into_iter()
630            .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
631            .collect();
632
633        let new_level = FactorizationLevel::unflat(
634            new_factorized_cols,
635            deepest.column_names().to_vec(),
636            new_multiplicities,
637        );
638
639        let mut result = Self {
640            levels: self.levels[..deepest_idx].to_vec(),
641            logical_row_count: 0,
642            state: ChunkState::flat(0),
643        };
644        result.levels.push(new_level);
645        result.recompute_logical_row_count();
646        result.update_state();
647
648        Some(result)
649    }
650
651    // ========================================================================
652    // Factorized Aggregation Methods
653    // ========================================================================
654
655    /// Computes COUNT(*) without flattening - returns the logical row count.
656    ///
657    /// This is O(n) where n is the number of physical values, instead of
658    /// O(m) where m is the number of logical rows (which can be exponentially larger).
659    ///
660    /// # Example
661    ///
662    /// For a 3-level chunk:
663    /// - Level 0: 100 sources
664    /// - Level 1: 10 neighbors each = 1,000 physical
665    /// - Level 2: 10 neighbors each = 10,000 physical
666    /// - Logical rows = 100 * 10 * 10 = 10,000
667    ///
668    /// `count_rows()` returns 10,000 by computing from multiplicities, not by
669    /// iterating through all logical rows.
670    #[must_use]
671    pub fn count_rows(&self) -> usize {
672        self.logical_row_count()
673    }
674
675    /// Computes the effective multiplicity for each value at the deepest level.
676    ///
677    /// This is how many times each value would appear in the flattened result.
678    /// For example, if a source has 3 first-hop neighbors and each has 2 second-hop
679    /// neighbors, each first-hop value has multiplicity 2 (appearing in 2 paths).
680    ///
681    /// # Returns
682    ///
683    /// A vector where `result[i]` is the multiplicity of physical value `i` at the
684    /// deepest level. The sum of all multiplicities equals `logical_row_count()`.
685    ///
686    /// # Note
687    ///
688    /// For repeated access (e.g., computing multiple aggregates), prefer using
689    /// [`path_multiplicities_cached`](Self::path_multiplicities_cached) which
690    /// caches the result and avoids O(levels) recomputation.
691    #[must_use]
692    pub fn compute_path_multiplicities(&self) -> Vec<usize> {
693        if self.levels.is_empty() {
694            return Vec::new();
695        }
696
697        // For a single level, each value has multiplicity 1
698        if self.levels.len() == 1 {
699            return vec![1; self.levels[0].group_count];
700        }
701
702        // Start with multiplicity 1 for each value at level 0
703        let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
704
705        // Propagate multiplicities through each level
706        for level_idx in 1..self.levels.len() {
707            let level = &self.levels[level_idx];
708            let mut child_multiplicities = Vec::with_capacity(level.group_count);
709
710            // For each parent, its children inherit its multiplicity
711            for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
712                let child_count = if parent_idx < level.multiplicities.len() {
713                    level.multiplicities[parent_idx]
714                } else {
715                    0
716                };
717
718                // Each child of this parent inherits the parent's multiplicity
719                for _ in 0..child_count {
720                    child_multiplicities.push(parent_mult);
721                }
722            }
723
724            parent_multiplicities = child_multiplicities;
725        }
726
727        parent_multiplicities
728    }
729
730    /// Computes SUM on a numeric column at the deepest level without flattening.
731    ///
732    /// Each value is multiplied by its effective multiplicity (how many times
733    /// it would appear in the flattened result).
734    ///
735    /// # Arguments
736    ///
737    /// * `column_idx` - Column index within the deepest level
738    ///
739    /// # Returns
740    ///
741    /// The sum as f64, or None if the column doesn't exist or contains non-numeric values.
742    #[must_use]
743    pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
744        if self.levels.is_empty() {
745            return None;
746        }
747
748        let deepest_idx = self.levels.len() - 1;
749        let deepest = &self.levels[deepest_idx];
750        let col = deepest.column(column_idx)?;
751
752        // Compute multiplicity for each physical value
753        let multiplicities = self.compute_path_multiplicities();
754
755        let mut sum = 0.0;
756        for (phys_idx, mult) in multiplicities.iter().enumerate() {
757            if let Some(value) = col.get_physical(phys_idx) {
758                // Try to convert to f64
759                let num_value = match &value {
760                    grafeo_common::types::Value::Int64(v) => *v as f64,
761                    grafeo_common::types::Value::Float64(v) => *v,
762                    _ => continue, // Skip non-numeric values
763                };
764                sum += num_value * (*mult as f64);
765            }
766        }
767        Some(sum)
768    }
769
770    /// Computes AVG on a numeric column at the deepest level without flattening.
771    ///
772    /// This is equivalent to `sum_deepest() / count_rows()`.
773    ///
774    /// # Arguments
775    ///
776    /// * `column_idx` - Column index within the deepest level
777    ///
778    /// # Returns
779    ///
780    /// The average as f64, or None if the column doesn't exist or the chunk is empty.
781    #[must_use]
782    pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
783        let count = self.logical_row_count();
784        if count == 0 {
785            return None;
786        }
787
788        let sum = self.sum_deepest(column_idx)?;
789        Some(sum / count as f64)
790    }
791
792    /// Computes MIN on a column at the deepest level without flattening.
793    ///
794    /// Unlike SUM/AVG, MIN doesn't need multiplicities - we just find the minimum
795    /// among all physical values.
796    ///
797    /// # Arguments
798    ///
799    /// * `column_idx` - Column index within the deepest level
800    ///
801    /// # Returns
802    ///
803    /// The minimum value, or None if the column doesn't exist or is empty.
804    #[must_use]
805    pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
806        if self.levels.is_empty() {
807            return None;
808        }
809
810        let deepest_idx = self.levels.len() - 1;
811        let deepest = &self.levels[deepest_idx];
812        let col = deepest.column(column_idx)?;
813
814        let mut min_value: Option<grafeo_common::types::Value> = None;
815
816        for phys_idx in 0..col.physical_len() {
817            if let Some(value) = col.get_physical(phys_idx) {
818                min_value = Some(match min_value {
819                    None => value,
820                    Some(current) => {
821                        if Self::value_less_than(&value, &current) {
822                            value
823                        } else {
824                            current
825                        }
826                    }
827                });
828            }
829        }
830
831        min_value
832    }
833
834    /// Computes MAX on a column at the deepest level without flattening.
835    ///
836    /// Unlike SUM/AVG, MAX doesn't need multiplicities - we just find the maximum
837    /// among all physical values.
838    ///
839    /// # Arguments
840    ///
841    /// * `column_idx` - Column index within the deepest level
842    ///
843    /// # Returns
844    ///
845    /// The maximum value, or None if the column doesn't exist or is empty.
846    #[must_use]
847    pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
848        if self.levels.is_empty() {
849            return None;
850        }
851
852        let deepest_idx = self.levels.len() - 1;
853        let deepest = &self.levels[deepest_idx];
854        let col = deepest.column(column_idx)?;
855
856        let mut max_value: Option<grafeo_common::types::Value> = None;
857
858        for phys_idx in 0..col.physical_len() {
859            if let Some(value) = col.get_physical(phys_idx) {
860                max_value = Some(match max_value {
861                    None => value,
862                    Some(current) => {
863                        if Self::value_less_than(&current, &value) {
864                            value
865                        } else {
866                            current
867                        }
868                    }
869                });
870            }
871        }
872
873        max_value
874    }
875
876    /// Compares two Values for ordering (a < b).
877    ///
878    /// Comparison rules:
879    /// - Null is always less than non-null
880    /// - Numeric types are compared by value
881    /// - Strings are compared lexicographically
882    /// - Other types use debug string comparison as fallback
883    fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
884        use grafeo_common::types::Value;
885
886        match (a, b) {
887            // Null handling
888            (Value::Null, Value::Null) => false,
889            (Value::Null, _) => true,
890            (_, Value::Null) => false,
891
892            // Numeric comparisons
893            (Value::Int64(x), Value::Int64(y)) => x < y,
894            (Value::Float64(x), Value::Float64(y)) => x < y,
895            (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
896            (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
897
898            // String comparison
899            (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
900
901            // Bool comparison (false < true)
902            (Value::Bool(x), Value::Bool(y)) => !x && *y,
903
904            // Fallback for incompatible types - not comparable
905            // Return false to keep the current value (arbitrary but consistent)
906            _ => false,
907        }
908    }
909
910    // ========================================================================
911    // Projection and Column Operations
912    // ========================================================================
913
914    /// Projects specific columns from the factorized chunk without flattening.
915    ///
916    /// # Arguments
917    ///
918    /// * `column_specs` - List of (level_idx, column_idx, new_name) tuples
919    ///
920    /// # Returns
921    ///
922    /// A new FactorizedChunk with only the specified columns.
923    #[must_use]
924    pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
925        if self.levels.is_empty() || column_specs.is_empty() {
926            return Self::empty();
927        }
928
929        // Group specs by level
930        let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
931        for (level_idx, col_idx, name) in column_specs {
932            if *level_idx < self.levels.len() {
933                level_specs[*level_idx].push((*col_idx, name.clone()));
934            }
935        }
936
937        // Build new levels with projected columns
938        let mut new_levels = Vec::new();
939
940        for (level_idx, specs) in level_specs.iter().enumerate() {
941            if specs.is_empty() {
942                continue;
943            }
944
945            let src_level = &self.levels[level_idx];
946
947            let columns: Vec<FactorizedVector> = specs
948                .iter()
949                .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
950                .collect();
951
952            let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
953
954            if level_idx == 0 {
955                new_levels.push(FactorizationLevel::flat(columns, names));
956            } else {
957                let mults = src_level.multiplicities().to_vec();
958                new_levels.push(FactorizationLevel::unflat(columns, names, mults));
959            }
960        }
961
962        if new_levels.is_empty() {
963            return Self::empty();
964        }
965
966        let mut result = Self {
967            levels: new_levels,
968            logical_row_count: 0,
969            state: ChunkState::flat(0),
970        };
971        result.recompute_logical_row_count();
972        result.update_state();
973        result
974    }
975}
976
977/// Iterator over logical rows in a factorized chunk.
978///
979/// Instead of materializing all rows, this iterator yields index tuples
980/// that can be used to access values at each level.
981///
982/// # Alternatives
983///
984/// For better performance, consider using the iterators from [`factorized_iter`](super::factorized_iter):
985///
986/// - [`PrecomputedIter`](super::factorized_iter::PrecomputedIter) - Pre-computes all indices
987///   for O(1) random access and better cache locality
988/// - [`StreamingIter`](super::factorized_iter::StreamingIter) - More memory-efficient
989///   streaming iteration with SmallVec stack allocation
990/// - [`RowView`](super::factorized_iter::RowView) - Zero-copy access to row values
991pub struct FactorizedRowIterator<'a> {
992    chunk: &'a FactorizedChunk,
993    /// Current physical indices at each level.
994    indices: Vec<usize>,
995    /// Maximum physical index at each level (per parent).
996    /// This is updated as we traverse.
997    exhausted: bool,
998}
999
1000impl<'a> FactorizedRowIterator<'a> {
1001    fn new(chunk: &'a FactorizedChunk) -> Self {
1002        let indices = vec![0; chunk.level_count()];
1003        let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
1004
1005        let mut iter = Self {
1006            chunk,
1007            indices,
1008            exhausted,
1009        };
1010
1011        // If initial position is invalid (e.g., first parent has 0 children), advance to valid position
1012        if !exhausted && !iter.has_valid_deepest_range() {
1013            if !iter.advance() {
1014                exhausted = true;
1015            }
1016            iter.exhausted = exhausted;
1017        }
1018
1019        iter
1020    }
1021
1022    /// Advances the indices like a mixed-radix counter.
1023    fn advance(&mut self) -> bool {
1024        if self.exhausted || self.chunk.levels.is_empty() {
1025            return false;
1026        }
1027
1028        // Start from the deepest level and work backwards
1029        for level_idx in (0..self.chunk.levels.len()).rev() {
1030            let level = &self.chunk.levels[level_idx];
1031
1032            // Get the parent index for this level
1033            let parent_idx = if level_idx == 0 {
1034                // Level 0 has no parent - just check bounds
1035                self.indices[0] + 1
1036            } else {
1037                // Get current parent's physical index
1038                self.indices[level_idx - 1]
1039            };
1040
1041            // Get the range of valid indices for this parent
1042            let (_start, end) = if level_idx == 0 {
1043                (0, level.group_count)
1044            } else {
1045                // For unflat levels, get range from parent
1046                if let Some(col) = level.columns.first() {
1047                    col.range_for_parent(parent_idx)
1048                } else {
1049                    (0, 0)
1050                }
1051            };
1052
1053            let current = self.indices[level_idx];
1054            if current + 1 < end {
1055                // Can advance at this level
1056                self.indices[level_idx] = current + 1;
1057                // Reset all deeper levels to their start positions
1058                for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1059                    if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1060                        let (deeper_start, _) =
1061                            deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1062                        self.indices[deeper_idx] = deeper_start;
1063                    }
1064                }
1065
1066                // Check if the deepest level has valid range - if any parent has 0 children,
1067                // we need to keep advancing instead of returning this invalid row
1068                if self.has_valid_deepest_range() {
1069                    return true;
1070                }
1071                // Otherwise, recursively try to advance again from the new position
1072                // This handles sparse data where many parents have 0 children
1073                return self.advance();
1074            }
1075            // Can't advance at this level - try parent level
1076        }
1077
1078        // Couldn't advance at any level - exhausted
1079        self.exhausted = true;
1080        false
1081    }
1082
1083    /// Checks if all levels have valid (non-empty) ranges for their current parent.
1084    ///
1085    /// This must check ALL levels, not just the deepest, because when an
1086    /// intermediate level has an empty range, deeper levels get reset to
1087    /// out-of-bounds indices that can alias into unrelated valid ranges.
1088    fn has_valid_deepest_range(&self) -> bool {
1089        if self.chunk.levels.len() <= 1 {
1090            return true; // Single level or empty - always valid
1091        }
1092
1093        // Check every unflat level (1..len) has a non-empty range for its parent
1094        for level_idx in 1..self.chunk.levels.len() {
1095            let parent_idx = self.indices[level_idx - 1];
1096            if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1097                let (start, end) = col.range_for_parent(parent_idx);
1098                if start >= end {
1099                    return false;
1100                }
1101            } else {
1102                return false;
1103            }
1104        }
1105
1106        true
1107    }
1108}
1109
1110impl Iterator for FactorizedRowIterator<'_> {
1111    type Item = Vec<usize>;
1112
1113    fn next(&mut self) -> Option<Self::Item> {
1114        if self.exhausted {
1115            return None;
1116        }
1117
1118        // Return current indices, then advance
1119        let result = self.indices.clone();
1120        self.advance();
1121        Some(result)
1122    }
1123}
1124
1125/// A chunk that can be either flat (DataChunk) or factorized (FactorizedChunk).
1126#[derive(Debug, Clone)]
1127#[non_exhaustive]
1128pub enum ChunkVariant {
1129    /// A flat chunk with all rows materialized.
1130    Flat(DataChunk),
1131    /// A factorized chunk with multi-level representation.
1132    Factorized(FactorizedChunk),
1133}
1134
1135impl ChunkVariant {
1136    /// Creates a flat variant from a DataChunk.
1137    #[must_use]
1138    pub fn flat(chunk: DataChunk) -> Self {
1139        Self::Flat(chunk)
1140    }
1141
1142    /// Creates a factorized variant from a FactorizedChunk.
1143    #[must_use]
1144    pub fn factorized(chunk: FactorizedChunk) -> Self {
1145        Self::Factorized(chunk)
1146    }
1147
1148    /// Ensures the chunk is flat, flattening if necessary.
1149    #[must_use]
1150    pub fn ensure_flat(self) -> DataChunk {
1151        match self {
1152            Self::Flat(chunk) => chunk,
1153            Self::Factorized(chunk) => chunk.flatten(),
1154        }
1155    }
1156
1157    /// Returns the logical row count.
1158    #[must_use]
1159    pub fn logical_row_count(&self) -> usize {
1160        match self {
1161            Self::Flat(chunk) => chunk.row_count(),
1162            Self::Factorized(chunk) => chunk.logical_row_count(),
1163        }
1164    }
1165
1166    /// Returns true if this is a factorized chunk.
1167    #[must_use]
1168    pub fn is_factorized(&self) -> bool {
1169        matches!(self, Self::Factorized(_))
1170    }
1171
1172    /// Returns true if this is a flat chunk.
1173    #[must_use]
1174    pub fn is_flat(&self) -> bool {
1175        matches!(self, Self::Flat(_))
1176    }
1177
1178    /// Returns true if the chunk is empty.
1179    #[must_use]
1180    pub fn is_empty(&self) -> bool {
1181        self.logical_row_count() == 0
1182    }
1183}
1184
1185impl From<DataChunk> for ChunkVariant {
1186    fn from(chunk: DataChunk) -> Self {
1187        Self::Flat(chunk)
1188    }
1189}
1190
1191impl From<FactorizedChunk> for ChunkVariant {
1192    fn from(chunk: FactorizedChunk) -> Self {
1193        Self::Factorized(chunk)
1194    }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use grafeo_common::types::{LogicalType, NodeId, Value};
1200
1201    use super::*;
1202
1203    fn make_flat_chunk() -> DataChunk {
1204        let mut col = ValueVector::with_type(LogicalType::Int64);
1205        col.push_int64(1);
1206        col.push_int64(2);
1207        DataChunk::new(vec![col])
1208    }
1209
1210    fn create_multi_level_chunk() -> FactorizedChunk {
1211        // 2 sources, each with 2 neighbors = 4 logical rows
1212        let mut sources = ValueVector::with_type(LogicalType::Int64);
1213        sources.push_int64(10);
1214        sources.push_int64(20);
1215
1216        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1217
1218        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1219        neighbors.push_int64(1);
1220        neighbors.push_int64(2);
1221        neighbors.push_int64(3);
1222        neighbors.push_int64(4);
1223
1224        let offsets = vec![0, 2, 4];
1225        chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1226        chunk
1227    }
1228
1229    #[test]
1230    fn test_from_flat() {
1231        let flat = make_flat_chunk();
1232        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1233
1234        assert_eq!(factorized.level_count(), 1);
1235        assert_eq!(factorized.logical_row_count(), 2);
1236        assert_eq!(factorized.physical_size(), 2);
1237    }
1238
1239    #[test]
1240    fn test_add_level() {
1241        // Start with 2 source nodes
1242        let mut col0 = ValueVector::with_type(LogicalType::Node);
1243        col0.push_node_id(NodeId::new(100));
1244        col0.push_node_id(NodeId::new(200));
1245
1246        let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1247
1248        assert_eq!(chunk.level_count(), 1);
1249        assert_eq!(chunk.logical_row_count(), 2);
1250
1251        // Add level 1: source 0 has 3 neighbors, source 1 has 2 neighbors
1252        let mut neighbors = ValueVector::with_type(LogicalType::Node);
1253        neighbors.push_node_id(NodeId::new(10));
1254        neighbors.push_node_id(NodeId::new(11));
1255        neighbors.push_node_id(NodeId::new(12));
1256        neighbors.push_node_id(NodeId::new(20));
1257        neighbors.push_node_id(NodeId::new(21));
1258
1259        let offsets = vec![0, 3, 5]; // source 0: 0..3, source 1: 3..5
1260        chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1261
1262        assert_eq!(chunk.level_count(), 2);
1263        assert_eq!(chunk.logical_row_count(), 5); // 3 + 2 neighbors
1264        assert_eq!(chunk.physical_size(), 2 + 5); // 2 sources + 5 neighbors
1265    }
1266
1267    #[test]
1268    fn test_flatten_single_level() {
1269        let flat = make_flat_chunk();
1270        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1271
1272        let flattened = factorized.flatten();
1273        assert_eq!(flattened.row_count(), 2);
1274        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1275        assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1276    }
1277
1278    #[test]
1279    fn test_flatten_multi_level() {
1280        // 2 sources, each with 2 neighbors = 4 logical rows
1281        let mut sources = ValueVector::with_type(LogicalType::Int64);
1282        sources.push_int64(1);
1283        sources.push_int64(2);
1284
1285        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1286
1287        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1288        neighbors.push_int64(10);
1289        neighbors.push_int64(11);
1290        neighbors.push_int64(20);
1291        neighbors.push_int64(21);
1292
1293        let offsets = vec![0, 2, 4];
1294        chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1295
1296        let flat = chunk.flatten();
1297        assert_eq!(flat.row_count(), 4);
1298        assert_eq!(flat.column_count(), 2);
1299
1300        // Check that sources are duplicated correctly
1301        // Row 0: (1, 10), Row 1: (1, 11), Row 2: (2, 20), Row 3: (2, 21)
1302        assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1303        assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1304        assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1305        assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1306        assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1307        assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1308        assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1309        assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1310    }
1311
1312    #[test]
1313    fn test_logical_row_iter_single_level() {
1314        let flat = make_flat_chunk();
1315        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1316
1317        let indices: Vec<_> = factorized.logical_row_iter().collect();
1318        assert_eq!(indices.len(), 2);
1319        assert_eq!(indices[0], vec![0]);
1320        assert_eq!(indices[1], vec![1]);
1321    }
1322
1323    #[test]
1324    fn test_chunk_variant() {
1325        let flat = make_flat_chunk();
1326        let variant = ChunkVariant::flat(flat.clone());
1327
1328        assert!(variant.is_flat());
1329        assert!(!variant.is_factorized());
1330        assert_eq!(variant.logical_row_count(), 2);
1331
1332        let ensured = variant.ensure_flat();
1333        assert_eq!(ensured.row_count(), 2);
1334    }
1335
1336    #[test]
1337    fn test_chunk_variant_factorized() {
1338        let chunk = create_multi_level_chunk();
1339        let variant = ChunkVariant::factorized(chunk);
1340
1341        assert!(variant.is_factorized());
1342        assert!(!variant.is_flat());
1343        assert_eq!(variant.logical_row_count(), 4);
1344
1345        let flat = variant.ensure_flat();
1346        assert_eq!(flat.row_count(), 4);
1347    }
1348
1349    #[test]
1350    fn test_chunk_variant_from() {
1351        let flat = make_flat_chunk();
1352        let variant: ChunkVariant = flat.into();
1353        assert!(variant.is_flat());
1354
1355        let factorized = create_multi_level_chunk();
1356        let variant2: ChunkVariant = factorized.into();
1357        assert!(variant2.is_factorized());
1358    }
1359
1360    #[test]
1361    fn test_chunk_variant_is_empty() {
1362        let empty_flat = DataChunk::empty();
1363        let variant = ChunkVariant::flat(empty_flat);
1364        assert!(variant.is_empty());
1365
1366        let non_empty = make_flat_chunk();
1367        let variant2 = ChunkVariant::flat(non_empty);
1368        assert!(!variant2.is_empty());
1369    }
1370
1371    #[test]
1372    fn test_empty_chunk() {
1373        let chunk = FactorizedChunk::empty();
1374        assert_eq!(chunk.level_count(), 0);
1375        assert_eq!(chunk.logical_row_count(), 0);
1376        assert_eq!(chunk.physical_size(), 0);
1377
1378        let flat = chunk.flatten();
1379        assert!(flat.is_empty());
1380    }
1381
1382    #[test]
1383    fn test_all_column_names() {
1384        let mut sources = ValueVector::with_type(LogicalType::Int64);
1385        sources.push_int64(1);
1386
1387        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1388
1389        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1390        neighbors.push_int64(10);
1391
1392        chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1393
1394        let names = chunk.all_column_names();
1395        assert_eq!(names, vec!["source", "neighbor"]);
1396    }
1397
1398    #[test]
1399    fn test_level_mut() {
1400        let mut chunk = create_multi_level_chunk();
1401
1402        // Access level mutably
1403        let level = chunk.level_mut(0).unwrap();
1404        assert_eq!(level.column_count(), 1);
1405
1406        // Invalid level should return None
1407        assert!(chunk.level_mut(10).is_none());
1408    }
1409
1410    #[test]
1411    fn test_factorization_level_column_mut() {
1412        let mut chunk = create_multi_level_chunk();
1413
1414        let level = chunk.level_mut(0).unwrap();
1415        let col = level.column_mut(0);
1416        assert!(col.is_some());
1417
1418        // Invalid column should return None
1419        assert!(level.column_mut(10).is_none());
1420    }
1421
1422    #[test]
1423    fn test_factorization_level_physical_value_count() {
1424        let chunk = create_multi_level_chunk();
1425
1426        let level0 = chunk.level(0).unwrap();
1427        assert_eq!(level0.physical_value_count(), 2); // 2 sources
1428
1429        let level1 = chunk.level(1).unwrap();
1430        assert_eq!(level1.physical_value_count(), 4); // 4 neighbors
1431    }
1432
1433    #[test]
1434    fn test_count_rows() {
1435        let chunk = create_multi_level_chunk();
1436        assert_eq!(chunk.count_rows(), 4);
1437
1438        let empty = FactorizedChunk::empty();
1439        assert_eq!(empty.count_rows(), 0);
1440    }
1441
1442    #[test]
1443    fn test_compute_path_multiplicities() {
1444        let chunk = create_multi_level_chunk();
1445
1446        let mults = chunk.compute_path_multiplicities();
1447        // Each value at the deepest level has multiplicity 1 since each parent has 2 children
1448        assert_eq!(mults.len(), 4);
1449        assert!(mults.iter().all(|&m| m == 1));
1450    }
1451
1452    #[test]
1453    fn test_compute_path_multiplicities_single_level() {
1454        let mut col = ValueVector::with_type(LogicalType::Int64);
1455        col.push_int64(1);
1456        col.push_int64(2);
1457        col.push_int64(3);
1458
1459        let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1460        let mults = chunk.compute_path_multiplicities();
1461
1462        // Single level: each value has multiplicity 1
1463        assert_eq!(mults.len(), 3);
1464        assert!(mults.iter().all(|&m| m == 1));
1465    }
1466
1467    #[test]
1468    fn test_compute_path_multiplicities_empty() {
1469        let chunk = FactorizedChunk::empty();
1470        let mults = chunk.compute_path_multiplicities();
1471        assert!(mults.is_empty());
1472    }
1473
1474    #[test]
1475    fn test_path_multiplicities_cached() {
1476        let mut chunk = create_multi_level_chunk();
1477
1478        // First call computes and caches
1479        let mults1 = chunk.path_multiplicities_cached();
1480        assert_eq!(mults1.len(), 4);
1481
1482        // Second call should return cached value
1483        let mults2 = chunk.path_multiplicities_cached();
1484        assert_eq!(mults1.len(), mults2.len());
1485    }
1486
1487    #[test]
1488    fn test_sum_deepest() {
1489        let chunk = create_multi_level_chunk();
1490
1491        // Deepest level has values [1, 2, 3, 4]
1492        let sum = chunk.sum_deepest(0);
1493        assert_eq!(sum, Some(10.0)); // 1 + 2 + 3 + 4
1494    }
1495
1496    #[test]
1497    fn test_sum_deepest_empty() {
1498        let chunk = FactorizedChunk::empty();
1499        assert!(chunk.sum_deepest(0).is_none());
1500    }
1501
1502    #[test]
1503    fn test_sum_deepest_invalid_column() {
1504        let chunk = create_multi_level_chunk();
1505        assert!(chunk.sum_deepest(10).is_none());
1506    }
1507
1508    #[test]
1509    fn test_avg_deepest() {
1510        let chunk = create_multi_level_chunk();
1511
1512        // Deepest level has values [1, 2, 3, 4], avg = 2.5
1513        let avg = chunk.avg_deepest(0);
1514        assert_eq!(avg, Some(2.5));
1515    }
1516
1517    #[test]
1518    fn test_avg_deepest_empty() {
1519        let chunk = FactorizedChunk::empty();
1520        assert!(chunk.avg_deepest(0).is_none());
1521    }
1522
1523    #[test]
1524    fn test_min_deepest() {
1525        let chunk = create_multi_level_chunk();
1526
1527        let min = chunk.min_deepest(0);
1528        assert_eq!(min, Some(Value::Int64(1)));
1529    }
1530
1531    #[test]
1532    fn test_min_deepest_empty() {
1533        let chunk = FactorizedChunk::empty();
1534        assert!(chunk.min_deepest(0).is_none());
1535    }
1536
1537    #[test]
1538    fn test_min_deepest_invalid_column() {
1539        let chunk = create_multi_level_chunk();
1540        assert!(chunk.min_deepest(10).is_none());
1541    }
1542
1543    #[test]
1544    fn test_max_deepest() {
1545        let chunk = create_multi_level_chunk();
1546
1547        let max = chunk.max_deepest(0);
1548        assert_eq!(max, Some(Value::Int64(4)));
1549    }
1550
1551    #[test]
1552    fn test_max_deepest_empty() {
1553        let chunk = FactorizedChunk::empty();
1554        assert!(chunk.max_deepest(0).is_none());
1555    }
1556
1557    #[test]
1558    fn test_value_less_than() {
1559        // Null handling
1560        assert!(FactorizedChunk::value_less_than(
1561            &Value::Null,
1562            &Value::Int64(1)
1563        ));
1564        assert!(!FactorizedChunk::value_less_than(
1565            &Value::Int64(1),
1566            &Value::Null
1567        ));
1568        assert!(!FactorizedChunk::value_less_than(
1569            &Value::Null,
1570            &Value::Null
1571        ));
1572
1573        // Int64
1574        assert!(FactorizedChunk::value_less_than(
1575            &Value::Int64(1),
1576            &Value::Int64(2)
1577        ));
1578        assert!(!FactorizedChunk::value_less_than(
1579            &Value::Int64(2),
1580            &Value::Int64(1)
1581        ));
1582
1583        // Float64
1584        assert!(FactorizedChunk::value_less_than(
1585            &Value::Float64(1.5),
1586            &Value::Float64(2.5)
1587        ));
1588
1589        // Mixed Int/Float
1590        assert!(FactorizedChunk::value_less_than(
1591            &Value::Int64(1),
1592            &Value::Float64(1.5)
1593        ));
1594        assert!(FactorizedChunk::value_less_than(
1595            &Value::Float64(0.5),
1596            &Value::Int64(1)
1597        ));
1598
1599        // String
1600        assert!(FactorizedChunk::value_less_than(
1601            &Value::String("apple".into()),
1602            &Value::String("banana".into())
1603        ));
1604
1605        // Bool (false < true)
1606        assert!(FactorizedChunk::value_less_than(
1607            &Value::Bool(false),
1608            &Value::Bool(true)
1609        ));
1610        assert!(!FactorizedChunk::value_less_than(
1611            &Value::Bool(true),
1612            &Value::Bool(false)
1613        ));
1614
1615        // Incompatible types return false
1616        assert!(!FactorizedChunk::value_less_than(
1617            &Value::Int64(1),
1618            &Value::String("hello".into())
1619        ));
1620    }
1621
1622    #[test]
1623    fn test_filter_deepest() {
1624        let chunk = create_multi_level_chunk();
1625
1626        // Filter to keep only values > 2
1627        let filtered = chunk.filter_deepest(0, |v| {
1628            if let Value::Int64(n) = v {
1629                *n > 2
1630            } else {
1631                false
1632            }
1633        });
1634
1635        let filtered = filtered.unwrap();
1636        assert_eq!(filtered.logical_row_count(), 2); // Only 3 and 4 remain
1637    }
1638
1639    #[test]
1640    fn test_filter_deepest_empty() {
1641        let chunk = FactorizedChunk::empty();
1642        assert!(chunk.filter_deepest(0, |_| true).is_none());
1643    }
1644
1645    #[test]
1646    fn test_filter_deepest_all_filtered() {
1647        let chunk = create_multi_level_chunk();
1648
1649        // Filter everything out
1650        let filtered = chunk.filter_deepest(0, |_| false);
1651
1652        let filtered = filtered.unwrap();
1653        assert_eq!(filtered.logical_row_count(), 0);
1654    }
1655
1656    #[test]
1657    fn test_filter_deepest_invalid_column() {
1658        let chunk = create_multi_level_chunk();
1659        assert!(chunk.filter_deepest(10, |_| true).is_none());
1660    }
1661
1662    #[test]
1663    fn test_filter_deepest_multi() {
1664        // Create a chunk with 2 columns at the deepest level
1665        let mut sources = ValueVector::with_type(LogicalType::Int64);
1666        sources.push_int64(1);
1667
1668        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1669
1670        let mut col1 = ValueVector::with_type(LogicalType::Int64);
1671        col1.push_int64(10);
1672        col1.push_int64(20);
1673        col1.push_int64(30);
1674
1675        let mut col2 = ValueVector::with_type(LogicalType::Int64);
1676        col2.push_int64(1);
1677        col2.push_int64(2);
1678        col2.push_int64(3);
1679
1680        let offsets = vec![0, 3];
1681        chunk.add_level(
1682            vec![col1, col2],
1683            vec!["a".to_string(), "b".to_string()],
1684            &offsets,
1685        );
1686
1687        // Filter based on both columns
1688        let filtered = chunk.filter_deepest_multi(|values| {
1689            if values.len() == 2
1690                && let (Value::Int64(a), Value::Int64(b)) = (&values[0], &values[1])
1691            {
1692                return *a + *b > 15;
1693            }
1694            false
1695        });
1696
1697        assert!(filtered.is_some());
1698        let filtered = filtered.unwrap();
1699        assert_eq!(filtered.logical_row_count(), 2); // (20,2) and (30,3) pass
1700    }
1701
1702    #[test]
1703    fn test_filter_deepest_multi_empty() {
1704        let chunk = FactorizedChunk::empty();
1705        assert!(chunk.filter_deepest_multi(|_| true).is_none());
1706    }
1707
1708    #[test]
1709    fn test_filter_deepest_multi_no_columns() {
1710        // Create a chunk with no columns at level 1
1711        let mut sources = ValueVector::with_type(LogicalType::Int64);
1712        sources.push_int64(1);
1713
1714        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1715
1716        // Add empty level (edge case)
1717        let empty_level = FactorizationLevel::unflat(vec![], vec![], vec![0]);
1718        chunk.add_factorized_level(empty_level);
1719
1720        assert!(chunk.filter_deepest_multi(|_| true).is_none());
1721    }
1722
1723    #[test]
1724    fn test_project() {
1725        let mut sources = ValueVector::with_type(LogicalType::Int64);
1726        sources.push_int64(1);
1727        sources.push_int64(2);
1728
1729        let mut col2 = ValueVector::with_type(LogicalType::String);
1730        col2.push_string("a");
1731        col2.push_string("b");
1732
1733        let chunk = FactorizedChunk::with_flat_level(
1734            vec![sources, col2],
1735            vec!["num".to_string(), "str".to_string()],
1736        );
1737
1738        // Project only the first column
1739        let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);
1740
1741        assert_eq!(projected.total_column_count(), 1);
1742        let names = projected.all_column_names();
1743        assert_eq!(names, vec!["projected_num"]);
1744    }
1745
1746    #[test]
1747    fn test_project_empty() {
1748        let chunk = FactorizedChunk::empty();
1749        let projected = chunk.project(&[(0, 0, "col".to_string())]);
1750        assert_eq!(projected.level_count(), 0);
1751    }
1752
1753    #[test]
1754    fn test_project_empty_specs() {
1755        let chunk = create_multi_level_chunk();
1756        let projected = chunk.project(&[]);
1757        assert_eq!(projected.level_count(), 0);
1758    }
1759
1760    #[test]
1761    fn test_project_invalid_level() {
1762        let chunk = create_multi_level_chunk();
1763
1764        // Project from invalid level
1765        let projected = chunk.project(&[(10, 0, "col".to_string())]);
1766        assert_eq!(projected.level_count(), 0);
1767    }
1768
1769    #[test]
1770    fn test_project_multi_level() {
1771        let chunk = create_multi_level_chunk();
1772
1773        // Project from both levels
1774        let projected =
1775            chunk.project(&[(0, 0, "source".to_string()), (1, 0, "neighbor".to_string())]);
1776
1777        assert_eq!(projected.level_count(), 2);
1778        assert_eq!(projected.total_column_count(), 2);
1779    }
1780
1781    #[test]
1782    fn test_total_column_count() {
1783        let chunk = create_multi_level_chunk();
1784        assert_eq!(chunk.total_column_count(), 2); // 1 at level 0, 1 at level 1
1785    }
1786
1787    #[test]
1788    fn test_chunk_state_access() {
1789        let mut chunk = create_multi_level_chunk();
1790
1791        let state = chunk.chunk_state();
1792        assert!(state.is_factorized());
1793
1794        let state_mut = chunk.chunk_state_mut();
1795        state_mut.invalidate_cache();
1796    }
1797
1798    #[test]
1799    fn test_logical_row_iter_multi_level() {
1800        let chunk = create_multi_level_chunk();
1801
1802        let indices: Vec<_> = chunk.logical_row_iter().collect();
1803        assert_eq!(indices.len(), 4);
1804
1805        // Verify structure: [source_idx, neighbor_idx]
1806        assert_eq!(indices[0], vec![0, 0]);
1807        assert_eq!(indices[1], vec![0, 1]);
1808        assert_eq!(indices[2], vec![1, 2]);
1809        assert_eq!(indices[3], vec![1, 3]);
1810    }
1811
1812    #[test]
1813    fn test_sum_deepest_with_float() {
1814        let mut sources = ValueVector::with_type(LogicalType::Int64);
1815        sources.push_int64(1);
1816
1817        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1818
1819        let mut floats = ValueVector::with_type(LogicalType::Float64);
1820        floats.push_float64(1.5);
1821        floats.push_float64(2.5);
1822        floats.push_float64(3.0);
1823
1824        chunk.add_level(vec![floats], vec!["val".to_string()], &[0, 3]);
1825
1826        let sum = chunk.sum_deepest(0);
1827        assert_eq!(sum, Some(7.0)); // 1.5 + 2.5 + 3.0
1828    }
1829
1830    #[test]
1831    fn test_min_max_with_strings() {
1832        let mut sources = ValueVector::with_type(LogicalType::Int64);
1833        sources.push_int64(1);
1834
1835        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1836
1837        let mut strings = ValueVector::with_type(LogicalType::String);
1838        strings.push_string("banana");
1839        strings.push_string("apple");
1840        strings.push_string("cherry");
1841
1842        chunk.add_level(vec![strings], vec!["fruit".to_string()], &[0, 3]);
1843
1844        let min = chunk.min_deepest(0);
1845        assert_eq!(min, Some(Value::String("apple".into())));
1846
1847        let max = chunk.max_deepest(0);
1848        assert_eq!(max, Some(Value::String("cherry".into())));
1849    }
1850
1851    #[test]
1852    fn test_recompute_logical_row_count_empty() {
1853        let mut chunk = FactorizedChunk::empty();
1854        chunk.recompute_logical_row_count();
1855        assert_eq!(chunk.logical_row_count(), 0);
1856    }
1857
1858    #[test]
1859    fn test_factorization_level_group_count() {
1860        let chunk = create_multi_level_chunk();
1861
1862        let level0 = chunk.level(0).unwrap();
1863        assert_eq!(level0.group_count(), 2);
1864
1865        let level1 = chunk.level(1).unwrap();
1866        assert_eq!(level1.group_count(), 4);
1867    }
1868
1869    #[test]
1870    fn test_factorization_level_multiplicities() {
1871        let chunk = create_multi_level_chunk();
1872
1873        let level1 = chunk.level(1).unwrap();
1874        let mults = level1.multiplicities();
1875        assert_eq!(mults, &[2, 2]); // Each source has 2 neighbors
1876    }
1877
1878    #[test]
1879    fn test_factorization_level_column_names() {
1880        let chunk = create_multi_level_chunk();
1881
1882        let level0 = chunk.level(0).unwrap();
1883        assert_eq!(level0.column_names(), &["src"]);
1884
1885        let level1 = chunk.level(1).unwrap();
1886        assert_eq!(level1.column_names(), &["nbr"]);
1887    }
1888}