Skip to main content

graphos_core/execution/operators/
unwind.rs

1//! Unwind operator for expanding lists into individual rows.
2
3use super::{Operator, OperatorResult};
4use crate::execution::chunk::{DataChunk, DataChunkBuilder};
5use graphos_common::types::{LogicalType, Value};
6
7/// Unwind operator that expands a list column into individual rows.
8///
9/// For each input row, if the list column contains N elements, this operator
10/// produces N output rows, each with one element from the list.
11pub struct UnwindOperator {
12    /// Child operator to read from.
13    child: Box<dyn Operator>,
14    /// Index of the column containing the list to unwind.
15    list_col_idx: usize,
16    /// Name of the new variable for the unwound elements.
17    variable_name: String,
18    /// Schema of output columns (inherited from input plus the new column).
19    output_schema: Vec<LogicalType>,
20    /// Current input chunk being processed.
21    current_chunk: Option<DataChunk>,
22    /// Current row index within the chunk.
23    current_row: usize,
24    /// Current index within the list being unwound.
25    current_list_idx: usize,
26    /// Current list being unwound.
27    current_list: Option<Vec<Value>>,
28}
29
30impl UnwindOperator {
31    /// Creates a new unwind operator.
32    ///
33    /// # Arguments
34    /// * `child` - The input operator
35    /// * `list_col_idx` - The column index containing the list to unwind
36    /// * `variable_name` - The name of the new variable
37    /// * `output_schema` - The schema for output (should include the new column type)
38    pub fn new(
39        child: Box<dyn Operator>,
40        list_col_idx: usize,
41        variable_name: String,
42        output_schema: Vec<LogicalType>,
43    ) -> Self {
44        Self {
45            child,
46            list_col_idx,
47            variable_name,
48            output_schema,
49            current_chunk: None,
50            current_row: 0,
51            current_list_idx: 0,
52            current_list: None,
53        }
54    }
55
56    /// Returns the variable name for the unwound elements.
57    #[must_use]
58    pub fn variable_name(&self) -> &str {
59        &self.variable_name
60    }
61
62    /// Advances to the next row or fetches the next chunk.
63    fn advance(&mut self) -> OperatorResult {
64        loop {
65            // If we have a current list, try to get the next element
66            if let Some(list) = &self.current_list {
67                if self.current_list_idx < list.len() {
68                    // Still have elements in the current list
69                    return Ok(Some(self.emit_row()?));
70                }
71            }
72
73            // Need to move to the next row
74            self.current_list_idx = 0;
75            self.current_list = None;
76
77            // Get the next chunk if needed
78            if self.current_chunk.is_none() {
79                self.current_chunk = self.child.next()?;
80                self.current_row = 0;
81                if self.current_chunk.is_none() {
82                    return Ok(None); // No more data
83                }
84            }
85
86            let chunk = self.current_chunk.as_ref().unwrap();
87
88            // Find the next row with a list value
89            while self.current_row < chunk.row_count() {
90                if let Some(col) = chunk.column(self.list_col_idx) {
91                    if let Some(value) = col.get_value(self.current_row) {
92                        if let Value::List(list_arc) = value {
93                            // Found a list - store it and return first element
94                            let list: Vec<Value> = list_arc.iter().cloned().collect();
95                            if !list.is_empty() {
96                                self.current_list = Some(list);
97                                return Ok(Some(self.emit_row()?));
98                            }
99                        }
100                    }
101                }
102                self.current_row += 1;
103            }
104
105            // Exhausted current chunk, get next one
106            self.current_chunk = None;
107        }
108    }
109
110    /// Emits a single row with the current list element.
111    fn emit_row(&mut self) -> Result<DataChunk, super::OperatorError> {
112        let chunk = self.current_chunk.as_ref().unwrap();
113        let list = self.current_list.as_ref().unwrap();
114        let element = list[self.current_list_idx].clone();
115
116        // Build output row: copy all columns from input + add the unwound element
117        let mut builder = DataChunkBuilder::new(&self.output_schema);
118
119        // Copy existing columns (except the list column which we're replacing)
120        for col_idx in 0..chunk.column_count() {
121            if col_idx == self.list_col_idx {
122                continue; // Skip the list column
123            }
124            if let Some(col) = chunk.column(col_idx) {
125                if let Some(value) = col.get_value(self.current_row) {
126                    if let Some(out_col) = builder.column_mut(col_idx) {
127                        out_col.push_value(value);
128                    }
129                }
130            }
131        }
132
133        // Add the unwound element as the last column
134        let new_col_idx = self.output_schema.len() - 1;
135        if let Some(out_col) = builder.column_mut(new_col_idx) {
136            out_col.push_value(element);
137        }
138
139        builder.advance_row();
140        self.current_list_idx += 1;
141
142        // If we've exhausted this list, move to the next row
143        if self.current_list_idx >= list.len() {
144            self.current_row += 1;
145        }
146
147        Ok(builder.finish())
148    }
149}
150
151impl Operator for UnwindOperator {
152    fn next(&mut self) -> OperatorResult {
153        self.advance()
154    }
155
156    fn reset(&mut self) {
157        self.child.reset();
158        self.current_chunk = None;
159        self.current_row = 0;
160        self.current_list_idx = 0;
161        self.current_list = None;
162    }
163
164    fn name(&self) -> &'static str {
165        "Unwind"
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use std::sync::Arc;

    /// Test double that hands out a fixed sequence of chunks, one per call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    // Move the chunk out, leaving an empty placeholder behind.
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        // NOTE: chunks that were already handed out have been replaced by
        // empty placeholders, so a replay after `reset` yields empty chunks.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    #[test]
    fn test_unwind_basic() {
        // Create a chunk with a single list column holding [1, 2, 3].
        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]); // Any for list
        let list = Value::List(Arc::new([
            Value::Int64(1),
            Value::Int64(2),
            Value::Int64(3),
        ]));
        builder.column_mut(0).unwrap().push_value(list);
        builder.advance_row();
        let chunk = builder.finish();

        let mock = MockOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Create unwind operator.
        let mut unwind = UnwindOperator::new(
            Box::new(mock),
            0,
            "x".to_string(),
            vec![LogicalType::Int64], // Output is just the unwound element
        );

        // Drain the operator; an error must fail the test rather than being
        // silently treated as end-of-stream.
        let mut results = Vec::new();
        loop {
            match unwind.next() {
                Ok(Some(chunk)) => results.push(chunk),
                Ok(None) => break,
                Err(_) => panic!("unwind operator returned an error"),
            }
        }

        // One single-row chunk per list element.
        assert_eq!(results.len(), 3);
        for chunk in &results {
            assert_eq!(chunk.row_count(), 1);
        }

        // Elements come out in list order.
        let element = |idx: usize| results[idx].column(0).and_then(|col| col.get_value(0));
        assert!(matches!(element(0), Some(Value::Int64(1))));
        assert!(matches!(element(1), Some(Value::Int64(2))));
        assert!(matches!(element(2), Some(Value::Int64(3))));
    }
}