grafeo_core/execution/operators/
unwind.rs1use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7pub struct UnwindOperator {
12 child: Box<dyn Operator>,
14 list_col_idx: usize,
16 variable_name: String,
18 output_schema: Vec<LogicalType>,
20 current_chunk: Option<DataChunk>,
22 current_row: usize,
24 current_list_idx: usize,
26 current_list: Option<Vec<Value>>,
28}
29
30impl UnwindOperator {
31 pub fn new(
39 child: Box<dyn Operator>,
40 list_col_idx: usize,
41 variable_name: String,
42 output_schema: Vec<LogicalType>,
43 ) -> Self {
44 Self {
45 child,
46 list_col_idx,
47 variable_name,
48 output_schema,
49 current_chunk: None,
50 current_row: 0,
51 current_list_idx: 0,
52 current_list: None,
53 }
54 }
55
56 #[must_use]
58 pub fn variable_name(&self) -> &str {
59 &self.variable_name
60 }
61
62 fn advance(&mut self) -> OperatorResult {
64 loop {
65 if let Some(list) = &self.current_list
67 && self.current_list_idx < list.len()
68 {
69 return Ok(Some(self.emit_row()?));
71 }
72
73 self.current_list_idx = 0;
75 self.current_list = None;
76
77 if self.current_chunk.is_none() {
79 self.current_chunk = self.child.next()?;
80 self.current_row = 0;
81 if self.current_chunk.is_none() {
82 return Ok(None); }
84 }
85
86 let chunk = self.current_chunk.as_ref().unwrap();
87
88 while self.current_row < chunk.row_count() {
90 if let Some(col) = chunk.column(self.list_col_idx)
91 && let Some(value) = col.get_value(self.current_row)
92 && let Value::List(list_arc) = value
93 {
94 let list: Vec<Value> = list_arc.iter().cloned().collect();
96 if !list.is_empty() {
97 self.current_list = Some(list);
98 return Ok(Some(self.emit_row()?));
99 }
100 }
101 self.current_row += 1;
102 }
103
104 self.current_chunk = None;
106 }
107 }
108
109 fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
111 let chunk = self.current_chunk.as_ref().unwrap();
112 let list = self.current_list.as_ref().unwrap();
113 let element = list[self.current_list_idx].clone();
114
115 let mut builder = DataChunkBuilder::new(&self.output_schema);
117
118 for col_idx in 0..chunk.column_count() {
120 if col_idx == self.list_col_idx {
121 continue; }
123 if let Some(col) = chunk.column(col_idx)
124 && let Some(value) = col.get_value(self.current_row)
125 && let Some(out_col) = builder.column_mut(col_idx)
126 {
127 out_col.push_value(value);
128 }
129 }
130
131 let new_col_idx = self.output_schema.len() - 1;
133 if let Some(out_col) = builder.column_mut(new_col_idx) {
134 out_col.push_value(element);
135 }
136
137 builder.advance_row();
138 self.current_list_idx += 1;
139
140 if self.current_list_idx >= list.len() {
142 self.current_row += 1;
143 }
144
145 Ok(builder.finish())
146 }
147}
148
149impl Operator for UnwindOperator {
150 fn next(&mut self) -> OperatorResult {
151 self.advance()
152 }
153
154 fn reset(&mut self) {
155 self.child.reset();
156 self.current_chunk = None;
157 self.current_row = 0;
158 self.current_list_idx = 0;
159 self.current_list = None;
160 }
161
162 fn name(&self) -> &'static str {
163 "Unwind"
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Test double that hands out a fixed sequence of chunks.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // The chunk is moved out and an empty placeholder is left in
                // its slot, so after `reset` the slots no longer hold the
                // original contents.
                let chunk =
                    std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a single-column (`LogicalType::Any`) chunk with one row per
    /// entry of `rows`.
    fn list_chunk(rows: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for row in rows {
            builder.column_mut(0).unwrap().push_value(row);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Wraps `chunks` in a mock child and unwinds column 0 into a single
    /// output column of `element_type`.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        variable: &str,
        element_type: LogicalType,
    ) -> UnwindOperator {
        UnwindOperator::new(
            Box::new(MockOperator {
                chunks,
                position: 0,
            }),
            0,
            variable.to_string(),
            vec![element_type],
        )
    }

    /// Pulls every chunk out of `op`, failing the test on an operator error
    /// instead of silently stopping.
    fn drain(op: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut out = Vec::new();
        while let Some(chunk) = op.next().expect("unwind returned an error") {
            out.push(chunk);
        }
        out
    }

    /// Reads the value in the first row of the first column of `chunk`.
    fn only_value(chunk: &DataChunk) -> Option<Value> {
        chunk.column(0).and_then(|col| col.get_value(0))
    }

    #[test]
    fn test_unwind_basic() {
        let list = Value::List(Arc::new([
            Value::Int64(1),
            Value::Int64(2),
            Value::Int64(3),
        ]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "x", LogicalType::Int64);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 3);
        assert!(matches!(only_value(&results[0]), Some(Value::Int64(1))));
        assert!(matches!(only_value(&results[1]), Some(Value::Int64(2))));
        assert!(matches!(only_value(&results[2]), Some(Value::Int64(3))));
    }

    #[test]
    fn test_unwind_empty_list() {
        let list = Value::List(Arc::new([]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "x", LogicalType::Int64);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        let mut unwind = unwind_over(vec![], "x", LogicalType::Int64);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        let list1 = Value::List(Arc::new([Value::Int64(10), Value::Int64(20)]));
        let list2 = Value::List(Arc::new([Value::Int64(30)]));
        let mut unwind = unwind_over(
            vec![list_chunk(vec![list1, list2])],
            "x",
            LogicalType::Int64,
        );

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 3);
        assert!(matches!(only_value(&results[0]), Some(Value::Int64(10))));
        assert!(matches!(only_value(&results[1]), Some(Value::Int64(20))));
        assert!(matches!(only_value(&results[2]), Some(Value::Int64(30))));
    }

    #[test]
    fn test_unwind_single_element_list() {
        let list = Value::List(Arc::new([Value::String("hello".into())]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "item", LogicalType::String);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
        assert!(matches!(only_value(&results[0]), Some(Value::String(_))));
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = unwind_over(vec![], "my_var", LogicalType::Any);

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], "x", LogicalType::Any);

        assert_eq!(unwind.name(), "Unwind");
    }
}