grafeo_core/execution/operators/
unwind.rs1use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7pub struct UnwindOperator {
12 child: Box<dyn Operator>,
14 list_col_idx: usize,
16 variable_name: String,
18 output_schema: Vec<LogicalType>,
20 emit_ordinality: bool,
22 emit_offset: bool,
24 current_chunk: Option<DataChunk>,
26 current_row: usize,
28 current_list_idx: usize,
30 current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35 pub fn new(
45 child: Box<dyn Operator>,
46 list_col_idx: usize,
47 variable_name: String,
48 output_schema: Vec<LogicalType>,
49 emit_ordinality: bool,
50 emit_offset: bool,
51 ) -> Self {
52 Self {
53 child,
54 list_col_idx,
55 variable_name,
56 output_schema,
57 emit_ordinality,
58 emit_offset,
59 current_chunk: None,
60 current_row: 0,
61 current_list_idx: 0,
62 current_list: None,
63 }
64 }
65
66 #[must_use]
68 pub fn variable_name(&self) -> &str {
69 &self.variable_name
70 }
71
72 fn advance(&mut self) -> OperatorResult {
74 loop {
75 if let Some(list) = &self.current_list
77 && self.current_list_idx < list.len()
78 {
79 return Ok(Some(self.emit_row()?));
81 }
82
83 self.current_list_idx = 0;
85 self.current_list = None;
86
87 if self.current_chunk.is_none() {
89 self.current_chunk = self.child.next()?;
90 self.current_row = 0;
91 if self.current_chunk.is_none() {
92 return Ok(None); }
94 }
95
96 let chunk = self
97 .current_chunk
98 .as_ref()
99 .expect("current_chunk is Some: checked above");
100
101 while self.current_row < chunk.row_count() {
103 if let Some(col) = chunk.column(self.list_col_idx)
104 && let Some(value) = col.get_value(self.current_row)
105 {
106 let list = match value {
108 Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
109 Value::Vector(vec_arc) => {
110 vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
111 }
112 _ => {
113 self.current_row += 1;
114 continue;
115 }
116 };
117 if !list.is_empty() {
118 self.current_list = Some(list);
119 return Ok(Some(self.emit_row()?));
120 }
121 }
122 self.current_row += 1;
123 }
124
125 self.current_chunk = None;
127 }
128 }
129
130 fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
132 let chunk = self
133 .current_chunk
134 .as_ref()
135 .expect("current_chunk is Some: set before emit_row call");
136 let list = self
137 .current_list
138 .as_ref()
139 .expect("current_list is Some: set before emit_row call");
140 let element = list[self.current_list_idx].clone();
141
142 let mut builder = DataChunkBuilder::new(&self.output_schema);
144
145 for col_idx in 0..chunk.column_count() {
147 if col_idx == self.list_col_idx {
148 continue; }
150 if let Some(col) = chunk.column(col_idx)
151 && let Some(value) = col.get_value(self.current_row)
152 && let Some(out_col) = builder.column_mut(col_idx)
153 {
154 out_col.push_value(value);
155 }
156 }
157
158 let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
161 let element_col_idx = self.output_schema.len() - 1 - extra_cols;
162 if let Some(out_col) = builder.column_mut(element_col_idx) {
163 out_col.push_value(element);
164 }
165
166 let mut next_col = element_col_idx + 1;
168 if self.emit_ordinality {
169 if let Some(out_col) = builder.column_mut(next_col) {
170 out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
171 }
172 next_col += 1;
173 }
174
175 if self.emit_offset
177 && let Some(out_col) = builder.column_mut(next_col)
178 {
179 out_col.push_value(Value::Int64(self.current_list_idx as i64));
180 }
181
182 builder.advance_row();
183 self.current_list_idx += 1;
184
185 if self.current_list_idx >= list.len() {
187 self.current_row += 1;
188 }
189
190 Ok(builder.finish())
191 }
192}
193
194impl Operator for UnwindOperator {
195 fn next(&mut self) -> OperatorResult {
196 self.advance()
197 }
198
199 fn reset(&mut self) {
200 self.child.reset();
201 self.current_chunk = None;
202 self.current_row = 0;
203 self.current_list_idx = 0;
204 self.current_list = None;
205 }
206
207 fn name(&self) -> &'static str {
208 "Unwind"
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Test double that yields a fixed sequence of chunks.
    ///
    /// `reset` only rewinds `position`: chunks already handed out were replaced
    /// with `DataChunk::empty()`, so a reset mock does not replay its data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a single `Any`-typed column chunk with one row per value.
    fn chunk_of(values: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for value in values {
            builder.column_mut(0).unwrap().push_value(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a `Value::List` of `Int64` values.
    fn int_list(items: &[i64]) -> Value {
        Value::List(items.iter().map(|&v| Value::Int64(v)).collect())
    }

    /// Builds an unwind over column 0 of the given chunks.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        output_schema: Vec<LogicalType>,
        emit_ordinality: bool,
        emit_offset: bool,
    ) -> UnwindOperator {
        UnwindOperator::new(
            Box::new(MockOperator::new(chunks)),
            0,
            "x".to_string(),
            output_schema,
            emit_ordinality,
            emit_offset,
        )
    }

    /// Drains `unwind`, failing the test on any error instead of silently
    /// treating it as end of input.
    fn drain(unwind: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut out = Vec::new();
        while let Some(chunk) = unwind.next().expect("unwind returned an error") {
            assert_eq!(chunk.row_count(), 1, "unwind emits one row per chunk");
            out.push(chunk);
        }
        out
    }

    /// Reads row 0 of column `col_idx` from each chunk, requiring `Int64`.
    fn int_column(chunks: &[DataChunk], col_idx: usize) -> Vec<i64> {
        chunks
            .iter()
            .map(|chunk| match chunk.column(col_idx).and_then(|col| col.get_value(0)) {
                Some(Value::Int64(v)) => v,
                _ => panic!("expected an Int64 in column {col_idx}"),
            })
            .collect()
    }

    #[test]
    fn test_unwind_basic() {
        let chunk = chunk_of(vec![int_list(&[1, 2, 3])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_empty_list() {
        let chunk = chunk_of(vec![int_list(&[])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        let mut unwind = unwind_over(vec![], vec![LogicalType::Int64], false, false);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        let chunk = chunk_of(vec![int_list(&[10, 20]), int_list(&[30])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![10, 20, 30]);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let list = Value::List(Arc::new([Value::String("hello".into())]));
        let chunk = chunk_of(vec![list]);
        let mut unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![chunk])),
            0,
            "item".to_string(),
            vec![LogicalType::String],
            false,
            false,
        );

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_unwind_skips_non_list_and_empty_values() {
        let chunk = chunk_of(vec![
            int_list(&[1, 2]),
            Value::Int64(7),
            int_list(&[]),
            int_list(&[3]),
        ]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_across_chunks() {
        let chunks = vec![chunk_of(vec![int_list(&[1, 2])]), chunk_of(vec![int_list(&[3])])];
        let mut unwind = unwind_over(chunks, vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![])),
            0,
            "my_var".to_string(),
            vec![LogicalType::Any],
            false,
            false,
        );

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], vec![LogicalType::Any], false, false);

        assert_eq!(unwind.name(), "Unwind");
    }

    #[test]
    fn test_unwind_with_ordinality() {
        let chunk = chunk_of(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            true,
            false,
        );

        let results = drain(&mut unwind);

        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_with_offset() {
        let chunk = chunk_of(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            false,
            true,
        );

        let results = drain(&mut unwind);

        assert_eq!(int_column(&results, 1), vec![0, 1, 2]);
    }

    #[test]
    fn test_unwind_with_ordinality_and_offset() {
        let chunk = chunk_of(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64, LogicalType::Int64],
            true,
            true,
        );

        let results = drain(&mut unwind);

        // Ordinality precedes offset in the output layout.
        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
        assert_eq!(int_column(&results, 2), vec![0, 1, 2]);
    }
}