Skip to main content

kyu_executor/operators/
unwind.rs

1//! Unwind operator — flattens a list expression into individual rows.
2
3use kyu_common::KyuResult;
4use kyu_expression::{BoundExpression, evaluate};
5use kyu_types::TypedValue;
6
7use crate::context::ExecutionContext;
8use crate::data_chunk::DataChunk;
9use crate::physical_plan::PhysicalOperator;
10
11pub struct UnwindOp {
12    pub child: Box<PhysicalOperator>,
13    pub expression: BoundExpression,
14}
15
16impl UnwindOp {
17    pub fn new(child: PhysicalOperator, expression: BoundExpression) -> Self {
18        Self {
19            child: Box::new(child),
20            expression,
21        }
22    }
23
24    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
25        loop {
26            let chunk = match self.child.next(ctx)? {
27                Some(c) => c,
28                None => return Ok(None),
29            };
30
31            let parent_cols = chunk.num_columns();
32            let total_cols = parent_cols + 1; // Add one column for the unwound element.
33            let mut result = DataChunk::with_capacity(total_cols, chunk.num_rows() * 2);
34
35            for row_idx in 0..chunk.num_rows() {
36                let row_ref = chunk.row_ref(row_idx);
37                let list_val = evaluate(&self.expression, &row_ref)?;
38
39                match list_val {
40                    TypedValue::List(elements) => {
41                        for elem in elements {
42                            result.append_row_from_chunk_with_extra(&chunk, row_idx, elem);
43                        }
44                    }
45                    TypedValue::Null => {
46                        // UNWIND null produces no rows.
47                    }
48                    other => {
49                        // Single value — treat as single-element list.
50                        result.append_row_from_chunk_with_extra(&chunk, row_idx, other);
51                    }
52                }
53            }
54
55            if !result.is_empty() {
56                return Ok(Some(result));
57            }
58        }
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::LogicalType;

    /// Wraps an `Int64` value in a bound literal expression.
    fn int64_literal(value: TypedValue) -> BoundExpression {
        BoundExpression::Literal {
            value,
            result_type: LogicalType::Int64,
        }
    }

    #[test]
    fn unwind_list() {
        let storage = MockStorage::new();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);

        // Zero-column source feeding the operator under test.
        let source = PhysicalOperator::Empty(crate::operators::empty::EmptyOp::new(0));
        let list_expr = BoundExpression::ListLiteral {
            elements: (1..=3)
                .map(|n| int64_literal(TypedValue::Int64(n)))
                .collect(),
            result_type: LogicalType::Any,
        };

        let mut op = UnwindOp::new(source, list_expr);
        let out = op.next(&ctx).unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);

        // The parent had 0 columns, so the unwound element lands in column 0.
        for (row, expected) in (0..).zip(1..=3) {
            assert_eq!(out.get_value(row, 0), TypedValue::Int64(expected));
        }
    }
}