Skip to main content

grafeo_core/execution/
factorized_chunk.rs

1//! FactorizedChunk - multi-level factorized data representation.
2//!
3//! A `FactorizedChunk` organizes columns into levels, where each level can have
4//! different factorization (multiplicity). This avoids materializing the full
5//! Cartesian product during multi-hop graph traversals.
6//!
7//! # Example
8//!
9//! For a 2-hop query `MATCH (a)-[e1]->(b)-[e2]->(c)`:
10//!
11//! ```text
12//! Level 0 (flat):   [a1, a2]           (2 source nodes)
13//! Level 1 (unflat): [b1, b2, b3, b4]   (4 first-hop neighbors)
14//!                   offsets: [0, 2, 4]  (a1 has 2 neighbors, a2 has 2)
15//! Level 2 (unflat): [c1, c2, ..., c8]  (8 second-hop neighbors)
16//!                   offsets: [0, 2, 4, 6, 8]
17//!
//! Logical rows = 2 * 2 * 2 = 8. Counting only the node columns, physical
//! storage is 2 + 4 + 8 = 14 values vs 8 * 3 = 24 flat; with the two edge
//! columns included it is 2 + 8 + 16 = 26 values vs 8 * 5 = 40 flat.
20//! ```
21
22use std::sync::Arc;
23
24use super::chunk::DataChunk;
25use super::chunk_state::ChunkState;
26use super::factorized_vector::FactorizedVector;
27use super::vector::ValueVector;
28
/// A chunk that supports factorized representation across multiple levels.
///
/// Columns are organized in groups by their factorization level:
/// - Level 0 (flat): Base columns, one value per base row
/// - Level 1 (unflat): First expansion, grouped by level 0
/// - Level 2 (unflat): Second expansion, grouped by level 1
/// - And so on...
///
/// # State Management
///
/// The chunk maintains a [`ChunkState`] that provides:
/// - Cached multiplicities for O(1) aggregate access
/// - Selection vector support for lazy filtering
/// - Generation tracking for cache invalidation
#[derive(Debug, Clone)]
pub struct FactorizedChunk {
    /// Column groups organized by factorization level; index 0 is the base
    /// level and the last entry is the deepest expansion.
    levels: Vec<FactorizationLevel>,
    /// Number of rows the chunk represents once fully expanded. The flat
    /// constructors copy it from their input; the level-adding methods
    /// recompute it from the per-level multiplicities.
    logical_row_count: usize,
    /// Unified state tracking (caching, selection, etc.). Replaced by
    /// `update_state` after a level has been added.
    state: ChunkState,
}
52
53/// A factorization level containing columns at the same nesting depth.
54#[derive(Debug, Clone)]
55pub struct FactorizationLevel {
56    /// Columns at this level.
57    columns: Vec<FactorizedVector>,
58    /// Column names or identifiers (for schema mapping).
59    column_names: Vec<String>,
60    /// Number of groups at this level.
61    group_count: usize,
62    /// Multiplicities for each group (how many children per parent).
63    /// For level 0, this is vec![1; group_count].
64    /// For level N, multiplicities[i] = number of values for parent i.
65    multiplicities: Vec<usize>,
66}
67
68impl FactorizationLevel {
69    /// Creates a new flat level (level 0) from columns.
70    #[must_use]
71    pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
72        let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
73        let multiplicities = vec![1; group_count];
74        Self {
75            columns,
76            column_names,
77            group_count,
78            multiplicities,
79        }
80    }
81
82    /// Creates a new unflat level with the given multiplicities.
83    ///
84    /// Note: `multiplicities[i]` is the number of values for parent i.
85    /// The total number of values (group_count) is the sum of all multiplicities.
86    #[must_use]
87    pub fn unflat(
88        columns: Vec<FactorizedVector>,
89        column_names: Vec<String>,
90        multiplicities: Vec<usize>,
91    ) -> Self {
92        // group_count is the total number of values at this level (sum of multiplicities)
93        let group_count = multiplicities.iter().sum();
94        Self {
95            columns,
96            column_names,
97            group_count,
98            multiplicities,
99        }
100    }
101
102    /// Returns the number of columns at this level.
103    #[must_use]
104    pub fn column_count(&self) -> usize {
105        self.columns.len()
106    }
107
108    /// Returns the number of groups at this level.
109    #[must_use]
110    pub fn group_count(&self) -> usize {
111        self.group_count
112    }
113
114    /// Returns the total physical value count across all columns.
115    #[must_use]
116    pub fn physical_value_count(&self) -> usize {
117        self.columns
118            .iter()
119            .map(FactorizedVector::physical_len)
120            .sum()
121    }
122
123    /// Returns the multiplicities for this level.
124    #[must_use]
125    pub fn multiplicities(&self) -> &[usize] {
126        &self.multiplicities
127    }
128
129    /// Returns a column by index.
130    #[must_use]
131    pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
132        self.columns.get(index)
133    }
134
135    /// Returns a mutable column by index.
136    pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
137        self.columns.get_mut(index)
138    }
139
140    /// Returns the column names.
141    #[must_use]
142    pub fn column_names(&self) -> &[String] {
143        &self.column_names
144    }
145}
146
147impl FactorizedChunk {
148    /// Creates an empty factorized chunk.
149    #[must_use]
150    pub fn empty() -> Self {
151        Self {
152            levels: Vec::new(),
153            logical_row_count: 0,
154            state: ChunkState::flat(0),
155        }
156    }
157
158    /// Creates a factorized chunk from a flat `DataChunk`.
159    ///
160    /// The resulting chunk has a single level (level 0) with all columns flat.
161    #[must_use]
162    pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
163        let columns: Vec<FactorizedVector> = chunk
164            .columns()
165            .iter()
166            .map(|c| FactorizedVector::flat(c.clone()))
167            .collect();
168
169        let row_count = chunk.row_count();
170        let level = FactorizationLevel::flat(columns, column_names);
171
172        Self {
173            levels: vec![level],
174            logical_row_count: row_count,
175            state: ChunkState::unflat(1, row_count),
176        }
177    }
178
179    /// Creates a factorized chunk with a single flat level.
180    #[must_use]
181    pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
182        let row_count = columns.first().map_or(0, ValueVector::len);
183        let factorized_columns: Vec<FactorizedVector> =
184            columns.into_iter().map(FactorizedVector::flat).collect();
185
186        let level = FactorizationLevel::flat(factorized_columns, column_names);
187
188        Self {
189            levels: vec![level],
190            logical_row_count: row_count,
191            state: ChunkState::unflat(1, row_count),
192        }
193    }
194
    /// Returns the number of factorization levels currently stored
    /// (zero for an empty chunk).
    #[must_use]
    pub fn level_count(&self) -> usize {
        self.levels.len()
    }
200
    /// Returns the stored logical row count (the size of the fully expanded
    /// result).
    ///
    /// This is a plain field read; the value is maintained by the
    /// constructors and by the methods that add levels.
    #[must_use]
    pub fn logical_row_count(&self) -> usize {
        self.logical_row_count
    }
206
207    /// Returns the physical storage size (actual values stored).
208    #[must_use]
209    pub fn physical_size(&self) -> usize {
210        self.levels
211            .iter()
212            .map(FactorizationLevel::physical_value_count)
213            .sum()
214    }
215
    /// Returns a shared reference to the chunk's [`ChunkState`].
    #[must_use]
    pub fn chunk_state(&self) -> &ChunkState {
        &self.state
    }
221
    /// Returns mutable access to the chunk's [`ChunkState`].
    ///
    /// Note that the state is replaced wholesale whenever a level is added,
    /// so changes made through this handle do not survive such a call.
    pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
        &mut self.state
    }
