Skip to main content

grafeo_core/execution/operators/
unwind.rs

1//! Unwind operator for expanding lists into individual rows.
2
3use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use grafeo_common::types::{LogicalType, Value};
6
7/// Unwind operator that expands a list column into individual rows.
8///
9/// For each input row, if the list column contains N elements, this operator
10/// produces N output rows, each with one element from the list.
11pub struct UnwindOperator {
12    /// Child operator to read from.
13    child: Box<dyn Operator>,
14    /// Index of the column containing the list to unwind.
15    list_col_idx: usize,
16    /// Name of the new variable for the unwound elements.
17    variable_name: String,
18    /// Schema of output columns (inherited from input plus the new column).
19    output_schema: Vec<LogicalType>,
20    /// Whether to emit a 1-based ORDINALITY column.
21    emit_ordinality: bool,
22    /// Whether to emit a 0-based OFFSET column.
23    emit_offset: bool,
24    /// Current input chunk being processed.
25    current_chunk: Option<DataChunk>,
26    /// Current row index within the chunk.
27    current_row: usize,
28    /// Current index within the list being unwound.
29    current_list_idx: usize,
30    /// Current list being unwound.
31    current_list: Option<Vec<Value>>,
32}
33
34impl UnwindOperator {
35    /// Creates a new unwind operator.
36    ///
37    /// # Arguments
38    /// * `child` - The input operator
39    /// * `list_col_idx` - The column index containing the list to unwind
40    /// * `variable_name` - The name of the new variable
41    /// * `output_schema` - The schema for output (should include the new column type)
42    /// * `emit_ordinality` - Whether to emit a 1-based index column
43    /// * `emit_offset` - Whether to emit a 0-based index column
44    pub fn new(
45        child: Box<dyn Operator>,
46        list_col_idx: usize,
47        variable_name: String,
48        output_schema: Vec<LogicalType>,
49        emit_ordinality: bool,
50        emit_offset: bool,
51    ) -> Self {
52        Self {
53            child,
54            list_col_idx,
55            variable_name,
56            output_schema,
57            emit_ordinality,
58            emit_offset,
59            current_chunk: None,
60            current_row: 0,
61            current_list_idx: 0,
62            current_list: None,
63        }
64    }
65
66    /// Returns the variable name for the unwound elements.
67    #[must_use]
68    pub fn variable_name(&self) -> &str {
69        &self.variable_name
70    }
71
72    /// Advances to the next row or fetches the next chunk.
73    fn advance(&mut self) -> OperatorResult {
74        loop {
75            // If we have a current list, try to get the next element
76            if let Some(list) = &self.current_list
77                && self.current_list_idx < list.len()
78            {
79                // Still have elements in the current list
80                return Ok(Some(self.emit_row()?));
81            }
82
83            // Need to move to the next row
84            self.current_list_idx = 0;
85            self.current_list = None;
86
87            // Get the next chunk if needed
88            if self.current_chunk.is_none() {
89                self.current_chunk = self.child.next()?;
90                self.current_row = 0;
91                if self.current_chunk.is_none() {
92                    return Ok(None); // No more data
93                }
94            }
95
96            let chunk = self
97                .current_chunk
98                .as_ref()
99                .expect("current_chunk is Some: checked above");
100
101            // Find the next row with a list value
102            while self.current_row < chunk.row_count() {
103                if let Some(col) = chunk.column(self.list_col_idx)
104                    && let Some(value) = col.get_value(self.current_row)
105                {
106                    // Extract list elements from either Value::List or Value::Vector
107                    let list = match value {
108                        Value::List(list_arc) => list_arc.iter().cloned().collect::<Vec<Value>>(),
109                        Value::Vector(vec_arc) => {
110                            vec_arc.iter().map(|f| Value::Float64(*f as f64)).collect()
111                        }
112                        _ => {
113                            self.current_row += 1;
114                            continue;
115                        }
116                    };
117                    if !list.is_empty() {
118                        self.current_list = Some(list);
119                        return Ok(Some(self.emit_row()?));
120                    }
121                }
122                self.current_row += 1;
123            }
124
125            // Exhausted current chunk, get next one
126            self.current_chunk = None;
127        }
128    }
129
130    /// Emits a single row with the current list element.
131    fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
132        let chunk = self
133            .current_chunk
134            .as_ref()
135            .expect("current_chunk is Some: set before emit_row call");
136        let list = self
137            .current_list
138            .as_ref()
139            .expect("current_list is Some: set before emit_row call");
140        let element = list[self.current_list_idx].clone();
141
142        // Build output row: copy all columns from input + add the unwound element
143        let mut builder = DataChunkBuilder::new(&self.output_schema);
144
145        // Copy existing columns (except the list column which we're replacing)
146        for col_idx in 0..chunk.column_count() {
147            if col_idx == self.list_col_idx {
148                continue; // Skip the list column
149            }
150            if let Some(col) = chunk.column(col_idx)
151                && let Some(value) = col.get_value(self.current_row)
152                && let Some(out_col) = builder.column_mut(col_idx)
153            {
154                out_col.push_value(value);
155            }
156        }
157
158        // Add the unwound element column.
159        // It's at the end of the output schema, minus any ordinality/offset columns.
160        let extra_cols = usize::from(self.emit_ordinality) + usize::from(self.emit_offset);
161        let element_col_idx = self.output_schema.len() - 1 - extra_cols;
162        if let Some(out_col) = builder.column_mut(element_col_idx) {
163            out_col.push_value(element);
164        }
165
166        // Add ORDINALITY (1-based) if requested
167        let mut next_col = element_col_idx + 1;
168        if self.emit_ordinality {
169            if let Some(out_col) = builder.column_mut(next_col) {
170                out_col.push_value(Value::Int64((self.current_list_idx + 1) as i64));
171            }
172            next_col += 1;
173        }
174
175        // Add OFFSET (0-based) if requested
176        if self.emit_offset
177            && let Some(out_col) = builder.column_mut(next_col)
178        {
179            out_col.push_value(Value::Int64(self.current_list_idx as i64));
180        }
181
182        builder.advance_row();
183        self.current_list_idx += 1;
184
185        // If we've exhausted this list, move to the next row
186        if self.current_list_idx >= list.len() {
187            self.current_row += 1;
188        }
189
190        Ok(builder.finish())
191    }
192}
193
194impl Operator for UnwindOperator {
195    fn next(&mut self) -> OperatorResult {
196        self.advance()
197    }
198
199    fn reset(&mut self) {
200        self.child.reset();
201        self.current_chunk = None;
202        self.current_row = 0;
203        self.current_list_idx = 0;
204        self.current_list = None;
205    }
206
207    fn name(&self) -> &'static str {
208        "Unwind"
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Child-operator stub that hands out a fixed sequence of chunks, one per call.
    ///
    /// Chunks are moved out as they are served, so after `reset` the stub
    /// replays empty placeholders rather than the original data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Wraps integers in a `Value::List`.
    fn int_list(values: &[i64]) -> Value {
        let items: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        Value::List(Arc::from(items))
    }

    /// Builds a chunk with a single `Any` column holding one value per row.
    fn single_column_chunk(rows: Vec<Value>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        for value in rows {
            builder.column_mut(0).unwrap().push_value(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds an unwind operator over column 0 of the given chunks.
    fn unwind_over(
        chunks: Vec<DataChunk>,
        output_schema: Vec<LogicalType>,
        emit_ordinality: bool,
        emit_offset: bool,
    ) -> UnwindOperator {
        UnwindOperator::new(
            Box::new(MockOperator::new(chunks)),
            0,
            "x".to_string(),
            output_schema,
            emit_ordinality,
            emit_offset,
        )
    }

    /// Pulls every chunk out of `op`.
    ///
    /// Panics on the first error so that a failing operator can never be
    /// mistaken for one that legitimately produced no rows.
    fn drain(op: &mut UnwindOperator) -> Vec<DataChunk> {
        let mut out = Vec::new();
        while let Some(chunk) = op.next().expect("unwind should not fail") {
            out.push(chunk);
        }
        out
    }

    /// Reads the `Int64` stored in row 0 of column `col` of every chunk.
    fn int_column(chunks: &[DataChunk], col: usize) -> Vec<i64> {
        chunks
            .iter()
            .map(
                |chunk| match chunk.column(col).and_then(|c| c.get_value(0)) {
                    Some(Value::Int64(v)) => v,
                    _ => panic!("expected an Int64 in row 0 of column {col}"),
                },
            )
            .collect()
    }

    #[test]
    fn test_unwind_basic() {
        // One row holding [1, 2, 3]; the output is just the unwound element.
        let chunk = single_column_chunk(vec![int_list(&[1, 2, 3])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_empty_list() {
        // A list with zero elements should produce no output rows
        let chunk = single_column_chunk(vec![int_list(&[])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 0, "Empty list should produce no rows");
    }

    #[test]
    fn test_unwind_empty_input() {
        // No chunks at all
        let mut unwind = unwind_over(vec![], vec![LogicalType::Int64], false, false);

        assert!(unwind.next().unwrap().is_none());
    }

    #[test]
    fn test_unwind_multiple_rows() {
        // Two rows with lists of different sizes
        let chunk = single_column_chunk(vec![int_list(&[10, 20]), int_list(&[30])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        // 2 from first list + 1 from second list = 3 rows
        assert_eq!(results.len(), 3);
        assert_eq!(int_column(&results, 0), vec![10, 20, 30]);
    }

    #[test]
    fn test_unwind_across_chunks() {
        // Lists spread over two input chunks are unwound in input order.
        let first = single_column_chunk(vec![int_list(&[1, 2])]);
        let second = single_column_chunk(vec![int_list(&[3])]);
        let mut unwind = unwind_over(vec![first, second], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(int_column(&results, 0), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_skips_non_list_values() {
        // A row whose value is not a list contributes no output rows.
        let chunk = single_column_chunk(vec![Value::Int64(5), int_list(&[7])]);
        let mut unwind = unwind_over(vec![chunk], vec![LogicalType::Int64], false, false);

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_unwind_single_element_list() {
        let list = Value::List(Arc::new([Value::String("hello".into())]));
        let chunk = single_column_chunk(vec![list]);

        let mut unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![chunk])),
            0,
            "item".to_string(),
            vec![LogicalType::String],
            false,
            false,
        );

        let results = drain(&mut unwind);

        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_unwind_variable_name() {
        let unwind = UnwindOperator::new(
            Box::new(MockOperator::new(vec![])),
            0,
            "my_var".to_string(),
            vec![LogicalType::Any],
            false,
            false,
        );

        assert_eq!(unwind.variable_name(), "my_var");
    }

    #[test]
    fn test_unwind_name() {
        let unwind = unwind_over(vec![], vec![LogicalType::Any], false, false);

        assert_eq!(unwind.name(), "Unwind");
    }

    #[test]
    fn test_unwind_with_ordinality() {
        // Output schema: [element (Any), ordinality (Int64)]
        let chunk = single_column_chunk(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            true,  // emit_ordinality
            false, // emit_offset
        );

        let results = drain(&mut unwind);

        // ORDINALITY is 1-based
        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
    }

    #[test]
    fn test_unwind_with_offset() {
        // Output schema: [element (Any), offset (Int64)]
        let chunk = single_column_chunk(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64],
            false, // emit_ordinality
            true,  // emit_offset
        );

        let results = drain(&mut unwind);

        // OFFSET is 0-based
        assert_eq!(int_column(&results, 1), vec![0, 1, 2]);
    }

    #[test]
    fn test_unwind_with_ordinality_and_offset() {
        // Output schema: [element (Any), ordinality (Int64), offset (Int64)]
        let chunk = single_column_chunk(vec![int_list(&[10, 20, 30])]);
        let mut unwind = unwind_over(
            vec![chunk],
            vec![LogicalType::Any, LogicalType::Int64, LogicalType::Int64],
            true, // emit_ordinality
            true, // emit_offset
        );

        let results = drain(&mut unwind);

        // ORDINALITY comes first (1-based), OFFSET second (0-based).
        assert_eq!(int_column(&results, 1), vec![1, 2, 3]);
        assert_eq!(int_column(&results, 2), vec![0, 1, 2]);
    }
}