grafeo_core/execution/operators/
set_ops.rs1use std::collections::HashSet;
7
8use grafeo_common::types::{HashableValue, LogicalType, Value};
9
10use super::{DataChunk, Operator, OperatorError, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
/// One materialized row, stored as hashable values so that whole rows can be
/// compared for equality and used as `HashSet` members by the set operators.
type RowKey = Vec<HashableValue>;
15
16fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18 let mut key = Vec::with_capacity(chunk.num_columns());
19 for col_idx in 0..chunk.num_columns() {
20 let val = chunk
21 .column(col_idx)
22 .and_then(|col| col.get_value(row))
23 .unwrap_or(Value::Null);
24 key.push(HashableValue(val));
25 }
26 key
27}
28
29fn row_values(key: &RowKey) -> Vec<Value> {
31 key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36 let mut rows = Vec::new();
37 while let Some(chunk) = op.next()? {
38 for row in chunk.selected_indices() {
39 rows.push(row_key(&chunk, row));
40 }
41 }
42 Ok(rows)
43}
44
45fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47 if rows.is_empty() {
48 return DataChunk::empty();
49 }
50 let mut builder = DataChunkBuilder::new(schema);
51 for row in rows {
52 let values = row_values(row);
53 for (col_idx, val) in values.into_iter().enumerate() {
54 if let Some(col) = builder.column_mut(col_idx) {
55 col.push_value(val);
56 }
57 }
58 builder.advance_row();
59 }
60 builder.finish()
61}
62
/// Set-difference operator: emits rows of the left input that are not
/// cancelled out by rows of the right input.
///
/// Both inputs are fully materialized on the first call to `next()`; the
/// result is then handed out in batches.
pub struct ExceptOperator {
    /// Input whose rows are candidates for the output.
    left: Box<dyn Operator>,
    /// Input whose rows are removed from the left input.
    right: Box<dyn Operator>,
    /// `true`: each right row cancels at most one equal left row and
    /// remaining duplicates are kept. `false`: any match on the right removes
    /// the row, and the output is deduplicated.
    all: bool,
    /// Column types handed to `rows_to_chunk` when building output chunks.
    output_schema: Vec<LogicalType>,
    /// Computed output rows; `None` until `compute()` has run.
    result: Option<Vec<RowKey>>,
    /// Index into `result` of the next row to emit.
    position: usize,
}
72
73impl ExceptOperator {
74 pub fn new(
76 left: Box<dyn Operator>,
77 right: Box<dyn Operator>,
78 all: bool,
79 output_schema: Vec<LogicalType>,
80 ) -> Self {
81 Self {
82 left,
83 right,
84 all,
85 output_schema,
86 result: None,
87 position: 0,
88 }
89 }
90
91 fn compute(&mut self) -> Result<(), OperatorError> {
92 let left_rows = materialize(self.left.as_mut())?;
93 let right_rows = materialize(self.right.as_mut())?;
94
95 if self.all {
96 let mut result = left_rows;
98 for right_row in &right_rows {
99 if let Some(pos) = result.iter().position(|r| r == right_row) {
100 result.remove(pos);
101 }
102 }
103 self.result = Some(result);
104 } else {
105 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107 let mut seen = HashSet::new();
108 let result: Vec<RowKey> = left_rows
109 .into_iter()
110 .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111 .collect();
112 self.result = Some(result);
113 }
114 Ok(())
115 }
116}
117
118impl Operator for ExceptOperator {
119 fn next(&mut self) -> OperatorResult {
120 if self.result.is_none() {
121 self.compute()?;
122 }
123 let rows = self
124 .result
125 .as_ref()
126 .expect("result is Some: compute() called above");
127 if self.position >= rows.len() {
128 return Ok(None);
129 }
130 let end = (self.position + 1024).min(rows.len());
132 let batch = &rows[self.position..end];
133 self.position = end;
134 if batch.is_empty() {
135 Ok(None)
136 } else {
137 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
138 }
139 }
140
141 fn reset(&mut self) {
142 self.left.reset();
143 self.right.reset();
144 self.result = None;
145 self.position = 0;
146 }
147
148 fn name(&self) -> &'static str {
149 "Except"
150 }
151}
152
/// Set-intersection operator: emits rows of the left input that also occur
/// in the right input.
///
/// Both inputs are fully materialized on the first call to `next()`; the
/// result is then handed out in batches.
pub struct IntersectOperator {
    /// Input whose rows are candidates for the output.
    left: Box<dyn Operator>,
    /// Input that left rows must be matched against.
    right: Box<dyn Operator>,
    /// `true`: each right row can match at most one equal left row, so
    /// duplicates survive up to the smaller multiplicity. `false`: the output
    /// is deduplicated.
    all: bool,
    /// Column types handed to `rows_to_chunk` when building output chunks.
    output_schema: Vec<LogicalType>,
    /// Computed output rows; `None` until `compute()` has run.
    result: Option<Vec<RowKey>>,
    /// Index into `result` of the next row to emit.
    position: usize,
}
162
163impl IntersectOperator {
164 pub fn new(
166 left: Box<dyn Operator>,
167 right: Box<dyn Operator>,
168 all: bool,
169 output_schema: Vec<LogicalType>,
170 ) -> Self {
171 Self {
172 left,
173 right,
174 all,
175 output_schema,
176 result: None,
177 position: 0,
178 }
179 }
180
181 fn compute(&mut self) -> Result<(), OperatorError> {
182 let left_rows = materialize(self.left.as_mut())?;
183 let right_rows = materialize(self.right.as_mut())?;
184
185 if self.all {
186 let mut remaining_right = right_rows;
188 let mut result = Vec::new();
189 for left_row in &left_rows {
190 if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
191 result.push(left_row.clone());
192 remaining_right.remove(pos);
193 }
194 }
195 self.result = Some(result);
196 } else {
197 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
199 let mut seen = HashSet::new();
200 let result: Vec<RowKey> = left_rows
201 .into_iter()
202 .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
203 .collect();
204 self.result = Some(result);
205 }
206 Ok(())
207 }
208}
209
210impl Operator for IntersectOperator {
211 fn next(&mut self) -> OperatorResult {
212 if self.result.is_none() {
213 self.compute()?;
214 }
215 let rows = self
216 .result
217 .as_ref()
218 .expect("result is Some: compute() called above");
219 if self.position >= rows.len() {
220 return Ok(None);
221 }
222 let end = (self.position + 1024).min(rows.len());
223 let batch = &rows[self.position..end];
224 self.position = end;
225 if batch.is_empty() {
226 Ok(None)
227 } else {
228 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
229 }
230 }
231
232 fn reset(&mut self) {
233 self.left.reset();
234 self.right.reset();
235 self.result = None;
236 self.position = 0;
237 }
238
239 fn name(&self) -> &'static str {
240 "Intersect"
241 }
242}
243
/// Fallback operator: streams the left input if it produces at least one
/// chunk, otherwise streams the right input instead.
///
/// Chunks are forwarded as they arrive; nothing is materialized beyond the
/// first left chunk used to make the decision.
pub struct OtherwiseOperator {
    /// Preferred input, probed first.
    left: Box<dyn Operator>,
    /// Fallback input, used only when the first pull from `left` yields nothing.
    right: Box<dyn Operator>,
    /// Current position in the probe/stream state machine.
    state: OtherwiseState,
}
251
/// Execution state of an `OtherwiseOperator`.
enum OtherwiseState {
    /// Nothing has been pulled from the left input yet.
    Init,
    /// The left input produced a chunk on the first pull, so it is the one
    /// being streamed. The payload holds that first chunk until it is emitted.
    StreamingLeft(Option<DataChunk>),
    /// The first pull from the left input returned nothing; the right input
    /// is streamed.
    StreamingRight,
    /// The chosen input is exhausted; every further `next()` returns `None`.
    Done,
}
262
263impl OtherwiseOperator {
264 pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
266 Self {
267 left,
268 right,
269 state: OtherwiseState::Init,
270 }
271 }
272}
273
274impl Operator for OtherwiseOperator {
275 fn next(&mut self) -> OperatorResult {
276 loop {
277 match &mut self.state {
278 OtherwiseState::Init => {
279 if let Some(chunk) = self.left.next()? {
281 self.state = OtherwiseState::StreamingLeft(Some(chunk));
282 } else {
283 self.state = OtherwiseState::StreamingRight;
285 }
286 }
287 OtherwiseState::StreamingLeft(buffered) => {
288 if let Some(chunk) = buffered.take() {
289 return Ok(Some(chunk));
290 }
291 match self.left.next()? {
293 Some(chunk) => return Ok(Some(chunk)),
294 None => {
295 self.state = OtherwiseState::Done;
296 return Ok(None);
297 }
298 }
299 }
300 OtherwiseState::StreamingRight => match self.right.next()? {
301 Some(chunk) => return Ok(Some(chunk)),
302 None => {
303 self.state = OtherwiseState::Done;
304 return Ok(None);
305 }
306 },
307 OtherwiseState::Done => return Ok(None),
308 }
309 }
310 }
311
312 fn reset(&mut self) {
313 self.left.reset();
314 self.right.reset();
315 self.state = OtherwiseState::Init;
316 }
317
318 fn name(&self) -> &'static str {
319 "Otherwise"
320 }
321}