grafeo_core/execution/operators/
unwind.rs1use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7pub struct UnwindOperator {
12 child: Box<dyn Operator>,
14 list_col_idx: usize,
16 variable_name: String,
18 output_schema: Vec<LogicalType>,
20 emit_ordinality: bool,
22 emit_offset: bool,
24 current_chunk: Option<DataChunk>,
26 current_row: usize,
28 current_list_idx: usize,
30 current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35 pub fn new(
45 child: Box<dyn Operator>,
46 list_col_idx: usize,
47 variable_name: String,
48 output_schema: Vec<LogicalType>,
49 emit_ordinality: bool,
50 emit_offset: bool,
51 ) -> Self {
52 Self {
53 child,
54 list_col_idx,
55 variable_name,
56 output_schema,
57 emit_ordinality,
58 emit_offset,
59 current_chunk: None,
60 current_row: 0,
61 current_list_idx: 0,
62 current_list: None,
63 }
64 }
65
66 #[must_use]
68 pub fn variable_name(&self) -> &str {
69 &self.variable_name
70 }
71
72 fn advance(&mut self) -> OperatorResult {
74 loop {
75 if let Some(list) = &self.current_list
77 && self.current_list_idx < list.len()
78 {
79 return Ok(Some(self.emit_row()?));
81 }
82
83 self.current_list_idx = 0;
85 self.current_list = None;
86
87 if self.current_chunk.is_none() {
89 self.current_chunk = self.child.next()?;
90 self.current_row = 0;
91 if self.current_chunk.is_none() {
92 return Ok(None); }
94 }
95
96 let chunk = self.current_chunk.as_ref().unwrap();
97
98 while self.current_row < chunk.row_count() {
100 if let Some(col) = chunk.column(self.list_col_idx)
101 && let Some(value) = col.get_value(self.current_row)
102 {
103 let list = match value {
105 Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
106 Value::Vector(vec_arc) => {
107 vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
108 }
109 _ => {
110 self.current_row += 1;
111 continue;
112 }
113 };
114 if !list.is_empty() {
115 self.current_list = Some(list);
116 return Ok(Some(self.emit_row()?));
117 }
118 }
119 self.current_row += 1;
120 }
121
122 self.current_chunk = None;
124 }
125 }
126
127 fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
129 let chunk = self.current_chunk.as_ref().unwrap();
130 let list = self.current_list.as_ref().unwrap();
131 let element = list[self.current_list_idx].clone();
132
133 let mut builder = DataChunkBuilder::new(&self.output_schema);
135
136 for col_idx in 0..chunk.column_count() {
138 if col_idx == self.list_col_idx {
139 continue; }
141 if let Some(col) = chunk.column(col_idx)
142 && let Some(value) = col.get_value(self.current_row)
143 && let Some(out_col) = builder.column_mut(col_idx)
144 {
145 out_col.push_value(value);
146 }
147 }
148
149 let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
152 let element_col_idx = self.output_schema.len() - 1 - extra_cols;
153 if let Some(out_col) = builder.column_mut(element_col_idx) {
154 out_col.push_value(element);
155 }
156
157 let mut next_col = element_col_idx + 1;
159 if self.emit_ordinality {
160 if let Some(out_col) = builder.column_mut(next_col) {
161 out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
162 }
163 next_col += 1;
164 }
165
166 if self.emit_offset
168 && let Some(out_col) = builder.column_mut(next_col)
169 {
170 out_col.push_value(Value::Int64(self.current_list_idx as i64));
171 }
172
173 builder.advance_row();
174 self.current_list_idx += 1;
175
176 if self.current_list_idx >= list.len() {
178 self.current_row += 1;
179 }
180
181 Ok(builder.finish())
182 }
183}
184
185impl Operator for UnwindOperator {
186 fn next(&mut self) -> OperatorResult {
187 self.advance()
188 }
189
190 fn reset(&mut self) {
191 self.child.reset();
192 self.current_chunk = None;
193 self.current_row = 0;
194 self.current_list_idx = 0;
195 self.current_list = None;
196 }
197
198 fn name(&self) -> &'static str {
199 "Unwind"
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Child operator that hands out a fixed sequence of chunks, then reports exhaustion.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            self.position += 1;
            // Hand the chunk out by value and leave an empty placeholder behind.
            Ok(Some(std::mem::replace(slot, DataChunk::empty())))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a single `Any` column chunk with one list value per row.
    fn list_chunk(rows: Vec<Vec<Value>>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for items in rows {
            builder
                .column_mut(0)
                .unwrap()
                .push_value(Value::List(Arc::from(items)));
            builder.advance_row();
        }
        builder.finish()
    }

    /// Creates an UNWIND over column 0 of the given chunks.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        variable: &str,
        output_schema: Vec<LogicalType>,
        emit_ordinality: bool,
        emit_offset: bool,
    ) -> UnwindOperator {
        let child = MockOperator {
            chunks,
            position: 0,
        };
        UnwindOperator::new(
            Box::new(child),
            0,
            variable.to_string(),
            output_schema,
            emit_ordinality,
            emit_offset,
        )
    }

    /// Collects every chunk produced until the operator is exhausted or fails.
    fn drain(unwind: &mut UnwindOperator) -> Vec<DataChunk> {
        std::iter::from_fn(|| unwind.next().ok().flatten()).collect()
    }

    /// Reads row 0 of column `col` from each chunk, keeping only `Int64` values.
    fn int64s_at(chunks: &[DataChunk], col: usize) -> Vec<i64> {
        chunks
            .iter()
            .filter_map(|chunk| match chunk.column(col).and_then(|c| c.get_value(0)) {
                Some(Value::Int64(v)) => Some(v),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn test_unwind_basic() {
        let chunk = list_chunk(vec![vec![
            Value::Int64(1),
            Value::Int64(2),
            Value::Int64(3),
        ]]);
        let mut unwind = unwind_over(vec![chunk], "x", vec![LogicalType::Int64], false, false);

        assert_eq!(drain(&mut unwind).len(), 3);
    }

    #[test]
    fn test_unwind_empty_list() {
        let chunk = list_chunk(vec![vec![]]);
        let mut unwind = unwind_over(vec![chunk], "x", vec![LogicalType::Int64], false, false);

        assert_eq!(
            drain(&mut unwind).len(),
            0,
            "Empty list should produce no rows"
        );
    }

    #[test]
    fn test_unwind_empty_input() {
        let mut unwind = unwind_over(Vec::new(), "x", vec![LogicalType::Int64], false, false);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        let chunk = list_chunk(vec![
            vec![Value::Int64(10), Value::Int64(20)],
            vec![Value::Int64(30)],
        ]);
        let mut unwind = unwind_over(vec![chunk], "x", vec![LogicalType::Int64], false, false);

        assert_eq!(drain(&mut unwind).len(), 3);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let chunk = list_chunk(vec![vec![Value::String("hello".into())]]);
        let mut unwind = unwind_over(
            vec![chunk],
            "item",
            vec![LogicalType::String],
            false,
            false,
        );

        assert_eq!(drain(&mut unwind).len(), 1);
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = unwind_over(Vec::new(), "my_var", vec![LogicalType::Any], false, false);

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(Vec::new(), "x", vec![LogicalType::Any], false, false);

        assert_eq!(unwind.name(), "Unwind");
    }

    #[test]
    fn test_unwind_with_ordinality() {
        let chunk = list_chunk(vec![vec![
            Value::Int64(10),
            Value::Int64(20),
            Value::Int64(30),
        ]]);
        let mut unwind = unwind_over(
            vec![chunk],
            "x",
            vec![LogicalType::Any, LogicalType::Int64],
            true,
            false,
        );

        assert_eq!(int64s_at(&drain(&mut unwind), 1), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_with_offset() {
        let chunk = list_chunk(vec![vec![
            Value::Int64(10),
            Value::Int64(20),
            Value::Int64(30),
        ]]);
        let mut unwind = unwind_over(
            vec![chunk],
            "x",
            vec![LogicalType::Any, LogicalType::Int64],
            false,
            true,
        );

        assert_eq!(int64s_at(&drain(&mut unwind), 1), vec![0, 1, 2]);
    }
}