grafeo_core/execution/operators/
union.rs1use grafeo_common::types::LogicalType;
7
8use super::{Operator, OperatorResult};
9
10pub struct UnionOperator {
15 inputs: Vec<Box<dyn Operator>>,
17 current_input: usize,
19 output_schema: Vec<LogicalType>,
21}
22
23impl UnionOperator {
24 pub fn new(inputs: Vec<Box<dyn Operator>>, output_schema: Vec<LogicalType>) -> Self {
30 Self {
31 inputs,
32 current_input: 0,
33 output_schema,
34 }
35 }
36
37 #[must_use]
39 pub fn output_schema(&self) -> &[LogicalType] {
40 &self.output_schema
41 }
42}
43
44impl Operator for UnionOperator {
45 fn next(&mut self) -> OperatorResult {
46 while self.current_input < self.inputs.len() {
48 if let Some(chunk) = self.inputs[self.current_input].next()? {
49 return Ok(Some(chunk));
50 }
51 self.current_input += 1;
53 }
54
55 Ok(None)
56 }
57
58 fn reset(&mut self) {
59 for input in &mut self.inputs {
60 input.reset();
61 }
62 self.current_input = 0;
63 }
64
65 fn name(&self) -> &'static str {
66 "Union"
67 }
68
69 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
70 self
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::execution::DataChunk;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    ///
    /// Each chunk is moved out when it is yielded and `DataChunk::empty()` is
    /// left in its slot. After `reset` the mock therefore yields the same
    /// number of chunks again, but they are those empty placeholders.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        /// Creates a mock positioned before its first chunk.
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                position: 0,
                chunks,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    self.position += 1;
                    // Move the chunk out, leaving an empty placeholder behind.
                    Ok(Some(std::mem::replace(slot, DataChunk::empty())))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for v in values.iter().copied() {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Boxes a mock operator that will yield `chunks`.
    fn mock(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockOperator::new(chunks))
    }

    /// Builds a union over `inputs` with a single `Int64` output column.
    fn int_union(inputs: Vec<Box<dyn Operator>>) -> UnionOperator {
        UnionOperator::new(inputs, vec![LogicalType::Int64])
    }

    /// Drains `op` and returns every selected value of column 0, in order.
    fn drain_ints(op: &mut UnionOperator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values
    }

    /// Drains `op` and returns how many chunks it produced.
    fn count_chunks(op: &mut UnionOperator) -> usize {
        std::iter::from_fn(|| op.next().unwrap()).count()
    }

    #[test]
    fn test_union_two_inputs() {
        let mut op = int_union(vec![
            mock(vec![create_int_chunk(&[1, 2])]),
            mock(vec![create_int_chunk(&[3, 4])]),
        ]);

        assert_eq!(drain_ints(&mut op), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_union_three_inputs() {
        let mut op = int_union(vec![
            mock(vec![create_int_chunk(&[1])]),
            mock(vec![create_int_chunk(&[2])]),
            mock(vec![create_int_chunk(&[3])]),
        ]);

        assert_eq!(drain_ints(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_empty_input() {
        // The middle input yields nothing and must simply be skipped.
        let mut op = int_union(vec![
            mock(vec![create_int_chunk(&[1, 2])]),
            mock(vec![]),
            mock(vec![create_int_chunk(&[3])]),
        ]);

        assert_eq!(drain_ints(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_union_reset() {
        let mut op = int_union(vec![
            mock(vec![create_int_chunk(&[1])]),
            mock(vec![create_int_chunk(&[2])]),
        ]);

        // Only chunk counts are compared: on the second pass the mock hands
        // out its empty placeholder chunks.
        assert_eq!(count_chunks(&mut op), 2);

        op.reset();
        assert_eq!(count_chunks(&mut op), 2);
    }

    #[test]
    fn test_union_into_any() {
        let boxed = Box::new(int_union(vec![mock(vec![]), mock(vec![])]));
        let any = boxed.into_any();
        assert!(any.downcast::<UnionOperator>().is_ok());
    }
}