Skip to main content

grafeo_core/execution/operators/
unwind.rs

1//! Unwind operator for expanding lists into individual rows.
2
3use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7/// Unwind operator that expands a list column into individual rows.
8///
9/// For each input row, if the list column contains N elements, this operator
10/// produces N output rows, each with one element from the list.
11pub struct UnwindOperator {
12    /// Child operator to read from.
13    child: Box<dyn Operator>,
14    /// Index of the column containing the list to unwind.
15    list_col_idx: usize,
16    /// Name of the new variable for the unwound elements.
17    variable_name: String,
18    /// Schema of output columns (inherited from input plus the new column).
19    output_schema: Vec<LogicalType>,
20    /// Whether to emit a 1-based ORDINALITY column.
21    emit_ordinality: bool,
22    /// Whether to emit a 0-based OFFSET column.
23    emit_offset: bool,
24    /// Current input chunk being processed.
25    current_chunk: Option<DataChunk>,
26    /// Current row index within the chunk.
27    current_row: usize,
28    /// Current index within the list being unwound.
29    current_list_idx: usize,
30    /// Current list being unwound.
31    current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35    /// Creates a new unwind operator.
36    ///
37    /// # Arguments
38    /// * `child` - The input operator
39    /// * `list_col_idx` - The column index containing the list to unwind
40    /// * `variable_name` - The name of the new variable
41    /// * `output_schema` - The schema for output (should include the new column type)
42    /// * `emit_ordinality` - Whether to emit a 1-based index column
43    /// * `emit_offset` - Whether to emit a 0-based index column
44    pub fn new(
45        child: Box<dyn Operator>,
46        list_col_idx: usize,
47        variable_name: String,
48        output_schema: Vec<LogicalType>,
49        emit_ordinality: bool,
50        emit_offset: bool,
51    ) -> Self {
52        Self {
53            child,
54            list_col_idx,
55            variable_name,
56            output_schema,
57            emit_ordinality,
58            emit_offset,
59            current_chunk: None,
60            current_row: 0,
61            current_list_idx: 0,
62            current_list: None,
63        }
64    }
65
66    /// Returns the variable name for the unwound elements.
67    #[must_use]
68    pub fn variable_name(&self) -> &str {
69        &self.variable_name
70    }
71
72    /// Advances to the next row or fetches the next chunk.
73    fn advance(&mut self) -> OperatorResult {
74        loop {
75            // If we have a current list, try to get the next element
76            if let Some(list) = &self.current_list
77                && self.current_list_idx < list.len()
78            {
79                // Still have elements in the current list
80                return Ok(Some(self.emit_row()?));
81            }
82
83            // Need to move to the next row
84            self.current_list_idx = 0;
85            self.current_list = None;
86
87            // Get the next chunk if needed
88            if self.current_chunk.is_none() {
89                self.current_chunk = self.child.next()?;
90                self.current_row = 0;
91                if self.current_chunk.is_none() {
92                    return Ok(None); // No more data
93                }
94            }
95
96            let chunk = self
97                .current_chunk
98                .as_ref()
99                .expect("current_chunk is Some: checked above");
100
101            // Find the next row with a list value
102            while self.current_row < chunk.row_count() {
103                if let Some(col) = chunk.column(self.list_col_idx)
104                    && let Some(value) = col.get_value(self.current_row)
105                {
106                    // Extract list elements from either Value::List or Value::Vector
107                    let list = match value {
108                        Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
109                        Value::Vector(vec_arc) => {
110                            vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
111                        }
112                        _ => {
113                            self.current_row += 1;
114                            continue;
115                        }
116                    };
117                    if !list.is_empty() {
118                        self.current_list = Some(list);
119                        return Ok(Some(self.emit_row()?));
120                    }
121                }
122                self.current_row += 1;
123            }
124
125            // Exhausted current chunk, get next one
126            self.current_chunk = None;
127        }
128    }
129
130    /// Emits a single row with the current list element.
131    fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
132        let chunk = self
133            .current_chunk
134            .as_ref()
135            .expect("current_chunk is Some: set before emit_row call");
136        let list = self
137            .current_list
138            .as_ref()
139            .expect("current_list is Some: set before emit_row call");
140        let element = list[self.current_list_idx].clone();
141
142        // Build output row: copy all columns from input + add the unwound element
143        let mut builder = DataChunkBuilder::new(&self.output_schema);
144
145        // Copy existing columns (except the list column which we're replacing)
146        for col_idx in 0..chunk.column_count() {
147            if col_idx == self.list_col_idx {
148                continue; // Skip the list column
149            }
150            if let Some(col) = chunk.column(col_idx)
151                && let Some(value) = col.get_value(self.current_row)
152                && let Some(out_col) = builder.column_mut(col_idx)
153            {
154                out_col.push_value(value);
155            }
156        }
157
158        // Add the unwound element column.
159        // It's at the end of the output schema, minus any ordinality/offset columns.
160        let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
161        let element_col_idx = self.output_schema.len() - 1 - extra_cols;
162        if let Some(out_col) = builder.column_mut(element_col_idx) {
163            out_col.push_value(element);
164        }
165
166        // Add ORDINALITY (1-based) if requested
167        let mut next_col = element_col_idx + 1;
168        if self.emit_ordinality {
169            if let Some(out_col) = builder.column_mut(next_col) {
170                // reason: list index fits i64 for practical sizes
171                #[allow(clippy::cast_possible_wrap)]
172                out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
173            }
174            next_col += 1;
175        }
176
177        // Add OFFSET (0-based) if requested
178        if self.emit_offset
179            && let Some(out_col) = builder.column_mut(next_col)
180        {
181            // reason: list index fits i64 for practical sizes
182            #[allow(clippy::cast_possible_wrap)]
183            out_col.push_value(Value::Int64(self.current_list_idx as i64));
184        }
185
186        builder.advance_row();
187        self.current_list_idx += 1;
188
189        // If we've exhausted this list, move to the next row
190        if self.current_list_idx >= list.len() {
191            self.current_row += 1;
192        }
193
194        Ok(builder.finish())
195    }
196}
197
198impl Operator for UnwindOperator {
199    fn next(&mut self) -> OperatorResult {
200        self.advance()
201    }
202
203    fn reset(&mut self) {
204        self.child.reset();
205        self.current_chunk = None;
206        self.current_row = 0;
207        self.current_list_idx = 0;
208        self.current_list = None;
209    }
210
211    fn name(&self) -> &'static str {
212        "Unwind"
213    }
214
215    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
216        self
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Child operator that yields a fixed sequence of chunks, then `None`.
    ///
    /// Chunks are moved out as they are yielded, so `reset` only rewinds the
    /// cursor: a second pass sees empty chunks.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single `Any`-typed column chunk with one row per value.
    fn single_column_chunk(values: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for value in values {
            builder.column_mut(0).unwrap().push_value(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a `Value::List` of `Int64` values.
    fn int_list(values: &[i64]) -> Value {
        Value::List(
            values
                .iter()
                .map(|&v| Value::Int64(v))
                .collect::<Arc<[Value]>>(),
        )
    }

    /// Creates an unwind operator over `chunks` that binds the variable `x`.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        list_col_idx: usize,
        output_schema: Vec<LogicalType>,
        emit_ordinality: bool,
        emit_offset: bool,
    ) -> UnwindOperator {
        UnwindOperator::new(
            Box::new(MockOperator::new(chunks)),
            list_col_idx,
            "x".to_string(),
            output_schema,
            emit_ordinality,
            emit_offset,
        )
    }

    /// Drains the operator, failing the test on any error instead of stopping early.
    fn drain(op: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("unwind should not fail") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Reads the `Int64` stored in row 0 of column `col`, if any.
    fn int_at(chunk: &DataChunk, col: usize) -> Option<i64> {
        match chunk.column(col)?.get_value(0)? {
            Value::Int64(v) => Some(v),
            _ => None,
        }
    }

    /// Collects the row-0 `Int64` of column `col` from every chunk.
    fn ints_in_column(chunks: &[DataChunk], col: usize) -> Vec<i64> {
        chunks.iter().filter_map(|c| int_at(c, col)).collect()
    }

    #[test]
    fn test_unwind_basic() {
        let chunk = single_column_chunk(vec![int_list(&[1, 2, 3])]);
        // Output is just the unwound element
        let mut unwind = unwind_over(vec![chunk], 0, vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        // One single-row chunk per element, in list order.
        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|c| c.row_count() == 1));
        assert_eq!(ints_in_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_empty_list() {
        let chunk = single_column_chunk(vec![Value::List(Arc::new([]))]);
        let mut unwind = unwind_over(vec![chunk], 0, vec![LogicalType::Int64], false, false);

        assert_eq!(drain(&mut unwind).len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        let mut unwind = unwind_over(vec![], 0, vec![LogicalType::Int64], false, false);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        // Two rows with lists of different sizes
        let chunk = single_column_chunk(vec![int_list(&[10, 20]), int_list(&[30])]);
        let mut unwind = unwind_over(vec![chunk], 0, vec![LogicalType::Int64], false, false);

        // 2 from first list + 1 from second list = 3 rows
        assert_eq!(ints_in_column(&drain(&mut unwind), 0), vec![10, 20, 30]);
    }

    #[test]
    fn test_unwind_across_chunks() {
        let first = single_column_chunk(vec![int_list(&[1])]);
        let second = single_column_chunk(vec![int_list(&[2, 3])]);
        let mut unwind = unwind_over(
            vec![first, second],
            0,
            vec![LogicalType::Int64],
            false,
            false,
        );

        assert_eq!(ints_in_column(&drain(&mut unwind), 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_skips_non_list_values() {
        let chunk = single_column_chunk(vec![
            Value::Int64(5),
            int_list(&[1]),
            Value::String("s".into()),
        ]);
        let mut unwind = unwind_over(vec![chunk], 0, vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
        assert_eq!(ints_in_column(&results, 0), vec![1]);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let chunk = single_column_chunk(vec![int_list(&[])]);
        let _ = chunk; // an empty list yields nothing; the real case follows

        let chunk = single_column_chunk(vec![Value::List(Arc::new([Value::String(
            "hello".into(),
        )]))]);
        let mut unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![chunk])),
            0,
            "item".to_string(),
            vec![LogicalType::String],
            false,
            false,
        );

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
        let element = results[0].column(0).and_then(|c| c.get_value(0));
        assert!(matches!(element, Some(Value::String(_))));
    }

    #[test]
    fn test_unwind_keeps_other_columns() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Any]);
        builder.column_mut(0).unwrap().push_value(Value::Int64(7));
        builder.column_mut(1).unwrap().push_value(int_list(&[1, 2]));
        builder.advance_row();
        let chunk = builder.finish();

        // Output schema: [id (Int64), element (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            1,
            vec![LogicalType::Int64, LogicalType::Int64],
            false,
            false,
        );
        let results = drain(&mut unwind);

        // The pass-through column is repeated for every unwound element.
        assert_eq!(ints_in_column(&results, 0), vec![7, 7]);
        assert_eq!(ints_in_column(&results, 1), vec![1, 2]);
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![])),
            0,
            "my_var".to_string(),
            vec![LogicalType::Any],
            false,
            false,
        );

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], 0, vec![LogicalType::Any], false, false);

        assert_eq!(unwind.name(), "Unwind");
    }

    #[test]
    fn test_unwind_with_ordinality() {
        let chunk = single_column_chunk(vec![int_list(&[10, 20, 30])]);
        // Output schema: [element (Any), ordinality (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            0,
            vec![LogicalType::Any, LogicalType::Int64],
            true,  // emit_ordinality
            false, // emit_offset
        );

        // ORDINALITY is 1-based
        assert_eq!(ints_in_column(&drain(&mut unwind), 1), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_with_offset() {
        let chunk = single_column_chunk(vec![int_list(&[10, 20, 30])]);
        // Output schema: [element (Any), offset (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            0,
            vec![LogicalType::Any, LogicalType::Int64],
            false, // emit_ordinality
            true,  // emit_offset
        );

        // OFFSET is 0-based
        assert_eq!(ints_in_column(&drain(&mut unwind), 1), vec![0, 1, 2]);
    }

    #[test]
    fn test_unwind_with_ordinality_and_offset() {
        let chunk = single_column_chunk(vec![int_list(&[10, 20])]);
        // Output schema: [element (Any), ordinality (Int64), offset (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            0,
            vec![LogicalType::Any, LogicalType::Int64, LogicalType::Int64],
            true,
            true,
        );

        let results = drain(&mut unwind);

        assert_eq!(ints_in_column(&results, 0), vec![10, 20]);
        assert_eq!(ints_in_column(&results, 1), vec![1, 2]);
        assert_eq!(ints_in_column(&results, 2), vec![0, 1]);
    }

    #[test]
    fn test_unwind_into_any() {
        let op = UnwindOperator::new(
            Box::new(MockOperator::new(vec![])),
            0,
            "items".to_string(),
            vec![LogicalType::Any, LogicalType::Any],
            false,
            false,
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<UnwindOperator>().is_ok());
    }
}