Skip to main content

grafeo_core/execution/operators/
unwind.rs

1//! Unwind operator for expanding lists into individual rows.
2
3use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7/// Unwind operator that expands a list column into individual rows.
8///
9/// For each input row, if the list column contains N elements, this operator
10/// produces N output rows, each with one element from the list.
11pub struct UnwindOperator {
12    /// Child operator to read from.
13    child: Box<dyn Operator>,
14    /// Index of the column containing the list to unwind.
15    list_col_idx: usize,
16    /// Name of the new variable for the unwound elements.
17    variable_name: String,
18    /// Schema of output columns (inherited from input plus the new column).
19    output_schema: Vec<LogicalType>,
20    /// Current input chunk being processed.
21    current_chunk: Option<DataChunk>,
22    /// Current row index within the chunk.
23    current_row: usize,
24    /// Current index within the list being unwound.
25    current_list_idx: usize,
26    /// Current list being unwound.
27    current_list: Option<Vec<Value>>,
28}
29
30impl UnwindOperator {
31    /// Creates a new unwind operator.
32    ///
33    /// # Arguments
34    /// * `child` - The input operator
35    /// * `list_col_idx` - The column index containing the list to unwind
36    /// * `variable_name` - The name of the new variable
37    /// * `output_schema` - The schema for output (should include the new column type)
38    pub fn new(
39        child: Box<dyn Operator>,
40        list_col_idx: usize,
41        variable_name: String,
42        output_schema: Vec<LogicalType>,
43    ) -> Self {
44        Self {
45            child,
46            list_col_idx,
47            variable_name,
48            output_schema,
49            current_chunk: None,
50            current_row: 0,
51            current_list_idx: 0,
52            current_list: None,
53        }
54    }
55
56    /// Returns the variable name for the unwound elements.
57    #[must_use]
58    pub fn variable_name(&self) -> &str {
59        &self.variable_name
60    }
61
62    /// Advances to the next row or fetches the next chunk.
63    fn advance(&mut self) -> OperatorResult {
64        loop {
65            // If we have a current list, try to get the next element
66            if let Some(list) = &self.current_list
67                && self.current_list_idx < list.len()
68            {
69                // Still have elements in the current list
70                return Ok(Some(self.emit_row()?));
71            }
72
73            // Need to move to the next row
74            self.current_list_idx = 0;
75            self.current_list = None;
76
77            // Get the next chunk if needed
78            if self.current_chunk.is_none() {
79                self.current_chunk = self.child.next()?;
80                self.current_row = 0;
81                if self.current_chunk.is_none() {
82                    return Ok(None); // No more data
83                }
84            }
85
86            let chunk = self.current_chunk.as_ref().unwrap();
87
88            // Find the next row with a list value
89            while self.current_row < chunk.row_count() {
90                if let Some(col) = chunk.column(self.list_col_idx)
91                    && let Some(value) = col.get_value(self.current_row)
92                    && let Value::List(list_arc) = value
93                {
94                    // Found a list - store it and return first element
95                    let list: Vec<Value> = list_arc.iter().cloned().collect();
96                    if !list.is_empty() {
97                        self.current_list = Some(list);
98                        return Ok(Some(self.emit_row()?));
99                    }
100                }
101                self.current_row += 1;
102            }
103
104            // Exhausted current chunk, get next one
105            self.current_chunk = None;
106        }
107    }
108
109    /// Emits a single row with the current list element.
110    fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
111        let chunk = self.current_chunk.as_ref().unwrap();
112        let list = self.current_list.as_ref().unwrap();
113        let element = list[self.current_list_idx].clone();
114
115        // Build output row: copy all columns from input + add the unwound element
116        let mut builder = DataChunkBuilder::new(&self.output_schema);
117
118        // Copy existing columns (except the list column which we're replacing)
119        for col_idx in 0..chunk.column_count() {
120            if col_idx == self.list_col_idx {
121                continue; // Skip the list column
122            }
123            if let Some(col) = chunk.column(col_idx)
124                && let Some(value) = col.get_value(self.current_row)
125                && let Some(out_col) = builder.column_mut(col_idx)
126            {
127                out_col.push_value(value);
128            }
129        }
130
131        // Add the unwound element as the last column
132        let new_col_idx = self.output_schema.len() - 1;
133        if let Some(out_col) = builder.column_mut(new_col_idx) {
134            out_col.push_value(element);
135        }
136
137        builder.advance_row();
138        self.current_list_idx += 1;
139
140        // If we've exhausted this list, move to the next row
141        if self.current_list_idx >= list.len() {
142            self.current_row += 1;
143        }
144
145        Ok(builder.finish())
146    }
147}
148
149impl Operator for UnwindOperator {
150    fn next(&mut self) -> OperatorResult {
151        self.advance()
152    }
153
154    fn reset(&mut self) {
155        self.child.reset();
156        self.current_chunk = None;
157        self.current_row = 0;
158        self.current_list_idx = 0;
159        self.current_list = None;
160    }
161
162    fn name(&self) -> &'static str {
163        "Unwind"
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Test double that hands out a fixed sequence of chunks, one per call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a single-column chunk with one row per supplied value.
    fn list_chunk(rows: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]); // Any for list
        for row in rows {
            builder.column_mut(0).unwrap().push_value(row);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Creates an unwind operator over column 0 of the given chunks whose
    /// output consists solely of the unwound element.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        variable: &str,
        element_type: LogicalType,
    ) -> UnwindOperator {
        let mock = MockOperator {
            chunks,
            position: 0,
        };
        UnwindOperator::new(Box::new(mock), 0, variable.to_string(), vec![element_type])
    }

    /// Collects every chunk the operator produces.
    ///
    /// Errors are unwrapped rather than treated as end of stream, so a failing
    /// operator cannot make a "produces no rows" test pass by accident.
    fn drain(unwind: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut results = Vec::new();
        while let Some(chunk) = unwind.next().expect("unwind should not fail") {
            results.push(chunk);
        }
        results
    }

    #[test]
    fn test_unwind_basic() {
        // Create a chunk with a list column [1, 2, 3]
        let list = Value::List(Arc::new([
            Value::Int64(1),
            Value::Int64(2),
            Value::Int64(3),
        ]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "x", LogicalType::Int64);

        // Should produce 3 chunks of one row each
        let results = drain(&mut unwind);
        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|chunk| chunk.row_count() == 1));
    }

    #[test]
    fn test_unwind_empty_list() {
        // A list with zero elements should produce no output rows
        let list = Value::List(Arc::new([]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "x", LogicalType::Int64);

        let results = drain(&mut unwind);
        assert_eq!(results.len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        // No chunks at all
        let mut unwind = unwind_over(vec![], "x", LogicalType::Int64);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        // Two rows with lists of different sizes
        let list1 = Value::List(Arc::new([Value::Int64(10), Value::Int64(20)]));
        let list2 = Value::List(Arc::new([Value::Int64(30)]));
        let mut unwind = unwind_over(
            vec![list_chunk(vec![list1, list2])],
            "x",
            LogicalType::Int64,
        );

        // 2 from first list + 1 from second list = 3 rows
        assert_eq!(drain(&mut unwind).len(), 3);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let list = Value::List(Arc::new([Value::String("hello".into())]));
        let mut unwind = unwind_over(vec![list_chunk(vec![list])], "item", LogicalType::String);

        assert_eq!(drain(&mut unwind).len(), 1);
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = unwind_over(vec![], "my_var", LogicalType::Any);

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], "x", LogicalType::Any);

        assert_eq!(unwind.name(), "Unwind");
    }
}
361}