Skip to main content

grafeo_core/execution/operators/
union.rs

1//! Union operator for combining multiple result sets.
2//!
3//! The union operator concatenates results from multiple input operators,
4//! producing all rows from each input in sequence.
5
6use grafeo_common::types::LogicalType;
7
8use super::{Operator, OperatorResult};
9
10/// Union operator that combines results from multiple inputs.
11///
12/// This produces all rows from all inputs, in order. It does not
13/// remove duplicates (use DISTINCT after UNION for UNION DISTINCT).
14pub struct UnionOperator {
15    /// Input operators.
16    inputs: Vec<Box<dyn Operator>>,
17    /// Current input index.
18    current_input: usize,
19    /// Output schema.
20    output_schema: Vec<LogicalType>,
21}
22
23impl UnionOperator {
24    /// Creates a new union operator.
25    ///
26    /// # Arguments
27    /// * `inputs` - The input operators to union.
28    /// * `output_schema` - The schema of the output (should match all inputs).
29    pub fn new(inputs: Vec<Box<dyn Operator>>, output_schema: Vec<LogicalType>) -> Self {
30        Self {
31            inputs,
32            current_input: 0,
33            output_schema,
34        }
35    }
36
37    /// Returns the output schema.
38    #[must_use]
39    pub fn output_schema(&self) -> &[LogicalType] {
40        &self.output_schema
41    }
42}
43
44impl Operator for UnionOperator {
45    fn next(&mut self) -> OperatorResult {
46        // Process inputs in order
47        while self.current_input < self.inputs.len() {
48            if let Some(chunk) = self.inputs[self.current_input].next()? {
49                return Ok(Some(chunk));
50            }
51            // Move to next input when current is exhausted
52            self.current_input += 1;
53        }
54
55        Ok(None)
56    }
57
58    fn reset(&mut self) {
59        for input in &mut self.inputs {
60            input.reset();
61        }
62        self.current_input = 0;
63    }
64
65    fn name(&self) -> &'static str {
66        "Union"
67    }
68
69    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
70        self
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Replayable mock input operator.
    ///
    /// Stores the raw `Int64` values of each chunk and builds a fresh
    /// [`DataChunk`] on every `next()` call. This lets `reset()` genuinely
    /// rewind the operator, so a second pass yields the same rows as the
    /// first. (Moving chunks out of a `Vec<DataChunk>` would leave only empty
    /// chunks behind after a reset.)
    struct MockOperator {
        /// One entry per chunk to emit; each entry holds that chunk's values.
        chunks: Vec<Vec<i64>>,
        /// Index of the next chunk to emit.
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<Vec<i64>>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get(self.position) {
                Some(values) => {
                    let chunk = create_int_chunk(values);
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column `Int64` chunk containing `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns every selected `Int64` value of column 0, in order.
    fn drain_ints(op: &mut dyn Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    #[test]
    fn test_union_two_inputs() {
        let input1 = MockOperator::new(vec![vec![1, 2]]);
        let input2 = MockOperator::new(vec![vec![3, 4]]);

        let mut union = UnionOperator::new(
            vec![Box::new(input1), Box::new(input2)],
            vec![LogicalType::Int64],
        );

        assert_eq!(drain_ints(&mut union), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_union_three_inputs() {
        let input1 = MockOperator::new(vec![vec![1]]);
        let input2 = MockOperator::new(vec![vec![2]]);
        let input3 = MockOperator::new(vec![vec![3]]);

        let mut union = UnionOperator::new(
            vec![Box::new(input1), Box::new(input2), Box::new(input3)],
            vec![LogicalType::Int64],
        );

        assert_eq!(drain_ints(&mut union), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_multi_chunk_inputs() {
        let input1 = MockOperator::new(vec![vec![1], vec![2, 3]]);
        let input2 = MockOperator::new(vec![vec![4, 5], vec![6]]);

        let mut union = UnionOperator::new(
            vec![Box::new(input1), Box::new(input2)],
            vec![LogicalType::Int64],
        );

        assert_eq!(drain_ints(&mut union), vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_union_empty_input() {
        let input1 = MockOperator::new(vec![vec![1, 2]]);
        let input2 = MockOperator::new(vec![]); // Empty
        let input3 = MockOperator::new(vec![vec![3]]);

        let mut union = UnionOperator::new(
            vec![Box::new(input1), Box::new(input2), Box::new(input3)],
            vec![LogicalType::Int64],
        );

        assert_eq!(drain_ints(&mut union), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_no_inputs() {
        let mut union = UnionOperator::new(Vec::new(), vec![LogicalType::Int64]);
        assert!(union.next().unwrap().is_none());
    }

    #[test]
    fn test_union_stays_exhausted() {
        let input = MockOperator::new(vec![vec![1]]);
        let mut union = UnionOperator::new(vec![Box::new(input)], vec![LogicalType::Int64]);

        assert_eq!(drain_ints(&mut union), vec![1]);
        // Further pulls after exhaustion must keep returning None.
        assert!(union.next().unwrap().is_none());
        assert!(union.next().unwrap().is_none());
    }

    #[test]
    fn test_union_reset() {
        let input1 = MockOperator::new(vec![vec![1]]);
        let input2 = MockOperator::new(vec![vec![2]]);

        let mut union = UnionOperator::new(
            vec![Box::new(input1), Box::new(input2)],
            vec![LogicalType::Int64],
        );

        // First pass
        assert_eq!(drain_ints(&mut union), vec![1, 2]);

        // After a reset, the second pass must replay exactly the same rows.
        union.reset();
        assert_eq!(drain_ints(&mut union), vec![1, 2]);
    }

    #[test]
    fn test_union_into_any() {
        let left = MockOperator::new(vec![]);
        let right = MockOperator::new(vec![]);
        let op = UnionOperator::new(
            vec![Box::new(left), Box::new(right)],
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<UnionOperator>().is_ok());
    }
}