vibesql_executor/evaluator/window/
partitioning.rs

1//! Row partitioning for window functions
2//!
3//! Groups rows into partitions based on PARTITION BY expressions.
4
5use vibesql_ast::Expression;
6use vibesql_storage::Row;
7use vibesql_types::SqlValue;
8
9/// A partition of rows for window function evaluation
10#[derive(Debug, Clone)]
11pub struct Partition {
12    pub rows: Vec<Row>,
13    /// Original indices of rows before partitioning/sorting
14    pub original_indices: Vec<usize>,
15}
16
17impl Partition {
18    pub fn new(rows: Vec<Row>) -> Self {
19        let original_indices = (0..rows.len()).collect();
20        Self { rows, original_indices }
21    }
22
23    pub fn with_indices(rows: Vec<Row>, original_indices: Vec<usize>) -> Self {
24        Self { rows, original_indices }
25    }
26
27    pub fn len(&self) -> usize {
28        self.rows.len()
29    }
30
31    pub fn is_empty(&self) -> bool {
32        self.rows.is_empty()
33    }
34}
35
36/// Partition rows by PARTITION BY expressions
37///
38/// Groups rows into partitions based on partition expressions.
39/// If no PARTITION BY clause, all rows go into a single partition.
40pub fn partition_rows<F>(
41    rows: Vec<Row>,
42    partition_by: &Option<Vec<Expression>>,
43    eval_fn: F,
44) -> Vec<Partition>
45where
46    F: Fn(&Expression, &Row) -> Result<SqlValue, String>,
47{
48    // If no PARTITION BY, return all rows in single partition
49    let Some(partition_exprs) = partition_by else {
50        return vec![Partition::new(rows)];
51    };
52
53    if partition_exprs.is_empty() {
54        return vec![Partition::new(rows)];
55    }
56
57    // Group rows by partition key values (use BTreeMap for deterministic ordering)
58    // Track original indices through partitioning
59    let mut partitions_map: std::collections::BTreeMap<Vec<String>, Vec<(usize, Row)>> =
60        std::collections::BTreeMap::new();
61
62    for (original_idx, row) in rows.into_iter().enumerate() {
63        // Evaluate partition expressions for this row
64        let mut partition_key = Vec::new();
65
66        for expr in partition_exprs {
67            let value = eval_fn(expr, &row).unwrap_or(SqlValue::Null);
68            // Convert to string for grouping (handles NULL consistently)
69            partition_key.push(format!("{:?}", value));
70        }
71
72        partitions_map.entry(partition_key).or_default().push((original_idx, row));
73    }
74
75    // Convert HashMap to Vec<Partition>, preserving original indices
76    partitions_map
77        .into_values()
78        .map(|rows_with_indices| {
79            let (indices, rows): (Vec<_>, Vec<_>) = rows_with_indices.into_iter().unzip();
80            Partition::with_indices(rows, indices)
81        })
82        .collect()
83}