Skip to main content

grafeo_core/execution/operators/
unwind.rs

1//! Unwind operator for expanding lists into individual rows.
2
3use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7/// Unwind operator that expands a list column into individual rows.
8///
9/// For each input row, if the list column contains N elements, this operator
10/// produces N output rows, each with one element from the list.
11pub struct UnwindOperator {
12    /// Child operator to read from.
13    child: Box<dyn Operator>,
14    /// Index of the column containing the list to unwind.
15    list_col_idx: usize,
16    /// Name of the new variable for the unwound elements.
17    variable_name: String,
18    /// Schema of output columns (inherited from input plus the new column).
19    output_schema: Vec<LogicalType>,
20    /// Whether to emit a 1-based ORDINALITY column.
21    emit_ordinality: bool,
22    /// Whether to emit a 0-based OFFSET column.
23    emit_offset: bool,
24    /// Current input chunk being processed.
25    current_chunk: Option<DataChunk>,
26    /// Current row index within the chunk.
27    current_row: usize,
28    /// Current index within the list being unwound.
29    current_list_idx: usize,
30    /// Current list being unwound.
31    current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35    /// Creates a new unwind operator.
36    ///
37    /// # Arguments
38    /// * `child` - The input operator
39    /// * `list_col_idx` - The column index containing the list to unwind
40    /// * `variable_name` - The name of the new variable
41    /// * `output_schema` - The schema for output (should include the new column type)
42    /// * `emit_ordinality` - Whether to emit a 1-based index column
43    /// * `emit_offset` - Whether to emit a 0-based index column
44    pub fn new(
45        child: Box<dyn Operator>,
46        list_col_idx: usize,
47        variable_name: String,
48        output_schema: Vec<LogicalType>,
49        emit_ordinality: bool,
50        emit_offset: bool,
51    ) -> Self {
52        Self {
53            child,
54            list_col_idx,
55            variable_name,
56            output_schema,
57            emit_ordinality,
58            emit_offset,
59            current_chunk: None,
60            current_row: 0,
61            current_list_idx: 0,
62            current_list: None,
63        }
64    }
65
66    /// Returns the variable name for the unwound elements.
67    #[must_use]
68    pub fn variable_name(&self) -> &str {
69        &self.variable_name
70    }
71
72    /// Advances to the next row or fetches the next chunk.
73    fn advance(&mut self) -> OperatorResult {
74        loop {
75            // If we have a current list, try to get the next element
76            if let Some(list) = &self.current_list
77                && self.current_list_idx < list.len()
78            {
79                // Still have elements in the current list
80                return Ok(Some(self.emit_row()?));
81            }
82
83            // Need to move to the next row
84            self.current_list_idx = 0;
85            self.current_list = None;
86
87            // Get the next chunk if needed
88            if self.current_chunk.is_none() {
89                self.current_chunk = self.child.next()?;
90                self.current_row = 0;
91                if self.current_chunk.is_none() {
92                    return Ok(None); // No more data
93                }
94            }
95
96            let chunk = self.current_chunk.as_ref().unwrap();
97
98            // Find the next row with a list value
99            while self.current_row < chunk.row_count() {
100                if let Some(col) = chunk.column(self.list_col_idx)
101                    && let Some(value) = col.get_value(self.current_row)
102                {
103                    // Extract list elements from either Value::List or Value::Vector
104                    let list = match value {
105                        Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
106                        Value::Vector(vec_arc) => {
107                            vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
108                        }
109                        _ => {
110                            self.current_row += 1;
111                            continue;
112                        }
113                    };
114                    if !list.is_empty() {
115                        self.current_list = Some(list);
116                        return Ok(Some(self.emit_row()?));
117                    }
118                }
119                self.current_row += 1;
120            }
121
122            // Exhausted current chunk, get next one
123            self.current_chunk = None;
124        }
125    }
126
127    /// Emits a single row with the current list element.
128    fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
129        let chunk = self.current_chunk.as_ref().unwrap();
130        let list = self.current_list.as_ref().unwrap();
131        let element = list[self.current_list_idx].clone();
132
133        // Build output row: copy all columns from input + add the unwound element
134        let mut builder = DataChunkBuilder::new(&self.output_schema);
135
136        // Copy existing columns (except the list column which we're replacing)
137        for col_idx in 0..chunk.column_count() {
138            if col_idx == self.list_col_idx {
139                continue; // Skip the list column
140            }
141            if let Some(col) = chunk.column(col_idx)
142                && let Some(value) = col.get_value(self.current_row)
143                && let Some(out_col) = builder.column_mut(col_idx)
144            {
145                out_col.push_value(value);
146            }
147        }
148
149        // Add the unwound element column.
150        // It's at the end of the output schema, minus any ordinality/offset columns.
151        let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
152        let element_col_idx = self.output_schema.len() - 1 - extra_cols;
153        if let Some(out_col) = builder.column_mut(element_col_idx) {
154            out_col.push_value(element);
155        }
156
157        // Add ORDINALITY (1-based) if requested
158        let mut next_col = element_col_idx + 1;
159        if self.emit_ordinality {
160            if let Some(out_col) = builder.column_mut(next_col) {
161                out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
162            }
163            next_col += 1;
164        }
165
166        // Add OFFSET (0-based) if requested
167        if self.emit_offset
168            && let Some(out_col) = builder.column_mut(next_col)
169        {
170            out_col.push_value(Value::Int64(self.current_list_idx as i64));
171        }
172
173        builder.advance_row();
174        self.current_list_idx += 1;
175
176        // If we've exhausted this list, move to the next row
177        if self.current_list_idx >= list.len() {
178            self.current_row += 1;
179        }
180
181        Ok(builder.finish())
182    }
183}
184
185impl Operator for UnwindOperator {
186    fn next(&mut self) -> OperatorResult {
187        self.advance()
188    }
189
190    fn reset(&mut self) {
191        self.child.reset();
192        self.current_chunk = None;
193        self.current_row = 0;
194        self.current_list_idx = 0;
195        self.current_list = None;
196    }
197
198    fn name(&self) -> &'static str {
199        "Unwind"
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Test double that replays a fixed sequence of chunks.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out, leaving an empty placeholder behind.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a single-column (`Any`) chunk with one row per supplied value.
    fn chunk_of(values: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for value in values {
            builder.column_mut(0).unwrap().push_value(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Wraps `chunks` in a mock child and unwinds column 0 into variable `x`.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        output_schema: Vec<LogicalType>,
        emit_ordinality: bool,
        emit_offset: bool,
    ) -> UnwindOperator {
        let mock = MockOperator {
            chunks,
            position: 0,
        };
        UnwindOperator::new(
            Box::new(mock),
            0,
            "x".to_string(),
            output_schema,
            emit_ordinality,
            emit_offset,
        )
    }

    /// Pulls every chunk from the operator.
    ///
    /// An `Err` fails the test instead of being mistaken for end-of-stream, so
    /// a broken operator cannot make a "no rows expected" test pass.
    fn drain(op: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("unwind operator returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Reads row 0 of column `col_idx` from every chunk, requiring `Int64`.
    fn int_column(chunks: &[DataChunk], col_idx: usize) -> Vec<i64> {
        chunks
            .iter()
            .map(
                |chunk| match chunk.column(col_idx).and_then(|col| col.get_value(0)) {
                    Some(Value::Int64(v)) => v,
                    _ => panic!("column {col_idx} does not hold an Int64 in row 0"),
                },
            )
            .collect()
    }

    /// The three-element list shared by the ORDINALITY/OFFSET tests.
    fn ten_twenty_thirty() -> Value {
        Value::List(Arc::new([
            Value::Int64(10),
            Value::Int64(20),
            Value::Int64(30),
        ]))
    }

    #[test]
    fn test_unwind_basic() {
        // One row holding the list [1, 2, 3]
        let list = Value::List(Arc::new([
            Value::Int64(1),
            Value::Int64(2),
            Value::Int64(3),
        ]));
        let chunk = chunk_of(vec![list]);

        // Output is just the unwound element
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        // Three rows, in list order
        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_empty_list() {
        // A list with zero elements should produce no output rows
        let chunk = chunk_of(vec![Value::List(Arc::new([]))]);

        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        // No chunks at all
        let mut unwind = unwind_over(vec![], vec![LogicalType::Int64], false, false);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        // Two rows with lists of different sizes
        let list1 = Value::List(Arc::new([Value::Int64(10), Value::Int64(20)]));
        let list2 = Value::List(Arc::new([Value::Int64(30)]));
        let chunk = chunk_of(vec![list1, list2]);

        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        // 2 from first list + 1 from second list = 3 rows, in input order
        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![10, 20, 30]);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let list = Value::List(Arc::new([Value::String("hello".into())]));
        let chunk = chunk_of(vec![list]);

        let mock = MockOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut unwind = UnwindOperator::new(
            Box::new(mock),
            0,
            "item".to_string(),
            vec![LogicalType::String],
            false,
            false,
        );

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_unwind_variable_name() {
        let mock = MockOperator {
            chunks: vec![],
            position: 0,
        };

        let unwind = UnwindOperator::new(
            Box::new(mock),
            0,
            "my_var".to_string(),
            vec![LogicalType::Any],
            false,
            false,
        );

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], vec![LogicalType::Any], false, false);

        assert_eq!(unwind.name(), "Unwind");
    }

    #[test]
    fn test_unwind_with_ordinality() {
        let chunk = chunk_of(vec![ten_twenty_thirty()]);

        // Output schema: [element (Any), ordinality (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            true,  // emit_ordinality
            false, // emit_offset
        );

        let results = drain(&mut unwind);

        // ORDINALITY is 1-based
        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_with_offset() {
        let chunk = chunk_of(vec![ten_twenty_thirty()]);

        // Output schema: [element (Any), offset (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            false, // emit_ordinality
            true,  // emit_offset
        );

        let results = drain(&mut unwind);

        // OFFSET is 0-based
        assert_eq!(int_column(&results, 1), vec![0, 1, 2]);
    }

    #[test]
    fn test_unwind_with_ordinality_and_offset() {
        let chunk = chunk_of(vec![ten_twenty_thirty()]);

        // Output schema: [element (Any), ordinality (Int64), offset (Int64)]
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64, LogicalType::Int64],
            true, // emit_ordinality
            true, // emit_offset
        );

        let results = drain(&mut unwind);

        // ORDINALITY comes directly after the element, OFFSET after that
        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
        assert_eq!(int_column(&results, 2), vec![0, 1, 2]);
    }
}
508}