grafeo_core/execution/operators/
unwind.rs1use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7pub struct UnwindOperator {
12 child: Box<dyn Operator>,
14 list_col_idx: usize,
16 variable_name: String,
18 output_schema: Vec<LogicalType>,
20 current_chunk: Option<DataChunk>,
22 current_row: usize,
24 current_list_idx: usize,
26 current_list: Option<Vec<Value>>,
28}
29
30impl UnwindOperator {
31 pub fn new(
39 child: Box<dyn Operator>,
40 list_col_idx: usize,
41 variable_name: String,
42 output_schema: Vec<LogicalType>,
43 ) -> Self {
44 Self {
45 child,
46 list_col_idx,
47 variable_name,
48 output_schema,
49 current_chunk: None,
50 current_row: 0,
51 current_list_idx: 0,
52 current_list: None,
53 }
54 }
55
56 #[must_use]
58 pub fn variable_name(&self) -> &str {
59 &self.variable_name
60 }
61
62 fn advance(&mut self) -> OperatorResult {
64 loop {
65 if let Some(list) = &self.current_list {
67 if self.current_list_idx < list.len() {
68 return Ok(Some(self.emit_row()?));
70 }
71 }
72
73 self.current_list_idx = 0;
75 self.current_list = None;
76
77 if self.current_chunk.is_none() {
79 self.current_chunk = self.child.next()?;
80 self.current_row = 0;
81 if self.current_chunk.is_none() {
82 return Ok(None); }
84 }
85
86 let chunk = self.current_chunk.as_ref().unwrap();
87
88 while self.current_row < chunk.row_count() {
90 if let Some(col) = chunk.column(self.list_col_idx) {
91 if let Some(value) = col.get_value(self.current_row) {
92 if let Value::List(list_arc) = value {
93 let list: Vec<Value> = list_arc.iter().cloned().collect();
95 if !list.is_empty() {
96 self.current_list = Some(list);
97 return Ok(Some(self.emit_row()?));
98 }
99 }
100 }
101 }
102 self.current_row += 1;
103 }
104
105 self.current_chunk = None;
107 }
108 }
109
110 fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
112 let chunk = self.current_chunk.as_ref().unwrap();
113 let list = self.current_list.as_ref().unwrap();
114 let element = list[self.current_list_idx].clone();
115
116 let mut builder = DataChunkBuilder::new(&self.output_schema);
118
119 for col_idx in 0..chunk.column_count() {
121 if col_idx == self.list_col_idx {
122 continue; }
124 if let Some(col) = chunk.column(col_idx) {
125 if let Some(value) = col.get_value(self.current_row) {
126 if let Some(out_col) = builder.column_mut(col_idx) {
127 out_col.push_value(value);
128 }
129 }
130 }
131 }
132
133 let new_col_idx = self.output_schema.len() - 1;
135 if let Some(out_col) = builder.column_mut(new_col_idx) {
136 out_col.push_value(element);
137 }
138
139 builder.advance_row();
140 self.current_list_idx += 1;
141
142 if self.current_list_idx >= list.len() {
144 self.current_row += 1;
145 }
146
147 Ok(builder.finish())
148 }
149}
150
151impl Operator for UnwindOperator {
152 fn next(&mut self) -> OperatorResult {
153 self.advance()
154 }
155
156 fn reset(&mut self) {
157 self.child.reset();
158 self.current_chunk = None;
159 self.current_row = 0;
160 self.current_list_idx = 0;
161 self.current_list = None;
162 }
163
164 fn name(&self) -> &'static str {
165 "Unwind"
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172 use crate::execution::chunk::DataChunkBuilder;
173 use std::sync::Arc;
174
175 struct MockOperator {
176 chunks: Vec<DataChunk>,
177 position: usize,
178 }
179
180 impl Operator for MockOperator {
181 fn next(&mut self) -> OperatorResult {
182 if self.position < self.chunks.len() {
183 let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
184 self.position += 1;
185 Ok(Some(chunk))
186 } else {
187 Ok(None)
188 }
189 }
190
191 fn reset(&mut self) {
192 self.position = 0;
193 }
194
195 fn name(&self) -> &'static str {
196 "MockOperator"
197 }
198 }
199
200 #[test]
201 fn test_unwind_basic() {
202 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]); let list = Value::List(Arc::new([
205 Value::Int64(1),
206 Value::Int64(2),
207 Value::Int64(3),
208 ]));
209 builder.column_mut(0).unwrap().push_value(list);
210 builder.advance_row();
211 let chunk = builder.finish();
212
213 let mock = MockOperator {
214 chunks: vec![chunk],
215 position: 0,
216 };
217
218 let mut unwind = UnwindOperator::new(
220 Box::new(mock),
221 0,
222 "x".to_string(),
223 vec![LogicalType::Int64], );
225
226 let mut results = Vec::new();
228 while let Ok(Some(chunk)) = unwind.next() {
229 results.push(chunk);
230 }
231
232 assert_eq!(results.len(), 3);
233 }
234}