// grafeo_core/execution/operators/set_ops.rs

use std::collections::{HashMap, HashSet};

use grafeo_common::types::{HashableValue, LogicalType, Value};

use super::{DataChunk, Operator, OperatorError, OperatorResult};
use crate::execution::chunk::DataChunkBuilder;
12
13type RowKey = Vec<HashableValue>;
15
16fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18 let mut key = Vec::with_capacity(chunk.num_columns());
19 for col_idx in 0..chunk.num_columns() {
20 let val = chunk
21 .column(col_idx)
22 .and_then(|col| col.get_value(row))
23 .unwrap_or(Value::Null);
24 key.push(HashableValue(val));
25 }
26 key
27}
28
29fn row_values(key: &RowKey) -> Vec<Value> {
31 key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36 let mut rows = Vec::new();
37 while let Some(chunk) = op.next()? {
38 for row in chunk.selected_indices() {
39 rows.push(row_key(&chunk, row));
40 }
41 }
42 Ok(rows)
43}
44
45fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47 if rows.is_empty() {
48 return DataChunk::empty();
49 }
50 let mut builder = DataChunkBuilder::new(schema);
51 for row in rows {
52 let values = row_values(row);
53 for (col_idx, val) in values.into_iter().enumerate() {
54 if let Some(col) = builder.column_mut(col_idx) {
55 col.push_value(val);
56 }
57 }
58 builder.advance_row();
59 }
60 builder.finish()
61}
62
63pub struct ExceptOperator {
65 left: Box<dyn Operator>,
66 right: Box<dyn Operator>,
67 all: bool,
68 output_schema: Vec<LogicalType>,
69 result: Option<Vec<RowKey>>,
70 position: usize,
71}
72
73impl ExceptOperator {
74 pub fn new(
76 left: Box<dyn Operator>,
77 right: Box<dyn Operator>,
78 all: bool,
79 output_schema: Vec<LogicalType>,
80 ) -> Self {
81 Self {
82 left,
83 right,
84 all,
85 output_schema,
86 result: None,
87 position: 0,
88 }
89 }
90
91 fn compute(&mut self) -> Result<(), OperatorError> {
92 let left_rows = materialize(self.left.as_mut())?;
93 let right_rows = materialize(self.right.as_mut())?;
94
95 if self.all {
96 let mut result = left_rows;
98 for right_row in &right_rows {
99 if let Some(pos) = result.iter().position(|r| r == right_row) {
100 result.remove(pos);
101 }
102 }
103 self.result = Some(result);
104 } else {
105 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107 let mut seen = HashSet::new();
108 let result: Vec<RowKey> = left_rows
109 .into_iter()
110 .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111 .collect();
112 self.result = Some(result);
113 }
114 Ok(())
115 }
116}
117
118impl Operator for ExceptOperator {
119 fn next(&mut self) -> OperatorResult {
120 if self.result.is_none() {
121 self.compute()?;
122 }
123 let rows = self.result.as_ref().unwrap();
124 if self.position >= rows.len() {
125 return Ok(None);
126 }
127 let end = (self.position + 1024).min(rows.len());
129 let batch = &rows[self.position..end];
130 self.position = end;
131 if batch.is_empty() {
132 Ok(None)
133 } else {
134 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
135 }
136 }
137
138 fn reset(&mut self) {
139 self.left.reset();
140 self.right.reset();
141 self.result = None;
142 self.position = 0;
143 }
144
145 fn name(&self) -> &'static str {
146 "Except"
147 }
148}
149
150pub struct IntersectOperator {
152 left: Box<dyn Operator>,
153 right: Box<dyn Operator>,
154 all: bool,
155 output_schema: Vec<LogicalType>,
156 result: Option<Vec<RowKey>>,
157 position: usize,
158}
159
160impl IntersectOperator {
161 pub fn new(
163 left: Box<dyn Operator>,
164 right: Box<dyn Operator>,
165 all: bool,
166 output_schema: Vec<LogicalType>,
167 ) -> Self {
168 Self {
169 left,
170 right,
171 all,
172 output_schema,
173 result: None,
174 position: 0,
175 }
176 }
177
178 fn compute(&mut self) -> Result<(), OperatorError> {
179 let left_rows = materialize(self.left.as_mut())?;
180 let right_rows = materialize(self.right.as_mut())?;
181
182 if self.all {
183 let mut remaining_right = right_rows;
185 let mut result = Vec::new();
186 for left_row in &left_rows {
187 if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
188 result.push(left_row.clone());
189 remaining_right.remove(pos);
190 }
191 }
192 self.result = Some(result);
193 } else {
194 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
196 let mut seen = HashSet::new();
197 let result: Vec<RowKey> = left_rows
198 .into_iter()
199 .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
200 .collect();
201 self.result = Some(result);
202 }
203 Ok(())
204 }
205}
206
207impl Operator for IntersectOperator {
208 fn next(&mut self) -> OperatorResult {
209 if self.result.is_none() {
210 self.compute()?;
211 }
212 let rows = self.result.as_ref().unwrap();
213 if self.position >= rows.len() {
214 return Ok(None);
215 }
216 let end = (self.position + 1024).min(rows.len());
217 let batch = &rows[self.position..end];
218 self.position = end;
219 if batch.is_empty() {
220 Ok(None)
221 } else {
222 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
223 }
224 }
225
226 fn reset(&mut self) {
227 self.left.reset();
228 self.right.reset();
229 self.result = None;
230 self.position = 0;
231 }
232
233 fn name(&self) -> &'static str {
234 "Intersect"
235 }
236}
237
238pub struct OtherwiseOperator {
240 left: Box<dyn Operator>,
241 right: Box<dyn Operator>,
242 state: OtherwiseState,
244}
245
246enum OtherwiseState {
247 Init,
249 StreamingLeft(Option<DataChunk>),
251 StreamingRight,
253 Done,
255}
256
257impl OtherwiseOperator {
258 pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
260 Self {
261 left,
262 right,
263 state: OtherwiseState::Init,
264 }
265 }
266}
267
268impl Operator for OtherwiseOperator {
269 fn next(&mut self) -> OperatorResult {
270 loop {
271 match &mut self.state {
272 OtherwiseState::Init => {
273 if let Some(chunk) = self.left.next()? {
275 self.state = OtherwiseState::StreamingLeft(Some(chunk));
276 } else {
277 self.state = OtherwiseState::StreamingRight;
279 }
280 }
281 OtherwiseState::StreamingLeft(buffered) => {
282 if let Some(chunk) = buffered.take() {
283 return Ok(Some(chunk));
284 }
285 match self.left.next()? {
287 Some(chunk) => return Ok(Some(chunk)),
288 None => {
289 self.state = OtherwiseState::Done;
290 return Ok(None);
291 }
292 }
293 }
294 OtherwiseState::StreamingRight => match self.right.next()? {
295 Some(chunk) => return Ok(Some(chunk)),
296 None => {
297 self.state = OtherwiseState::Done;
298 return Ok(None);
299 }
300 },
301 OtherwiseState::Done => return Ok(None),
302 }
303 }
304 }
305
306 fn reset(&mut self) {
307 self.left.reset();
308 self.right.reset();
309 self.state = OtherwiseState::Init;
310 }
311
312 fn name(&self) -> &'static str {
313 "Otherwise"
314 }
315}