grafeo_core/execution/operators/
union.rs1use grafeo_common::types::LogicalType;
7
8use super::{Operator, OperatorResult};
9
10pub struct UnionOperator {
15 inputs: Vec<Box<dyn Operator>>,
17 current_input: usize,
19 output_schema: Vec<LogicalType>,
21}
22
23impl UnionOperator {
24 pub fn new(inputs: Vec<Box<dyn Operator>>, output_schema: Vec<LogicalType>) -> Self {
30 Self {
31 inputs,
32 current_input: 0,
33 output_schema,
34 }
35 }
36
37 #[must_use]
39 pub fn output_schema(&self) -> &[LogicalType] {
40 &self.output_schema
41 }
42}
43
44impl Operator for UnionOperator {
45 fn next(&mut self) -> OperatorResult {
46 while self.current_input < self.inputs.len() {
48 if let Some(chunk) = self.inputs[self.current_input].next()? {
49 return Ok(Some(chunk));
50 }
51 self.current_input += 1;
53 }
54
55 Ok(None)
56 }
57
58 fn reset(&mut self) {
59 for input in &mut self.inputs {
60 input.reset();
61 }
62 self.current_input = 0;
63 }
64
65 fn name(&self) -> &'static str {
66 "Union"
67 }
68}
69
70#[allow(dead_code)]
74pub type UnionAllOperator = UnionOperator;
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next()` call.
    ///
    /// Each chunk is moved out and a `DataChunk::empty()` placeholder is left
    /// in its slot, so after `reset()` the mock replays the same *number* of
    /// chunks but not their original contents.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                position: 0,
                chunks,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk with one row per value.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for value in values.iter().copied() {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Pulls every chunk out of `op` and gathers the selected values of column 0.
    fn drain_int64(op: &mut UnionOperator) -> Vec<i64> {
        let mut collected = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                collected.push(val);
            }
        }
        collected
    }

    /// Counts how many chunks `op` yields before reporting `None`.
    fn count_chunks(op: &mut UnionOperator) -> usize {
        let mut seen = 0;
        while op.next().unwrap().is_some() {
            seen += 1;
        }
        seen
    }

    #[test]
    fn test_union_two_inputs() {
        let first = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let second = MockOperator::new(vec![create_int_chunk(&[3, 4])]);

        let mut union = UnionOperator::new(
            vec![Box::new(first), Box::new(second)],
            vec![LogicalType::Int64],
        );

        let results = drain_int64(&mut union);

        assert_eq!(results, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_union_three_inputs() {
        let first = MockOperator::new(vec![create_int_chunk(&[1])]);
        let second = MockOperator::new(vec![create_int_chunk(&[2])]);
        let third = MockOperator::new(vec![create_int_chunk(&[3])]);

        let mut union = UnionOperator::new(
            vec![Box::new(first), Box::new(second), Box::new(third)],
            vec![LogicalType::Int64],
        );

        let results = drain_int64(&mut union);

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_union_empty_input() {
        let first = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        // The middle input yields nothing and must simply be skipped over.
        let second = MockOperator::new(vec![]);
        let third = MockOperator::new(vec![create_int_chunk(&[3])]);

        let mut union = UnionOperator::new(
            vec![Box::new(first), Box::new(second), Box::new(third)],
            vec![LogicalType::Int64],
        );

        let results = drain_int64(&mut union);

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_union_reset() {
        let first = MockOperator::new(vec![create_int_chunk(&[1])]);
        let second = MockOperator::new(vec![create_int_chunk(&[2])]);

        let mut union = UnionOperator::new(
            vec![Box::new(first), Box::new(second)],
            vec![LogicalType::Int64],
        );

        // First pass drains both inputs.
        assert_eq!(count_chunks(&mut union), 2);

        // After a reset the same number of chunks is produced again.
        union.reset();
        assert_eq!(count_chunks(&mut union), 2);
    }
}