Skip to main content

graphos_core/execution/operators/
union.rs

1//! Union operator for combining multiple result sets.
2//!
3//! The union operator concatenates results from multiple input operators,
4//! producing all rows from each input in sequence.
5
6use graphos_common::types::LogicalType;
7
8use super::{Operator, OperatorResult};
9
10/// Union operator that combines results from multiple inputs.
11///
12/// This produces all rows from all inputs, in order. It does not
13/// remove duplicates (use DISTINCT after UNION for UNION DISTINCT).
14pub struct UnionOperator {
15    /// Input operators.
16    inputs: Vec<Box<dyn Operator>>,
17    /// Current input index.
18    current_input: usize,
19    /// Output schema.
20    output_schema: Vec<LogicalType>,
21}
22
23impl UnionOperator {
24    /// Creates a new union operator.
25    ///
26    /// # Arguments
27    /// * `inputs` - The input operators to union.
28    /// * `output_schema` - The schema of the output (should match all inputs).
29    pub fn new(inputs: Vec<Box<dyn Operator>>, output_schema: Vec<LogicalType>) -> Self {
30        Self {
31            inputs,
32            current_input: 0,
33            output_schema,
34        }
35    }
36
37    /// Returns the output schema.
38    #[must_use]
39    pub fn output_schema(&self) -> &[LogicalType] {
40        &self.output_schema
41    }
42}
43
44impl Operator for UnionOperator {
45    fn next(&mut self) -> OperatorResult {
46        // Process inputs in order
47        while self.current_input < self.inputs.len() {
48            if let Some(chunk) = self.inputs[self.current_input].next()? {
49                return Ok(Some(chunk));
50            }
51            // Move to next input when current is exhausted
52            self.current_input += 1;
53        }
54
55        Ok(None)
56    }
57
58    fn reset(&mut self) {
59        for input in &mut self.inputs {
60            input.reset();
61        }
62        self.current_input = 0;
63    }
64
65    fn name(&self) -> &'static str {
66        "Union"
67    }
68}
69
70/// Union all operator that includes duplicates.
71///
72/// This is the same as UnionOperator - kept for semantic clarity.
73#[allow(dead_code)]
74pub type UnionAllOperator = UnionOperator;
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Mock operator that replays fixed batches of `Int64` values.
    ///
    /// Each call to `next` builds a fresh chunk from the stored values, so a
    /// `reset()` makes the mock replay exactly the same data. Moving pre-built
    /// chunks out instead would leave empty placeholders behind, and a second
    /// pass could not be checked for content.
    struct MockOperator {
        batches: Vec<Vec<i64>>,
        position: usize,
    }

    impl MockOperator {
        /// Creates a mock that emits one chunk per entry in `batches`.
        fn new(batches: &[&[i64]]) -> Self {
            Self {
                batches: batches.iter().map(|batch| batch.to_vec()).collect(),
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.batches.get(self.position) {
                Some(values) => {
                    let chunk = create_int_chunk(values);
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Wraps the mocks in a union with a single `Int64` output column.
    fn union_of(inputs: Vec<MockOperator>) -> UnionOperator {
        let boxed: Vec<Box<dyn Operator>> = inputs
            .into_iter()
            .map(|op| Box::new(op) as Box<dyn Operator>)
            .collect();
        UnionOperator::new(boxed, vec![LogicalType::Int64])
    }

    /// Drains `union` and returns every selected value of column 0 in order.
    fn collect_values(union: &mut UnionOperator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = union.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values
    }

    #[test]
    fn test_union_two_inputs() {
        let mut union = union_of(vec![
            MockOperator::new(&[&[1, 2]]),
            MockOperator::new(&[&[3, 4]]),
        ]);
        assert_eq!(collect_values(&mut union), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_union_three_inputs() {
        let mut union = union_of(vec![
            MockOperator::new(&[&[1]]),
            MockOperator::new(&[&[2]]),
            MockOperator::new(&[&[3]]),
        ]);
        assert_eq!(collect_values(&mut union), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_empty_input() {
        let mut union = union_of(vec![
            MockOperator::new(&[&[1, 2]]),
            MockOperator::new(&[]), // Produces no chunks at all.
            MockOperator::new(&[&[3]]),
        ]);
        assert_eq!(collect_values(&mut union), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_multiple_chunks_per_input() {
        // Every chunk of an input must be emitted before moving to the next input.
        let mut union = union_of(vec![
            MockOperator::new(&[&[1], &[2, 3]]),
            MockOperator::new(&[&[4], &[5]]),
        ]);
        assert_eq!(collect_values(&mut union), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_union_no_inputs() {
        let mut union = UnionOperator::new(Vec::new(), vec![LogicalType::Int64]);
        assert!(union.next().unwrap().is_none());
    }

    #[test]
    fn test_union_stays_exhausted() {
        let mut union = union_of(vec![MockOperator::new(&[&[1]])]);
        assert_eq!(collect_values(&mut union), vec![1]);
        assert!(union.next().unwrap().is_none());
        assert!(union.next().unwrap().is_none());
    }

    #[test]
    fn test_union_reset() {
        let mut union = union_of(vec![
            MockOperator::new(&[&[1]]),
            MockOperator::new(&[&[2]]),
        ]);

        // First pass.
        assert_eq!(collect_values(&mut union), vec![1, 2]);

        // After a reset, the second pass must replay the same rows, not just
        // the same number of chunks.
        union.reset();
        assert_eq!(collect_values(&mut union), vec![1, 2]);
    }

    #[test]
    fn test_union_name() {
        let union = union_of(Vec::new());
        assert_eq!(union.name(), "Union");
    }
}