226
227    /// Returns path multiplicities, computing once and caching.
228    ///
229    /// This is the key optimization for aggregation: multiplicities are
230    /// computed once and reused for all aggregates (COUNT, SUM, AVG, etc.).
231    ///
232    /// # Example
233    ///
234    /// ```ignore
235    /// let mults = chunk.path_multiplicities_cached();
236    /// let sum = chunk.sum_deepest_with_multiplicities(0, &mults);
237    /// let avg = chunk.avg_deepest_with_multiplicities(0, &mults);
238    /// ```
239    pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
240        // Check if already cached
241        if let Some(cached) = self.state.cached_multiplicities() {
242            return Arc::clone(cached);
243        }
244
245        // Compute and cache
246        let mults = self.compute_path_multiplicities();
247        let arc_mults: Arc<[usize]> = mults.into();
248        self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
249        arc_mults
250    }
251
    /// Returns the level at `index` (0 is the base level), or `None` when
    /// `index` is out of range.
    #[must_use]
    pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
        self.levels.get(index)
    }
257
    /// Returns a mutable reference to the level at `index`, or `None` when
    /// `index` is out of range.
    ///
    /// This accessor does not recompute the logical row count or refresh the
    /// chunk state; callers that change a level's shape through it are
    /// responsible for keeping those consistent.
    pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
        self.levels.get_mut(index)
    }
262
263    /// Adds a new factorization level for expansion results.
264    ///
265    /// The new level's multiplicities determine how many values each parent
266    /// in the previous level expands to.
267    ///
268    /// # Arguments
269    ///
270    /// * `columns` - Columns at the new level
271    /// * `column_names` - Names for the new columns
272    /// * `offsets` - Offset array where `offsets[i]` is the start index for parent `i`
273    pub fn add_level(
274        &mut self,
275        columns: Vec<ValueVector>,
276        column_names: Vec<String>,
277        offsets: &[u32],
278    ) {
279        let parent_count = offsets.len().saturating_sub(1);
280
281        // Compute multiplicities from offsets
282        let multiplicities: Vec<usize> = (0..parent_count)
283            .map(|i| (offsets[i + 1] - offsets[i]) as usize)
284            .collect();
285
286        // Create unflat factorized vectors
287        let factorized_columns: Vec<FactorizedVector> = columns
288            .into_iter()
289            .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
290            .collect();
291
292        let level =
293            FactorizationLevel::unflat(factorized_columns, column_names, multiplicities.clone());
294        self.levels.push(level);
295
296        // Update logical row count
297        // New count = previous count * sum of new multiplicities / parent_count
298        // Actually: each parent's contribution is multiplied by its multiplicity
299        if self.levels.len() == 1 {
300            // First level - logical count is just the sum of multiplicities (or total values)
301            self.logical_row_count = multiplicities.iter().sum();
302        } else {
303            // For subsequent levels, we need to compute based on parent multiplicities
304            self.recompute_logical_row_count();
305        }
306
307        // Update state (invalidates cached multiplicities)
308        self.update_state();
309    }
310
    /// Adds a level with pre-computed factorized vectors.
    ///
    /// The level is appended as the new deepest level, then the logical row
    /// count is recomputed and the chunk state is rebuilt. No check is made
    /// here that the level's multiplicities line up with the previous level.
    pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
        self.levels.push(level);
        self.recompute_logical_row_count();
        self.update_state();
    }
317
318    /// Updates the ChunkState to reflect current structure.
319    fn update_state(&mut self) {
320        self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
321    }
322
323    /// Recomputes the logical row count from all levels.
324    fn recompute_logical_row_count(&mut self) {
325        if self.levels.is_empty() {
326            self.logical_row_count = 0;
327            return;
328        }
329
330        // Start with level 0 count
331        let level0_count = self.levels[0].group_count;
332        if self.levels.len() == 1 {
333            self.logical_row_count = level0_count;
334            return;
335        }
336
337        // For multi-level: compute recursively
338        // Each parent at level N-1 contributes its multiplicity to level N
339        let mut counts = vec![1usize; level0_count];
340
341        for level_idx in 1..self.levels.len() {
342            let level = &self.levels[level_idx];
343            let mut new_counts = Vec::with_capacity(counts.len() * 2); // ~2x expansion
344
345            for (parent_idx, &parent_count) in counts.iter().enumerate() {
346                // This parent expands to level.multiplicities[parent_idx] children
347                if parent_idx < level.multiplicities.len() {
348                    let child_mult = level.multiplicities[parent_idx];
349                    for _ in 0..child_mult {
350                        new_counts.push(parent_count);
351                    }
352                }
353            }
354
355            counts = new_counts;
356        }
357
358        self.logical_row_count = counts.len();
359    }
360
361    /// Flattens to a regular `DataChunk` (materializes the Cartesian product).
362    ///
363    /// All levels are expanded into flat rows.
364    #[must_use]
365    pub fn flatten(&self) -> DataChunk {
366        if self.levels.is_empty() {
367            return DataChunk::empty();
368        }
369
370        // Collect all column types across all levels
371        let mut all_columns: Vec<ValueVector> = Vec::new();
372
373        // For a single level, just flatten each column
374        if self.levels.len() == 1 {
375            let level = &self.levels[0];
376            for col in &level.columns {
377                all_columns.push(col.flatten(None));
378            }
379            return DataChunk::new(all_columns);
380        }
381
382        // Multi-level: need to expand according to multiplicities
383        // Build column data by iterating through logical rows
384        let row_iter = self.logical_row_iter();
385        let total_cols: usize = self.levels.iter().map(|l| l.column_count()).sum();
386
387        // Pre-allocate output columns
388        let mut output_columns: Vec<ValueVector> = Vec::with_capacity(total_cols);
389        for level in &self.levels {
390            for col in &level.columns {
391                output_columns.push(ValueVector::with_capacity(
392                    col.data_type(),
393                    self.logical_row_count,
394                ));
395            }
396        }
397
398        // Iterate through all logical rows
399        for indices in row_iter {
400            let mut col_offset = 0;
401            for (level_idx, level) in self.levels.iter().enumerate() {
402                let level_idx_value = indices.get(level_idx).copied().unwrap_or(0);
403                for (col_idx, col) in level.columns.iter().enumerate() {
404                    if let Some(value) = col.get_physical(level_idx_value) {
405                        output_columns[col_offset + col_idx].push_value(value);
406                    }
407                }
408                col_offset += level.column_count();
409            }
410        }
411
412        DataChunk::new(output_columns)
413    }
414
    /// Returns an iterator over logical rows without materializing them.
    ///
    /// Each iteration yields a collection of physical indices, one per level,
    /// in level order (this is how `flatten` consumes it). The iterator
    /// borrows the chunk for its whole lifetime.
    pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
        FactorizedRowIterator::new(self)
    }
