grafeo_core/execution/operators/
unwind.rs1use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7pub struct UnwindOperator {
12 child: Box<dyn Operator>,
14 list_col_idx: usize,
16 variable_name: String,
18 output_schema: Vec<LogicalType>,
20 emit_ordinality: bool,
22 emit_offset: bool,
24 current_chunk: Option<DataChunk>,
26 current_row: usize,
28 current_list_idx: usize,
30 current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35 pub fn new(
45 child: Box<dyn Operator>,
46 list_col_idx: usize,
47 variable_name: String,
48 output_schema: Vec<LogicalType>,
49 emit_ordinality: bool,
50 emit_offset: bool,
51 ) -> Self {
52 Self {
53 child,
54 list_col_idx,
55 variable_name,
56 output_schema,
57 emit_ordinality,
58 emit_offset,
59 current_chunk: None,
60 current_row: 0,
61 current_list_idx: 0,
62 current_list: None,
63 }
64 }
65
66 #[must_use]
68 pub fn variable_name(&self) -> &str {
69 &self.variable_name
70 }
71
72 fn advance(&mut self) -> OperatorResult {
74 loop {
75 if let Some(list) = &self.current_list
77 && self.current_list_idx < list.len()
78 {
79 return Ok(Some(self.emit_row()?));
81 }
82
83 self.current_list_idx = 0;
85 self.current_list = None;
86
87 if self.current_chunk.is_none() {
89 self.current_chunk = self.child.next()?;
90 self.current_row = 0;
91 if self.current_chunk.is_none() {
92 return Ok(None); }
94 }
95
96 let chunk = self
97 .current_chunk
98 .as_ref()
99 .expect("current_chunk is Some: checked above");
100
101 while self.current_row < chunk.row_count() {
103 if let Some(col) = chunk.column(self.list_col_idx)
104 && let Some(value) = col.get_value(self.current_row)
105 {
106 let list = match value {
108 Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
109 Value::Vector(vec_arc) => {
110 vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
111 }
112 _ => {
113 self.current_row += 1;
114 continue;
115 }
116 };
117 if !list.is_empty() {
118 self.current_list = Some(list);
119 return Ok(Some(self.emit_row()?));
120 }
121 }
122 self.current_row += 1;
123 }
124
125 self.current_chunk = None;
127 }
128 }
129
130 fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
132 let chunk = self
133 .current_chunk
134 .as_ref()
135 .expect("current_chunk is Some: set before emit_row call");
136 let list = self
137 .current_list
138 .as_ref()
139 .expect("current_list is Some: set before emit_row call");
140 let element = list[self.current_list_idx].clone();
141
142 let mut builder = DataChunkBuilder::new(&self.output_schema);
144
145 for col_idx in 0..chunk.column_count() {
147 if col_idx == self.list_col_idx {
148 continue; }
150 if let Some(col) = chunk.column(col_idx)
151 && let Some(value) = col.get_value(self.current_row)
152 && let Some(out_col) = builder.column_mut(col_idx)
153 {
154 out_col.push_value(value);
155 }
156 }
157
158 let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
161 let element_col_idx = self.output_schema.len() - 1 - extra_cols;
162 if let Some(out_col) = builder.column_mut(element_col_idx) {
163 out_col.push_value(element);
164 }
165
166 let mut next_col = element_col_idx + 1;
168 if self.emit_ordinality {
169 if let Some(out_col) = builder.column_mut(next_col) {
170 #[allow(clippy::cast_possible_wrap)]
172 out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
173 }
174 next_col += 1;
175 }
176
177 if self.emit_offset
179 && let Some(out_col) = builder.column_mut(next_col)
180 {
181 #[allow(clippy::cast_possible_wrap)]
183 out_col.push_value(Value::Int64(self.current_list_idx as i64));
184 }
185
186 builder.advance_row();
187 self.current_list_idx += 1;
188
189 if self.current_list_idx >= list.len() {
191 self.current_row += 1;
192 }
193
194 Ok(builder.finish())
195 }
196}
197
198impl Operator for UnwindOperator {
199 fn next(&mut self) -> OperatorResult {
200 self.advance()
201 }
202
203 fn reset(&mut self) {
204 self.child.reset();
205 self.current_chunk = None;
206 self.current_row = 0;
207 self.current_list_idx = 0;
208 self.current_list = None;
209 }
210
211 fn name(&self) -> &'static str {
212 "Unwind"
213 }
214
215 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
216 self
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223 use crate::execution::chunk::DataChunkBuilder;
224 use std::sync::Arc;
225
226 struct MockOperator {
227 chunks: Vec<DataChunk>,
228 position: usize,
229 }
230
231 impl Operator for MockOperator {
232 fn next(&mut self) -> OperatorResult {
233 if self.position < self.chunks.len() {
234 let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
235 self.position += 1;
236 Ok(Some(chunk))
237 } else {
238 Ok(None)
239 }
240 }
241
242 fn reset(&mut self) {
243 self.position = 0;
244 }
245
246 fn name(&self) -> &'static str {
247 "MockOperator"
248 }
249
250 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
251 self
252 }
253 }
254
255 #[test]
256 fn test_unwind_basic() {
257 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]); let list = Value::List(Arc::new([
260 Value::Int64(1),
261 Value::Int64(2),
262 Value::Int64(3),
263 ]));
264 builder.column_mut(0).unwrap().push_value(list);
265 builder.advance_row();
266 let chunk = builder.finish();
267
268 let mock = MockOperator {
269 chunks: vec![chunk],
270 position: 0,
271 };
272
273 let mut unwind = UnwindOperator::new(
275 Box::new(mock),
276 0,
277 "x".to_string(),
278 vec![LogicalType::Int64], false,
280 false,
281 );
282
283 let mut results = Vec::new();
285 while let Ok(Some(chunk)) = unwind.next() {
286 results.push(chunk);
287 }
288
289 assert_eq!(results.len(), 3);
290 }
291
292 #[test]
293 fn test_unwind_empty_list() {
294 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
296 let list = Value::List(Arc::new([]));
297 builder.column_mut(0).unwrap().push_value(list);
298 builder.advance_row();
299 let chunk = builder.finish();
300
301 let mock = MockOperator {
302 chunks: vec![chunk],
303 position: 0,
304 };
305
306 let mut unwind = UnwindOperator::new(
307 Box::new(mock),
308 0,
309 "x".to_string(),
310 vec![LogicalType::Int64],
311 false,
312 false,
313 );
314
315 let mut results = Vec::new();
316 while let Ok(Some(chunk)) = unwind.next() {
317 results.push(chunk);
318 }
319
320 assert_eq!(results.len(), 0, "Empty list should produce no rows");
321 }
322
323 #[test]
324 fn test_unwind_empty_input() {
325 let mock = MockOperator {
327 chunks: vec![],
328 position: 0,
329 };
330
331 let mut unwind = UnwindOperator::new(
332 Box::new(mock),
333 0,
334 "x".to_string(),
335 vec![LogicalType::Int64],
336 false,
337 false,
338 );
339
340 assert!(unwind.next().unwrap().is_none());
341 }
342
343 #[test]
344 fn test_unwind_multiple_rows() {
345 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
347
348 let list1 = Value::List(Arc::new([Value::Int64(10), Value::Int64(20)]));
349 builder.column_mut(0).unwrap().push_value(list1);
350 builder.advance_row();
351
352 let list2 = Value::List(Arc::new([Value::Int64(30)]));
353 builder.column_mut(0).unwrap().push_value(list2);
354 builder.advance_row();
355
356 let chunk = builder.finish();
357
358 let mock = MockOperator {
359 chunks: vec![chunk],
360 position: 0,
361 };
362
363 let mut unwind = UnwindOperator::new(
364 Box::new(mock),
365 0,
366 "x".to_string(),
367 vec![LogicalType::Int64],
368 false,
369 false,
370 );
371
372 let mut count = 0;
373 while let Ok(Some(_chunk)) = unwind.next() {
374 count += 1;
375 }
376
377 assert_eq!(count, 3);
379 }
380
381 #[test]
382 fn test_unwind_single_element_list() {
383 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
384 let list = Value::List(Arc::new([Value::String("hello".into())]));
385 builder.column_mut(0).unwrap().push_value(list);
386 builder.advance_row();
387 let chunk = builder.finish();
388
389 let mock = MockOperator {
390 chunks: vec![chunk],
391 position: 0,
392 };
393
394 let mut unwind = UnwindOperator::new(
395 Box::new(mock),
396 0,
397 "item".to_string(),
398 vec![LogicalType::String],
399 false,
400 false,
401 );
402
403 let mut results = Vec::new();
404 while let Ok(Some(chunk)) = unwind.next() {
405 results.push(chunk);
406 }
407
408 assert_eq!(results.len(), 1);
409 }
410
411 #[test]
412 fn test_unwind_variable_name() {
413 let mock = MockOperator {
414 chunks: vec![],
415 position: 0,
416 };
417
418 let unwind = UnwindOperator::new(
419 Box::new(mock),
420 0,
421 "my_var".to_string(),
422 vec![LogicalType::Any],
423 false,
424 false,
425 );
426
427 assert_eq!(unwind.variable_name(), "my_var");
428 }
429
430 #[test]
431 fn test_unwind_name() {
432 let mock = MockOperator {
433 chunks: vec![],
434 position: 0,
435 };
436
437 let unwind = UnwindOperator::new(
438 Box::new(mock),
439 0,
440 "x".to_string(),
441 vec![LogicalType::Any],
442 false,
443 false,
444 );
445
446 assert_eq!(unwind.name(), "Unwind");
447 }
448
449 #[test]
450 fn test_unwind_with_ordinality() {
451 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
453 let list = Value::List(Arc::new([
454 Value::Int64(10),
455 Value::Int64(20),
456 Value::Int64(30),
457 ]));
458 builder.column_mut(0).unwrap().push_value(list);
459 builder.advance_row();
460 let chunk = builder.finish();
461
462 let mock = MockOperator {
463 chunks: vec![chunk],
464 position: 0,
465 };
466
467 let mut unwind = UnwindOperator::new(
469 Box::new(mock),
470 0,
471 "x".to_string(),
472 vec![LogicalType::Any, LogicalType::Int64],
473 true, false, );
476
477 let mut ordinalities = Vec::new();
478 while let Ok(Some(chunk)) = unwind.next() {
479 if let Some(col) = chunk.column(1)
480 && let Some(Value::Int64(v)) = col.get_value(0)
481 {
482 ordinalities.push(v);
483 }
484 }
485
486 assert_eq!(ordinalities, vec![1, 2, 3]);
488 }
489
490 #[test]
491 fn test_unwind_with_offset() {
492 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
493 let list = Value::List(Arc::new([
494 Value::Int64(10),
495 Value::Int64(20),
496 Value::Int64(30),
497 ]));
498 builder.column_mut(0).unwrap().push_value(list);
499 builder.advance_row();
500 let chunk = builder.finish();
501
502 let mock = MockOperator {
503 chunks: vec![chunk],
504 position: 0,
505 };
506
507 let mut unwind = UnwindOperator::new(
509 Box::new(mock),
510 0,
511 "x".to_string(),
512 vec![LogicalType::Any, LogicalType::Int64],
513 false, true, );
516
517 let mut offsets = Vec::new();
518 while let Ok(Some(chunk)) = unwind.next() {
519 if let Some(col) = chunk.column(1)
520 && let Some(Value::Int64(v)) = col.get_value(0)
521 {
522 offsets.push(v);
523 }
524 }
525
526 assert_eq!(offsets, vec![0, 1, 2]);
528 }
529
530 #[test]
531 fn test_unwind_into_any() {
532 let mock = MockOperator {
533 chunks: vec![],
534 position: 0,
535 };
536 let op = UnwindOperator::new(
537 Box::new(mock),
538 0,
539 "items".to_string(),
540 vec![LogicalType::Any, LogicalType::Any],
541 false,
542 false,
543 );
544 let any = Box::new(op).into_any();
545 assert!(any.downcast::<UnwindOperator>().is_ok());
546 }
547}