kyu_executor/operators/
unwind.rs1use kyu_common::KyuResult;
4use kyu_expression::{BoundExpression, evaluate};
5use kyu_types::TypedValue;
6
7use crate::context::ExecutionContext;
8use crate::data_chunk::DataChunk;
9use crate::physical_plan::PhysicalOperator;
10
11pub struct UnwindOp {
12 pub child: Box<PhysicalOperator>,
13 pub expression: BoundExpression,
14}
15
16impl UnwindOp {
17 pub fn new(child: PhysicalOperator, expression: BoundExpression) -> Self {
18 Self {
19 child: Box::new(child),
20 expression,
21 }
22 }
23
24 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
25 loop {
26 let chunk = match self.child.next(ctx)? {
27 Some(c) => c,
28 None => return Ok(None),
29 };
30
31 let parent_cols = chunk.num_columns();
32 let total_cols = parent_cols + 1; let mut result = DataChunk::with_capacity(total_cols, chunk.num_rows() * 2);
34
35 for row_idx in 0..chunk.num_rows() {
36 let row_ref = chunk.row_ref(row_idx);
37 let list_val = evaluate(&self.expression, &row_ref)?;
38
39 match list_val {
40 TypedValue::List(elements) => {
41 for elem in elements {
42 result.append_row_from_chunk_with_extra(&chunk, row_idx, elem);
43 }
44 }
45 TypedValue::Null => {
46 }
48 other => {
49 result.append_row_from_chunk_with_extra(&chunk, row_idx, other);
51 }
52 }
53 }
54
55 if !result.is_empty() {
56 return Ok(Some(result));
57 }
58 }
59 }
60}
61
62#[cfg(test)]
63mod tests {
64 use super::*;
65 use crate::context::MockStorage;
66 use kyu_types::LogicalType;
67
68 #[test]
69 fn unwind_list() {
70 let storage = MockStorage::new();
71 let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
72
73 let empty = PhysicalOperator::Empty(crate::operators::empty::EmptyOp::new(0));
74 let expr = BoundExpression::ListLiteral {
75 elements: vec![
76 BoundExpression::Literal {
77 value: TypedValue::Int64(1),
78 result_type: LogicalType::Int64,
79 },
80 BoundExpression::Literal {
81 value: TypedValue::Int64(2),
82 result_type: LogicalType::Int64,
83 },
84 BoundExpression::Literal {
85 value: TypedValue::Int64(3),
86 result_type: LogicalType::Int64,
87 },
88 ],
89 result_type: LogicalType::Any,
90 };
91
92 let mut unwind = UnwindOp::new(empty, expr);
93 let chunk = unwind.next(&ctx).unwrap().unwrap();
94 assert_eq!(chunk.num_rows(), 3);
95 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
97 assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(2));
98 assert_eq!(chunk.get_value(2, 0), TypedValue::Int64(3));
99 }
100}