421
422    /// Gets the total number of columns across all levels.
423    #[must_use]
424    pub fn total_column_count(&self) -> usize {
425        self.levels.iter().map(|l| l.column_count()).sum()
426    }
427
428    /// Gets all column names in order across all levels.
429    #[must_use]
430    pub fn all_column_names(&self) -> Vec<String> {
431        self.levels
432            .iter()
433            .flat_map(|l| l.column_names.iter().cloned())
434            .collect()
435    }
436
437    /// Filters the deepest level in-place using a predicate on column values.
438    ///
439    /// This is the key optimization: instead of flattening and filtering all rows,
440    /// we filter only at the deepest level and update parent multiplicities.
441    ///
442    /// # Arguments
443    ///
444    /// * `column_idx` - Column index within the deepest level to filter on
445    /// * `predicate` - Function that returns true for values to keep
446    ///
447    /// # Returns
448    ///
449    /// A new FactorizedChunk with filtered values, or None if all rows are filtered out.
450    #[must_use]
451    pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
452    where
453        F: Fn(&grafeo_common::types::Value) -> bool,
454    {
455        if self.levels.is_empty() {
456            return None;
457        }
458
459        let deepest_idx = self.levels.len() - 1;
460        let deepest = &self.levels[deepest_idx];
461
462        // Get the column to filter on
463        let filter_col = deepest.column(column_idx)?;
464
465        // Build filtered columns for the deepest level
466        let mut new_columns: Vec<ValueVector> = (0..deepest.column_count())
467            .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
468            .collect();
469
470        // Track new multiplicities for each parent
471        let parent_count = filter_col.parent_count();
472        let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
473        let mut new_offsets: Vec<u32> = vec![0];
474
475        // Filter each parent's children
476        for parent_idx in 0..parent_count {
477            let (start, end) = filter_col.range_for_parent(parent_idx);
478
479            for phys_idx in start..end {
480                // Check if this value passes the filter
481                if let Some(value) = filter_col.get_physical(phys_idx)
482                    && predicate(&value)
483                {
484                    // Copy all columns for this row
485                    for col_idx in 0..deepest.column_count() {
486                        if let Some(col) = deepest.column(col_idx)
487                            && let Some(v) = col.get_physical(phys_idx)
488                        {
489                            new_columns[col_idx].push_value(v);
490                        }
491                    }
492                    new_multiplicities[parent_idx] += 1;
493                }
494            }
495
496            new_offsets.push(new_columns[0].len() as u32);
497        }
498
499        // Check if we have any rows left
500        let total_remaining: usize = new_multiplicities.iter().sum();
501        if total_remaining == 0 {
502            return Some(Self::empty());
503        }
504
505        // Build the new factorized vectors
506        let new_factorized_cols: Vec<FactorizedVector> = new_columns
507            .into_iter()
508            .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
509            .collect();
510
511        let new_level = FactorizationLevel::unflat(
512            new_factorized_cols,
513            deepest.column_names().to_vec(),
514            new_multiplicities,
515        );
516
517        // Build the result chunk
518        let mut result = Self {
519            levels: self.levels[..deepest_idx].to_vec(),
520            logical_row_count: 0,
521            state: ChunkState::flat(0),
522        };
523        result.levels.push(new_level);
524        result.recompute_logical_row_count();
525        result.update_state();
526
527        Some(result)
528    }
529
530    /// Filters the deepest level using a multi-column predicate.
531    ///
532    /// This allows filtering based on values from multiple columns in the deepest level.
533    ///
534    /// # Arguments
535    ///
536    /// * `predicate` - Function that takes a slice of values (one per column) and returns true to keep
537    #[must_use]
538    pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
539    where
540        F: Fn(&[grafeo_common::types::Value]) -> bool,
541    {
542        if self.levels.is_empty() {
543            return None;
544        }
545
546        let deepest_idx = self.levels.len() - 1;
547        let deepest = &self.levels[deepest_idx];
548        let col_count = deepest.column_count();
549
550        if col_count == 0 {
551            return None;
552        }
553
554        let first_col = deepest.column(0)?;
555        let parent_count = first_col.parent_count();
556
557        // Build filtered columns
558        let mut new_columns: Vec<ValueVector> = (0..col_count)
559            .map(|i| ValueVector::with_type(deepest.column(i).unwrap().data_type()))
560            .collect();
561
562        let mut new_multiplicities: Vec<usize> = vec![0; parent_count];
563        let mut new_offsets: Vec<u32> = vec![0];
564        let mut row_values: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);
565
566        for parent_idx in 0..parent_count {
567            let (start, end) = first_col.range_for_parent(parent_idx);
568
569            for phys_idx in start..end {
570                // Collect values from all columns
571                row_values.clear();
572                for col_idx in 0..col_count {
573                    if let Some(col) = deepest.column(col_idx)
574                        && let Some(v) = col.get_physical(phys_idx)
575                    {
576                        row_values.push(v);
577                    }
578                }
579
580                // Apply predicate
581                if predicate(&row_values) {
582                    for (col_idx, v) in row_values.iter().enumerate() {
583                        new_columns[col_idx].push_value(v.clone());
584                    }
585                    new_multiplicities[parent_idx] += 1;
586                }
587            }
588
589            new_offsets.push(new_columns[0].len() as u32);
590        }
591
592        // Check if any rows remain
593        let total: usize = new_multiplicities.iter().sum();
594        if total == 0 {
595            return Some(Self::empty());
596        }
597
598        // Build new level
599        let new_factorized_cols: Vec<FactorizedVector> = new_columns
600            .into_iter()
601            .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
602            .collect();
603
604        let new_level = FactorizationLevel::unflat(
605            new_factorized_cols,
606            deepest.column_names().to_vec(),
607            new_multiplicities,
608        );
609
610        let mut result = Self {
611            levels: self.levels[..deepest_idx].to_vec(),
612            logical_row_count: 0,
613            state: ChunkState::flat(0),
614        };
615        result.levels.push(new_level);
616        result.recompute_logical_row_count();
617        result.update_state();
618
619        Some(result)
620    }
621
622    // ========================================================================
623    // Factorized Aggregation Methods
624    // ========================================================================
625
626    /// Computes COUNT(*) without flattening - returns the logical row count.
627    ///
628    /// This is O(n) where n is the number of physical values, instead of
629    /// O(m) where m is the number of logical rows (which can be exponentially larger).
630    ///
631    /// # Example
632    ///
633    /// For a 3-level chunk:
634    /// - Level 0: 100 sources
635    /// - Level 1: 10 neighbors each = 1,000 physical
636    /// - Level 2: 10 neighbors each = 10,000 physical
637    /// - Logical rows = 100 * 10 * 10 = 10,000
638    ///
639    /// `count_rows()` returns 10,000 by computing from multiplicities, not by
640    /// iterating through all logical rows.
641    #[must_use]
642    pub fn count_rows(&self) -> usize {
643        self.logical_row_count()
644    }
645
646    /// Computes the effective multiplicity for each value at the deepest level.
647    ///
648    /// This is how many times each value would appear in the flattened result.
649    /// For example, if a source has 3 first-hop neighbors and each has 2 second-hop
650    /// neighbors, each first-hop value has multiplicity 2 (appearing in 2 paths).
651    ///
652    /// # Returns
653    ///
654    /// A vector where `result[i]` is the multiplicity of physical value `i` at the
655    /// deepest level. The sum of all multiplicities equals `logical_row_count()`.
656    ///
657    /// # Note
658    ///
659    /// For repeated access (e.g., computing multiple aggregates), prefer using
660    /// [`path_multiplicities_cached`](Self::path_multiplicities_cached) which
661    /// caches the result and avoids O(levels) recomputation.
662    #[must_use]
663    pub fn compute_path_multiplicities(&self) -> Vec<usize> {
664        if self.levels.is_empty() {
665            return Vec::new();
666        }
667
668        // For a single level, each value has multiplicity 1
669        if self.levels.len() == 1 {
670            return vec![1; self.levels[0].group_count];
671        }
672
673        // Start with multiplicity 1 for each value at level 0
674        let mut parent_multiplicities = vec![1usize; self.levels[0].group_count];
675
676        // Propagate multiplicities through each level
677        for level_idx in 1..self.levels.len() {
678            let level = &self.levels[level_idx];
679            let mut child_multiplicities = Vec::with_capacity(level.group_count);
680
681            // For each parent, its children inherit its multiplicity
682            for (parent_idx, &parent_mult) in parent_multiplicities.iter().enumerate() {
683                let child_count = if parent_idx < level.multiplicities.len() {
684                    level.multiplicities[parent_idx]
685                } else {
686                    0
687                };
688
689                // Each child of this parent inherits the parent's multiplicity
690                for _ in 0..child_count {
691                    child_multiplicities.push(parent_mult);
692                }
693            }
694
695            parent_multiplicities = child_multiplicities;
696        }
697
698        parent_multiplicities
699    }
700
701    /// Computes SUM on a numeric column at the deepest level without flattening.
702    ///
703    /// Each value is multiplied by its effective multiplicity (how many times
704    /// it would appear in the flattened result).
705    ///
706    /// # Arguments
707    ///
708    /// * `column_idx` - Column index within the deepest level
709    ///
710    /// # Returns
711    ///
712    /// The sum as f64, or None if the column doesn't exist or contains non-numeric values.
713    #[must_use]
714    pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
715        if self.levels.is_empty() {
716            return None;
717        }
718
719        let deepest_idx = self.levels.len() - 1;
720        let deepest = &self.levels[deepest_idx];
721        let col = deepest.column(column_idx)?;
722
723        // Compute multiplicity for each physical value
724        let multiplicities = self.compute_path_multiplicities();
725
726        let mut sum = 0.0;
727        for (phys_idx, mult) in multiplicities.iter().enumerate() {
728            if let Some(value) = col.get_physical(phys_idx) {
729                // Try to convert to f64
730                let num_value = match &value {
731                    grafeo_common::types::Value::Int64(v) => *v as f64,
732                    grafeo_common::types::Value::Float64(v) => *v,
733                    _ => continue, // Skip non-numeric values
734                };
735                sum += num_value * (*mult as f64);
736            }
737        }
738        Some(sum)
739    }
740
741    /// Computes AVG on a numeric column at the deepest level without flattening.
742    ///
743    /// This is equivalent to `sum_deepest() / count_rows()`.
744    ///
745    /// # Arguments
746    ///
747    /// * `column_idx` - Column index within the deepest level
748    ///
749    /// # Returns
750    ///
751    /// The average as f64, or None if the column doesn't exist or the chunk is empty.
752    #[must_use]
753    pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
754        let count = self.logical_row_count();
755        if count == 0 {
756            return None;
757        }
758
759        let sum = self.sum_deepest(column_idx)?;
760        Some(sum / count as f64)
761    }
762
763    /// Computes MIN on a column at the deepest level without flattening.
764    ///
765    /// Unlike SUM/AVG, MIN doesn't need multiplicities - we just find the minimum
766    /// among all physical values.
767    ///
768    /// # Arguments
769    ///
770    /// * `column_idx` - Column index within the deepest level
771    ///
772    /// # Returns
773    ///
774    /// The minimum value, or None if the column doesn't exist or is empty.
775    #[must_use]
776    pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
777        if self.levels.is_empty() {
778            return None;
779        }
780
781        let deepest_idx = self.levels.len() - 1;
782        let deepest = &self.levels[deepest_idx];
783        let col = deepest.column(column_idx)?;
784
785        let mut min_value: Option<grafeo_common::types::Value> = None;
786
787        for phys_idx in 0..col.physical_len() {
788            if let Some(value) = col.get_physical(phys_idx) {
789                min_value = Some(match min_value {
790                    None => value,
791                    Some(current) => {
792                        if Self::value_less_than(&value, &current) {
793                            value
794                        } else {
795                            current
796                        }
797                    }
798                });
799            }
800        }
801
802        min_value
803    }
804
805    /// Computes MAX on a column at the deepest level without flattening.
806    ///
807    /// Unlike SUM/AVG, MAX doesn't need multiplicities - we just find the maximum
808    /// among all physical values.
809    ///
810    /// # Arguments
811    ///
812    /// * `column_idx` - Column index within the deepest level
813    ///
814    /// # Returns
815    ///
816    /// The maximum value, or None if the column doesn't exist or is empty.
817    #[must_use]
818    pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
819        if self.levels.is_empty() {
820            return None;
821        }
822
823        let deepest_idx = self.levels.len() - 1;
824        let deepest = &self.levels[deepest_idx];
825        let col = deepest.column(column_idx)?;
826
827        let mut max_value: Option<grafeo_common::types::Value> = None;
828
829        for phys_idx in 0..col.physical_len() {
830            if let Some(value) = col.get_physical(phys_idx) {
831                max_value = Some(match max_value {
832                    None => value,
833                    Some(current) => {
834                        if Self::value_less_than(&current, &value) {
835                            value
836                        } else {
837                            current
838                        }
839                    }
840                });
841            }
842        }
843
844        max_value
845    }
846
847    /// Compares two Values for ordering (a < b).
848    ///
849    /// Comparison rules:
850    /// - Null is always less than non-null
851    /// - Numeric types are compared by value
852    /// - Strings are compared lexicographically
853    /// - Other types use debug string comparison as fallback
854    fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
855        use grafeo_common::types::Value;
856
857        match (a, b) {
858            // Null handling
859            (Value::Null, Value::Null) => false,
860            (Value::Null, _) => true,
861            (_, Value::Null) => false,
862
863            // Numeric comparisons
864            (Value::Int64(x), Value::Int64(y)) => x < y,
865            (Value::Float64(x), Value::Float64(y)) => x < y,
866            (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
867            (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
868
869            // String comparison
870            (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
871
872            // Bool comparison (false < true)
873            (Value::Bool(x), Value::Bool(y)) => !x && *y,
874
875            // Fallback for incompatible types - not comparable
876            // Return false to keep the current value (arbitrary but consistent)
877            _ => false,
878        }
879    }
880
881    // ========================================================================
882    // Projection and Column Operations
883    // ========================================================================
884
885    /// Projects specific columns from the factorized chunk without flattening.
886    ///
887    /// # Arguments
888    ///
889    /// * `column_specs` - List of (level_idx, column_idx, new_name) tuples
890    ///
891    /// # Returns
892    ///
893    /// A new FactorizedChunk with only the specified columns.
894    #[must_use]
895    pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
896        if self.levels.is_empty() || column_specs.is_empty() {
897            return Self::empty();
898        }
899
900        // Group specs by level
901        let mut level_specs: Vec<Vec<(usize, String)>> = vec![Vec::new(); self.levels.len()];
902        for (level_idx, col_idx, name) in column_specs {
903            if *level_idx < self.levels.len() {
904                level_specs[*level_idx].push((*col_idx, name.clone()));
905            }
906        }
907
908        // Build new levels with projected columns
909        let mut new_levels = Vec::new();
910
911        for (level_idx, specs) in level_specs.iter().enumerate() {
912            if specs.is_empty() {
913                continue;
914            }
915
916            let src_level = &self.levels[level_idx];
917
918            let columns: Vec<FactorizedVector> = specs
919                .iter()
920                .filter_map(|(col_idx, _)| src_level.column(*col_idx).cloned())
921                .collect();
922
923            let names: Vec<String> = specs.iter().map(|(_, name)| name.clone()).collect();
924
925            if level_idx == 0 {
926                new_levels.push(FactorizationLevel::flat(columns, names));
927            } else {
928                let mults = src_level.multiplicities().to_vec();
929                new_levels.push(FactorizationLevel::unflat(columns, names, mults));
930            }
931        }
932
933        if new_levels.is_empty() {
934            return Self::empty();
935        }
936
937        let mut result = Self {
938            levels: new_levels,
939            logical_row_count: 0,
940            state: ChunkState::flat(0),
941        };
942        result.recompute_logical_row_count();
943        result.update_state();
944        result
945    }
946}
947
948/// Iterator over logical rows in a factorized chunk.
949///
950/// Instead of materializing all rows, this iterator yields index tuples
951/// that can be used to access values at each level.
952///
953/// # Alternatives
954///
955/// For better performance, consider using the iterators from [`factorized_iter`](super::factorized_iter):
956///
957/// - [`PrecomputedIter`](super::factorized_iter::PrecomputedIter) - Pre-computes all indices
958///   for O(1) random access and better cache locality
959/// - [`StreamingIter`](super::factorized_iter::StreamingIter) - More memory-efficient
960///   streaming iteration with SmallVec stack allocation
961/// - [`RowView`](super::factorized_iter::RowView) - Zero-copy access to row values
962pub struct FactorizedRowIterator<'a> {
963    chunk: &'a FactorizedChunk,
964    /// Current physical indices at each level.
965    indices: Vec<usize>,
966    /// Maximum physical index at each level (per parent).
967    /// This is updated as we traverse.
968    exhausted: bool,
969}
970
971impl<'a> FactorizedRowIterator<'a> {
972    fn new(chunk: &'a FactorizedChunk) -> Self {
973        let indices = vec![0; chunk.level_count()];
974        let mut exhausted = chunk.levels.is_empty() || chunk.levels[0].group_count == 0;
975
976        let mut iter = Self {
977            chunk,
978            indices,
979            exhausted,
980        };
981
982        // If initial position is invalid (e.g., first parent has 0 children), advance to valid position
983        if !exhausted && !iter.has_valid_deepest_range() {
984            if !iter.advance() {
985                exhausted = true;
986            }
987            iter.exhausted = exhausted;
988        }
989
990        iter
991    }
992
993    /// Advances the indices like a mixed-radix counter.
994    fn advance(&mut self) -> bool {
995        if self.exhausted || self.chunk.levels.is_empty() {
996            return false;
997        }
998
999        // Start from the deepest level and work backwards
1000        for level_idx in (0..self.chunk.levels.len()).rev() {
1001            let level = &self.chunk.levels[level_idx];
1002
1003            // Get the parent index for this level
1004            let parent_idx = if level_idx == 0 {
1005                // Level 0 has no parent - just check bounds
1006                self.indices[0] + 1
1007            } else {
1008                // Get current parent's physical index
1009                self.indices[level_idx - 1]
1010            };
1011
1012            // Get the range of valid indices for this parent
1013            let (_start, end) = if level_idx == 0 {
1014                (0, level.group_count)
1015            } else {
1016                // For unflat levels, get range from parent
1017                if let Some(col) = level.columns.first() {
1018                    col.range_for_parent(parent_idx)
1019                } else {
1020                    (0, 0)
1021                }
1022            };
1023
1024            let current = self.indices[level_idx];
1025            if current + 1 < end {
1026                // Can advance at this level
1027                self.indices[level_idx] = current + 1;
1028                // Reset all deeper levels to their start positions
1029                for deeper_idx in (level_idx + 1)..self.chunk.levels.len() {
1030                    if let Some(deeper_col) = self.chunk.levels[deeper_idx].columns.first() {
1031                        let (deeper_start, _) =
1032                            deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
1033                        self.indices[deeper_idx] = deeper_start;
1034                    }
1035                }
1036
1037                // Check if the deepest level has valid range - if any parent has 0 children,
1038                // we need to keep advancing instead of returning this invalid row
1039                if self.has_valid_deepest_range() {
1040                    return true;
1041                }
1042                // Otherwise, recursively try to advance again from the new position
1043                // This handles sparse data where many parents have 0 children
1044                return self.advance();
1045            }
1046            // Can't advance at this level - try parent level
1047        }
1048
1049        // Couldn't advance at any level - exhausted
1050        self.exhausted = true;
1051        false
1052    }
1053
1054    /// Checks if all levels have valid (non-empty) ranges for their current parent.
1055    ///
1056    /// This must check ALL levels, not just the deepest, because when an
1057    /// intermediate level has an empty range, deeper levels get reset to
1058    /// out-of-bounds indices that can alias into unrelated valid ranges.
1059    fn has_valid_deepest_range(&self) -> bool {
1060        if self.chunk.levels.len() <= 1 {
1061            return true; // Single level or empty - always valid
1062        }
1063
1064        // Check every unflat level (1..len) has a non-empty range for its parent
1065        for level_idx in 1..self.chunk.levels.len() {
1066            let parent_idx = self.indices[level_idx - 1];
1067            if let Some(col) = self.chunk.levels[level_idx].columns.first() {
1068                let (start, end) = col.range_for_parent(parent_idx);
1069                if start >= end {
1070                    return false;
1071                }
1072            } else {
1073                return false;
1074            }
1075        }
1076
1077        true
1078    }
1079}
1080
1081impl Iterator for FactorizedRowIterator<'_> {
1082    type Item = Vec<usize>;
1083
1084    fn next(&mut self) -> Option<Self::Item> {
1085        if self.exhausted {
1086            return None;
1087        }
1088
1089        // Return current indices, then advance
1090        let result = self.indices.clone();
1091        self.advance();
1092        Some(result)
1093    }
1094}
1095
1096/// A chunk that can be either flat (DataChunk) or factorized (FactorizedChunk).
1097#[derive(Debug, Clone)]
1098pub enum ChunkVariant {
1099    /// A flat chunk with all rows materialized.
1100    Flat(DataChunk),
1101    /// A factorized chunk with multi-level representation.
1102    Factorized(FactorizedChunk),
1103}
1104
1105impl ChunkVariant {
1106    /// Creates a flat variant from a DataChunk.
1107    #[must_use]
1108    pub fn flat(chunk: DataChunk) -> Self {
1109        Self::Flat(chunk)
1110    }
1111
1112    /// Creates a factorized variant from a FactorizedChunk.
1113    #[must_use]
1114    pub fn factorized(chunk: FactorizedChunk) -> Self {
1115        Self::Factorized(chunk)
1116    }
1117
1118    /// Ensures the chunk is flat, flattening if necessary.
1119    #[must_use]
1120    pub fn ensure_flat(self) -> DataChunk {
1121        match self {
1122            Self::Flat(chunk) => chunk,
1123            Self::Factorized(chunk) => chunk.flatten(),
1124        }
1125    }
1126
1127    /// Returns the logical row count.
1128    #[must_use]
1129    pub fn logical_row_count(&self) -> usize {
1130        match self {
1131            Self::Flat(chunk) => chunk.row_count(),
1132            Self::Factorized(chunk) => chunk.logical_row_count(),
1133        }
1134    }
1135
1136    /// Returns true if this is a factorized chunk.
1137    #[must_use]
1138    pub fn is_factorized(&self) -> bool {
1139        matches!(self, Self::Factorized(_))
1140    }
1141
1142    /// Returns true if this is a flat chunk.
1143    #[must_use]
1144    pub fn is_flat(&self) -> bool {
1145        matches!(self, Self::Flat(_))
1146    }
1147
1148    /// Returns true if the chunk is empty.
1149    #[must_use]
1150    pub fn is_empty(&self) -> bool {
1151        self.logical_row_count() == 0
1152    }
1153}
1154
1155impl From<DataChunk> for ChunkVariant {
1156    fn from(chunk: DataChunk) -> Self {
1157        Self::Flat(chunk)
1158    }
1159}
1160
1161impl From<FactorizedChunk> for ChunkVariant {
1162    fn from(chunk: FactorizedChunk) -> Self {
1163        Self::Factorized(chunk)
1164    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    use grafeo_common::types::{LogicalType, NodeId, Value};
1170
1171    use super::*;
1172
1173    fn make_flat_chunk() -> DataChunk {
1174        let mut col = ValueVector::with_type(LogicalType::Int64);
1175        col.push_int64(1);
1176        col.push_int64(2);
1177        DataChunk::new(vec![col])
1178    }
1179
1180    fn create_multi_level_chunk() -> FactorizedChunk {
1181        // 2 sources, each with 2 neighbors = 4 logical rows
1182        let mut sources = ValueVector::with_type(LogicalType::Int64);
1183        sources.push_int64(10);
1184        sources.push_int64(20);
1185
1186        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1187
1188        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1189        neighbors.push_int64(1);
1190        neighbors.push_int64(2);
1191        neighbors.push_int64(3);
1192        neighbors.push_int64(4);
1193
1194        let offsets = vec![0, 2, 4];
1195        chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1196        chunk
1197    }
1198
1199    #[test]
1200    fn test_from_flat() {
1201        let flat = make_flat_chunk();
1202        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1203
1204        assert_eq!(factorized.level_count(), 1);
1205        assert_eq!(factorized.logical_row_count(), 2);
1206        assert_eq!(factorized.physical_size(), 2);
1207    }
1208
1209    #[test]
1210    fn test_add_level() {
1211        // Start with 2 source nodes
1212        let mut col0 = ValueVector::with_type(LogicalType::Node);
1213        col0.push_node_id(NodeId::new(100));
1214        col0.push_node_id(NodeId::new(200));
1215
1216        let mut chunk = FactorizedChunk::with_flat_level(vec![col0], vec!["source".to_string()]);
1217
1218        assert_eq!(chunk.level_count(), 1);
1219        assert_eq!(chunk.logical_row_count(), 2);
1220
1221        // Add level 1: source 0 has 3 neighbors, source 1 has 2 neighbors
1222        let mut neighbors = ValueVector::with_type(LogicalType::Node);
1223        neighbors.push_node_id(NodeId::new(10));
1224        neighbors.push_node_id(NodeId::new(11));
1225        neighbors.push_node_id(NodeId::new(12));
1226        neighbors.push_node_id(NodeId::new(20));
1227        neighbors.push_node_id(NodeId::new(21));
1228
1229        let offsets = vec![0, 3, 5]; // source 0: 0..3, source 1: 3..5
1230        chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &offsets);
1231
1232        assert_eq!(chunk.level_count(), 2);
1233        assert_eq!(chunk.logical_row_count(), 5); // 3 + 2 neighbors
1234        assert_eq!(chunk.physical_size(), 2 + 5); // 2 sources + 5 neighbors
1235    }
1236
1237    #[test]
1238    fn test_flatten_single_level() {
1239        let flat = make_flat_chunk();
1240        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1241
1242        let flattened = factorized.flatten();
1243        assert_eq!(flattened.row_count(), 2);
1244        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
1245        assert_eq!(flattened.column(0).unwrap().get_int64(1), Some(2));
1246    }
1247
1248    #[test]
1249    fn test_flatten_multi_level() {
1250        // 2 sources, each with 2 neighbors = 4 logical rows
1251        let mut sources = ValueVector::with_type(LogicalType::Int64);
1252        sources.push_int64(1);
1253        sources.push_int64(2);
1254
1255        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1256
1257        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1258        neighbors.push_int64(10);
1259        neighbors.push_int64(11);
1260        neighbors.push_int64(20);
1261        neighbors.push_int64(21);
1262
1263        let offsets = vec![0, 2, 4];
1264        chunk.add_level(vec![neighbors], vec!["nbr".to_string()], &offsets);
1265
1266        let flat = chunk.flatten();
1267        assert_eq!(flat.row_count(), 4);
1268        assert_eq!(flat.column_count(), 2);
1269
1270        // Check that sources are duplicated correctly
1271        // Row 0: (1, 10), Row 1: (1, 11), Row 2: (2, 20), Row 3: (2, 21)
1272        assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
1273        assert_eq!(flat.column(0).unwrap().get_int64(1), Some(1));
1274        assert_eq!(flat.column(0).unwrap().get_int64(2), Some(2));
1275        assert_eq!(flat.column(0).unwrap().get_int64(3), Some(2));
1276        assert_eq!(flat.column(1).unwrap().get_int64(0), Some(10));
1277        assert_eq!(flat.column(1).unwrap().get_int64(1), Some(11));
1278        assert_eq!(flat.column(1).unwrap().get_int64(2), Some(20));
1279        assert_eq!(flat.column(1).unwrap().get_int64(3), Some(21));
1280    }
1281
1282    #[test]
1283    fn test_logical_row_iter_single_level() {
1284        let flat = make_flat_chunk();
1285        let factorized = FactorizedChunk::from_flat(&flat, vec!["col1".to_string()]);
1286
1287        let indices: Vec<_> = factorized.logical_row_iter().collect();
1288        assert_eq!(indices.len(), 2);
1289        assert_eq!(indices[0], vec![0]);
1290        assert_eq!(indices[1], vec![1]);
1291    }
1292
1293    #[test]
1294    fn test_chunk_variant() {
1295        let flat = make_flat_chunk();
1296        let variant = ChunkVariant::flat(flat.clone());
1297
1298        assert!(variant.is_flat());
1299        assert!(!variant.is_factorized());
1300        assert_eq!(variant.logical_row_count(), 2);
1301
1302        let ensured = variant.ensure_flat();
1303        assert_eq!(ensured.row_count(), 2);
1304    }
1305
1306    #[test]
1307    fn test_chunk_variant_factorized() {
1308        let chunk = create_multi_level_chunk();
1309        let variant = ChunkVariant::factorized(chunk);
1310
1311        assert!(variant.is_factorized());
1312        assert!(!variant.is_flat());
1313        assert_eq!(variant.logical_row_count(), 4);
1314
1315        let flat = variant.ensure_flat();
1316        assert_eq!(flat.row_count(), 4);
1317    }
1318
1319    #[test]
1320    fn test_chunk_variant_from() {
1321        let flat = make_flat_chunk();
1322        let variant: ChunkVariant = flat.into();
1323        assert!(variant.is_flat());
1324
1325        let factorized = create_multi_level_chunk();
1326        let variant2: ChunkVariant = factorized.into();
1327        assert!(variant2.is_factorized());
1328    }
1329
1330    #[test]
1331    fn test_chunk_variant_is_empty() {
1332        let empty_flat = DataChunk::empty();
1333        let variant = ChunkVariant::flat(empty_flat);
1334        assert!(variant.is_empty());
1335
1336        let non_empty = make_flat_chunk();
1337        let variant2 = ChunkVariant::flat(non_empty);
1338        assert!(!variant2.is_empty());
1339    }
1340
1341    #[test]
1342    fn test_empty_chunk() {
1343        let chunk = FactorizedChunk::empty();
1344        assert_eq!(chunk.level_count(), 0);
1345        assert_eq!(chunk.logical_row_count(), 0);
1346        assert_eq!(chunk.physical_size(), 0);
1347
1348        let flat = chunk.flatten();
1349        assert!(flat.is_empty());
1350    }
1351
1352    #[test]
1353    fn test_all_column_names() {
1354        let mut sources = ValueVector::with_type(LogicalType::Int64);
1355        sources.push_int64(1);
1356
1357        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["source".to_string()]);
1358
1359        let mut neighbors = ValueVector::with_type(LogicalType::Int64);
1360        neighbors.push_int64(10);
1361
1362        chunk.add_level(vec![neighbors], vec!["neighbor".to_string()], &[0, 1]);
1363
1364        let names = chunk.all_column_names();
1365        assert_eq!(names, vec!["source", "neighbor"]);
1366    }
1367
1368    #[test]
1369    fn test_level_mut() {
1370        let mut chunk = create_multi_level_chunk();
1371
1372        // Access level mutably
1373        let level = chunk.level_mut(0).unwrap();
1374        assert_eq!(level.column_count(), 1);
1375
1376        // Invalid level should return None
1377        assert!(chunk.level_mut(10).is_none());
1378    }
1379
1380    #[test]
1381    fn test_factorization_level_column_mut() {
1382        let mut chunk = create_multi_level_chunk();
1383
1384        let level = chunk.level_mut(0).unwrap();
1385        let col = level.column_mut(0);
1386        assert!(col.is_some());
1387
1388        // Invalid column should return None
1389        assert!(level.column_mut(10).is_none());
1390    }
1391
1392    #[test]
1393    fn test_factorization_level_physical_value_count() {
1394        let chunk = create_multi_level_chunk();
1395
1396        let level0 = chunk.level(0).unwrap();
1397        assert_eq!(level0.physical_value_count(), 2); // 2 sources
1398
1399        let level1 = chunk.level(1).unwrap();
1400        assert_eq!(level1.physical_value_count(), 4); // 4 neighbors
1401    }
1402
1403    #[test]
1404    fn test_count_rows() {
1405        let chunk = create_multi_level_chunk();
1406        assert_eq!(chunk.count_rows(), 4);
1407
1408        let empty = FactorizedChunk::empty();
1409        assert_eq!(empty.count_rows(), 0);
1410    }
1411
1412    #[test]
1413    fn test_compute_path_multiplicities() {
1414        let chunk = create_multi_level_chunk();
1415
1416        let mults = chunk.compute_path_multiplicities();
1417        // Each value at the deepest level has multiplicity 1 since each parent has 2 children
1418        assert_eq!(mults.len(), 4);
1419        assert!(mults.iter().all(|&m| m == 1));
1420    }
1421
1422    #[test]
1423    fn test_compute_path_multiplicities_single_level() {
1424        let mut col = ValueVector::with_type(LogicalType::Int64);
1425        col.push_int64(1);
1426        col.push_int64(2);
1427        col.push_int64(3);
1428
1429        let chunk = FactorizedChunk::with_flat_level(vec![col], vec!["val".to_string()]);
1430        let mults = chunk.compute_path_multiplicities();
1431
1432        // Single level: each value has multiplicity 1
1433        assert_eq!(mults.len(), 3);
1434        assert!(mults.iter().all(|&m| m == 1));
1435    }
1436
1437    #[test]
1438    fn test_compute_path_multiplicities_empty() {
1439        let chunk = FactorizedChunk::empty();
1440        let mults = chunk.compute_path_multiplicities();
1441        assert!(mults.is_empty());
1442    }
1443
1444    #[test]
1445    fn test_path_multiplicities_cached() {
1446        let mut chunk = create_multi_level_chunk();
1447
1448        // First call computes and caches
1449        let mults1 = chunk.path_multiplicities_cached();
1450        assert_eq!(mults1.len(), 4);
1451
1452        // Second call should return cached value
1453        let mults2 = chunk.path_multiplicities_cached();
1454        assert_eq!(mults1.len(), mults2.len());
1455    }
1456
1457    #[test]
1458    fn test_sum_deepest() {
1459        let chunk = create_multi_level_chunk();
1460
1461        // Deepest level has values [1, 2, 3, 4]
1462        let sum = chunk.sum_deepest(0);
1463        assert_eq!(sum, Some(10.0)); // 1 + 2 + 3 + 4
1464    }
1465
1466    #[test]
1467    fn test_sum_deepest_empty() {
1468        let chunk = FactorizedChunk::empty();
1469        assert!(chunk.sum_deepest(0).is_none());
1470    }
1471
1472    #[test]
1473    fn test_sum_deepest_invalid_column() {
1474        let chunk = create_multi_level_chunk();
1475        assert!(chunk.sum_deepest(10).is_none());
1476    }
1477
1478    #[test]
1479    fn test_avg_deepest() {
1480        let chunk = create_multi_level_chunk();
1481
1482        // Deepest level has values [1, 2, 3, 4], avg = 2.5
1483        let avg = chunk.avg_deepest(0);
1484        assert_eq!(avg, Some(2.5));
1485    }
1486
1487    #[test]
1488    fn test_avg_deepest_empty() {
1489        let chunk = FactorizedChunk::empty();
1490        assert!(chunk.avg_deepest(0).is_none());
1491    }
1492
1493    #[test]
1494    fn test_min_deepest() {
1495        let chunk = create_multi_level_chunk();
1496
1497        let min = chunk.min_deepest(0);
1498        assert_eq!(min, Some(Value::Int64(1)));
1499    }
1500
1501    #[test]
1502    fn test_min_deepest_empty() {
1503        let chunk = FactorizedChunk::empty();
1504        assert!(chunk.min_deepest(0).is_none());
1505    }
1506
1507    #[test]
1508    fn test_min_deepest_invalid_column() {
1509        let chunk = create_multi_level_chunk();
1510        assert!(chunk.min_deepest(10).is_none());
1511    }
1512
1513    #[test]
1514    fn test_max_deepest() {
1515        let chunk = create_multi_level_chunk();
1516
1517        let max = chunk.max_deepest(0);
1518        assert_eq!(max, Some(Value::Int64(4)));
1519    }
1520
1521    #[test]
1522    fn test_max_deepest_empty() {
1523        let chunk = FactorizedChunk::empty();
1524        assert!(chunk.max_deepest(0).is_none());
1525    }
1526
1527    #[test]
1528    fn test_value_less_than() {
1529        // Null handling
1530        assert!(FactorizedChunk::value_less_than(
1531            &Value::Null,
1532            &Value::Int64(1)
1533        ));
1534        assert!(!FactorizedChunk::value_less_than(
1535            &Value::Int64(1),
1536            &Value::Null
1537        ));
1538        assert!(!FactorizedChunk::value_less_than(
1539            &Value::Null,
1540            &Value::Null
1541        ));
1542
1543        // Int64
1544        assert!(FactorizedChunk::value_less_than(
1545            &Value::Int64(1),
1546            &Value::Int64(2)
1547        ));
1548        assert!(!FactorizedChunk::value_less_than(
1549            &Value::Int64(2),
1550            &Value::Int64(1)
1551        ));
1552
1553        // Float64
1554        assert!(FactorizedChunk::value_less_than(
1555            &Value::Float64(1.5),
1556            &Value::Float64(2.5)
1557        ));
1558
1559        // Mixed Int/Float
1560        assert!(FactorizedChunk::value_less_than(
1561            &Value::Int64(1),
1562            &Value::Float64(1.5)
1563        ));
1564        assert!(FactorizedChunk::value_less_than(
1565            &Value::Float64(0.5),
1566            &Value::Int64(1)
1567        ));
1568
1569        // String
1570        assert!(FactorizedChunk::value_less_than(
1571            &Value::String("apple".into()),
1572            &Value::String("banana".into())
1573        ));
1574
1575        // Bool (false < true)
1576        assert!(FactorizedChunk::value_less_than(
1577            &Value::Bool(false),
1578            &Value::Bool(true)
1579        ));
1580        assert!(!FactorizedChunk::value_less_than(
1581            &Value::Bool(true),
1582            &Value::Bool(false)
1583        ));
1584
1585        // Incompatible types return false
1586        assert!(!FactorizedChunk::value_less_than(
1587            &Value::Int64(1),
1588            &Value::String("hello".into())
1589        ));
1590    }
1591
1592    #[test]
1593    fn test_filter_deepest() {
1594        let chunk = create_multi_level_chunk();
1595
1596        // Filter to keep only values > 2
1597        let filtered = chunk.filter_deepest(0, |v| {
1598            if let Value::Int64(n) = v {
1599                *n > 2
1600            } else {
1601                false
1602            }
1603        });
1604
1605        let filtered = filtered.unwrap();
1606        assert_eq!(filtered.logical_row_count(), 2); // Only 3 and 4 remain
1607    }
1608
1609    #[test]
1610    fn test_filter_deepest_empty() {
1611        let chunk = FactorizedChunk::empty();
1612        assert!(chunk.filter_deepest(0, |_| true).is_none());
1613    }
1614
1615    #[test]
1616    fn test_filter_deepest_all_filtered() {
1617        let chunk = create_multi_level_chunk();
1618
1619        // Filter everything out
1620        let filtered = chunk.filter_deepest(0, |_| false);
1621
1622        let filtered = filtered.unwrap();
1623        assert_eq!(filtered.logical_row_count(), 0);
1624    }
1625
1626    #[test]
1627    fn test_filter_deepest_invalid_column() {
1628        let chunk = create_multi_level_chunk();
1629        assert!(chunk.filter_deepest(10, |_| true).is_none());
1630    }
1631
1632    #[test]
1633    fn test_filter_deepest_multi() {
1634        // Create a chunk with 2 columns at the deepest level
1635        let mut sources = ValueVector::with_type(LogicalType::Int64);
1636        sources.push_int64(1);
1637
1638        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1639
1640        let mut col1 = ValueVector::with_type(LogicalType::Int64);
1641        col1.push_int64(10);
1642        col1.push_int64(20);
1643        col1.push_int64(30);
1644
1645        let mut col2 = ValueVector::with_type(LogicalType::Int64);
1646        col2.push_int64(1);
1647        col2.push_int64(2);
1648        col2.push_int64(3);
1649
1650        let offsets = vec![0, 3];
1651        chunk.add_level(
1652            vec![col1, col2],
1653            vec!["a".to_string(), "b".to_string()],
1654            &offsets,
1655        );
1656
1657        // Filter based on both columns
1658        let filtered = chunk.filter_deepest_multi(|values| {
1659            if values.len() == 2
1660                && let (Value::Int64(a), Value::Int64(b)) = (&values[0], &values[1])
1661            {
1662                return *a + *b > 15;
1663            }
1664            false
1665        });
1666
1667        assert!(filtered.is_some());
1668        let filtered = filtered.unwrap();
1669        assert_eq!(filtered.logical_row_count(), 2); // (20,2) and (30,3) pass
1670    }
1671
1672    #[test]
1673    fn test_filter_deepest_multi_empty() {
1674        let chunk = FactorizedChunk::empty();
1675        assert!(chunk.filter_deepest_multi(|_| true).is_none());
1676    }
1677
1678    #[test]
1679    fn test_filter_deepest_multi_no_columns() {
1680        // Create a chunk with no columns at level 1
1681        let mut sources = ValueVector::with_type(LogicalType::Int64);
1682        sources.push_int64(1);
1683
1684        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1685
1686        // Add empty level (edge case)
1687        let empty_level = FactorizationLevel::unflat(vec![], vec![], vec![0]);
1688        chunk.add_factorized_level(empty_level);
1689
1690        assert!(chunk.filter_deepest_multi(|_| true).is_none());
1691    }
1692
1693    #[test]
1694    fn test_project() {
1695        let mut sources = ValueVector::with_type(LogicalType::Int64);
1696        sources.push_int64(1);
1697        sources.push_int64(2);
1698
1699        let mut col2 = ValueVector::with_type(LogicalType::String);
1700        col2.push_string("a");
1701        col2.push_string("b");
1702
1703        let chunk = FactorizedChunk::with_flat_level(
1704            vec![sources, col2],
1705            vec!["num".to_string(), "str".to_string()],
1706        );
1707
1708        // Project only the first column
1709        let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);
1710
1711        assert_eq!(projected.total_column_count(), 1);
1712        let names = projected.all_column_names();
1713        assert_eq!(names, vec!["projected_num"]);
1714    }
1715
1716    #[test]
1717    fn test_project_empty() {
1718        let chunk = FactorizedChunk::empty();
1719        let projected = chunk.project(&[(0, 0, "col".to_string())]);
1720        assert_eq!(projected.level_count(), 0);
1721    }
1722
1723    #[test]
1724    fn test_project_empty_specs() {
1725        let chunk = create_multi_level_chunk();
1726        let projected = chunk.project(&[]);
1727        assert_eq!(projected.level_count(), 0);
1728    }
1729
1730    #[test]
1731    fn test_project_invalid_level() {
1732        let chunk = create_multi_level_chunk();
1733
1734        // Project from invalid level
1735        let projected = chunk.project(&[(10, 0, "col".to_string())]);
1736        assert_eq!(projected.level_count(), 0);
1737    }
1738
1739    #[test]
1740    fn test_project_multi_level() {
1741        let chunk = create_multi_level_chunk();
1742
1743        // Project from both levels
1744        let projected =
1745            chunk.project(&[(0, 0, "source".to_string()), (1, 0, "neighbor".to_string())]);
1746
1747        assert_eq!(projected.level_count(), 2);
1748        assert_eq!(projected.total_column_count(), 2);
1749    }
1750
1751    #[test]
1752    fn test_total_column_count() {
1753        let chunk = create_multi_level_chunk();
1754        assert_eq!(chunk.total_column_count(), 2); // 1 at level 0, 1 at level 1
1755    }
1756
1757    #[test]
1758    fn test_chunk_state_access() {
1759        let mut chunk = create_multi_level_chunk();
1760
1761        let state = chunk.chunk_state();
1762        assert!(state.is_factorized());
1763
1764        let state_mut = chunk.chunk_state_mut();
1765        state_mut.invalidate_cache();
1766    }
1767
1768    #[test]
1769    fn test_logical_row_iter_multi_level() {
1770        let chunk = create_multi_level_chunk();
1771
1772        let indices: Vec<_> = chunk.logical_row_iter().collect();
1773        assert_eq!(indices.len(), 4);
1774
1775        // Verify structure: [source_idx, neighbor_idx]
1776        assert_eq!(indices[0], vec![0, 0]);
1777        assert_eq!(indices[1], vec![0, 1]);
1778        assert_eq!(indices[2], vec![1, 2]);
1779        assert_eq!(indices[3], vec![1, 3]);
1780    }
1781
1782    #[test]
1783    fn test_sum_deepest_with_float() {
1784        let mut sources = ValueVector::with_type(LogicalType::Int64);
1785        sources.push_int64(1);
1786
1787        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1788
1789        let mut floats = ValueVector::with_type(LogicalType::Float64);
1790        floats.push_float64(1.5);
1791        floats.push_float64(2.5);
1792        floats.push_float64(3.0);
1793
1794        chunk.add_level(vec![floats], vec!["val".to_string()], &[0, 3]);
1795
1796        let sum = chunk.sum_deepest(0);
1797        assert_eq!(sum, Some(7.0)); // 1.5 + 2.5 + 3.0
1798    }
1799
1800    #[test]
1801    fn test_min_max_with_strings() {
1802        let mut sources = ValueVector::with_type(LogicalType::Int64);
1803        sources.push_int64(1);
1804
1805        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], vec!["src".to_string()]);
1806
1807        let mut strings = ValueVector::with_type(LogicalType::String);
1808        strings.push_string("banana");
1809        strings.push_string("apple");
1810        strings.push_string("cherry");
1811
1812        chunk.add_level(vec![strings], vec!["fruit".to_string()], &[0, 3]);
1813
1814        let min = chunk.min_deepest(0);
1815        assert_eq!(min, Some(Value::String("apple".into())));
1816
1817        let max = chunk.max_deepest(0);
1818        assert_eq!(max, Some(Value::String("cherry".into())));
1819    }
1820
1821    #[test]
1822    fn test_recompute_logical_row_count_empty() {
1823        let mut chunk = FactorizedChunk::empty();
1824        chunk.recompute_logical_row_count();
1825        assert_eq!(chunk.logical_row_count(), 0);
1826    }
1827
1828    #[test]
1829    fn test_factorization_level_group_count() {
1830        let chunk = create_multi_level_chunk();
1831
1832        let level0 = chunk.level(0).unwrap();
1833        assert_eq!(level0.group_count(), 2);
1834
1835        let level1 = chunk.level(1).unwrap();
1836        assert_eq!(level1.group_count(), 4);
1837    }
1838
1839    #[test]
1840    fn test_factorization_level_multiplicities() {
1841        let chunk = create_multi_level_chunk();
1842
1843        let level1 = chunk.level(1).unwrap();
1844        let mults = level1.multiplicities();
1845        assert_eq!(mults, &[2, 2]); // Each source has 2 neighbors
1846    }
1847
1848    #[test]
1849    fn test_factorization_level_column_names() {
1850        let chunk = create_multi_level_chunk();
1851
1852        let level0 = chunk.level(0).unwrap();
1853        assert_eq!(level0.column_names(), &["src"]);
1854
1855        let level1 = chunk.level(1).unwrap();
1856        assert_eq!(level1.column_names(), &["nbr"]);
1857    }
1858}