1use super::chunk::DataChunk;
11use super::operators::push::FilterPredicate;
12use super::operators::{
13 AggregatePushOperator, DistinctPushOperator, FilterPushOperator, LimitPushOperator,
14 SortPushOperator,
15};
16use super::operators::{
17 DistinctOperator, FilterOperator, HashAggregateOperator, LimitOperator, Operator, Predicate,
18 SortOperator,
19};
20use super::pipeline::PushOperator;
21
22pub struct PredicateAdapter(pub Box<dyn Predicate>);
28
29impl FilterPredicate for PredicateAdapter {
30 fn evaluate(&self, chunk: &DataChunk, row: usize) -> bool {
31 self.0.evaluate(chunk, row)
32 }
33}
34
35fn convert_sort_key(pull: &super::operators::SortKey) -> super::operators::push::SortKey {
44 use super::operators::{NullOrder, SortDirection};
45 super::operators::push::SortKey {
46 column: pull.column,
47 direction: match pull.direction {
48 SortDirection::Ascending => super::operators::push::SortDirection::Ascending,
49 SortDirection::Descending => super::operators::push::SortDirection::Descending,
50 },
51 null_order: match pull.null_order {
52 NullOrder::NullsFirst => super::operators::push::NullOrder::First,
53 NullOrder::NullsLast => super::operators::push::NullOrder::Last,
54 },
55 }
56}
57
58pub fn convert_to_pipeline(
75 root: Box<dyn Operator>,
76) -> (Box<dyn Operator>, Vec<Box<dyn PushOperator>>) {
77 let mut push_ops: Vec<Box<dyn PushOperator>> = Vec::new();
78 let source = decompose_recursive(root, &mut push_ops);
79 push_ops.reverse();
81 (source, push_ops)
82}
83
#[cfg(feature = "spill")]
/// Memory-aware variant of `convert_to_pipeline`.
///
/// With `Some(ctx)`, Sort and HashAggregate are turned into spillable push
/// operators that each receive a clone of `ctx`. All other conversions are the same
/// as in `convert_to_pipeline`. With `None`, this simply delegates to
/// `convert_to_pipeline`.
pub fn convert_to_pipeline_with_memory(
    root: Box<dyn Operator>,
    memory_ctx: Option<super::memory::OperatorMemoryContext>,
) -> (Box<dyn Operator>, Vec<Box<dyn PushOperator>>) {
    match memory_ctx {
        None => convert_to_pipeline(root),
        Some(ctx) => {
            let mut collected: Vec<Box<dyn PushOperator>> = Vec::new();
            let source = decompose_recursive_memory(root, &mut collected, &ctx);
            // Collected root-first; flip into source-to-sink order.
            collected.reverse();
            (source, collected)
        }
    }
}
103
#[cfg(feature = "spill")]
/// Memory-aware counterpart of `decompose_recursive`.
///
/// Walks down the operator chain starting at `op`. For each recognised operator it
/// appends a push equivalent to `push_ops` and continues with the child. Sort and
/// HashAggregate become `SpillableSortPushOperator` / `SpillableAggregatePushOperator`,
/// each given a clone of `ctx`. The first unrecognised operator is returned as the
/// source.
///
/// `push_ops` is filled root-first; callers reverse it. The walk is a loop, so chain
/// depth does not grow the call stack.
///
/// # Panics
///
/// Panics if an operator reports a recognised name but is not the matching concrete type.
fn decompose_recursive_memory(
    op: Box<dyn Operator>,
    push_ops: &mut Vec<Box<dyn PushOperator>>,
    ctx: &super::memory::OperatorMemoryContext,
) -> Box<dyn Operator> {
    use super::operators::{SpillableAggregatePushOperator, SpillableSortPushOperator};

    let mut current = op;
    loop {
        let child = match current.name() {
            "Filter" => {
                let (child, predicate) = current
                    .into_any()
                    .downcast::<FilterOperator>()
                    .expect("name() returned 'Filter' but downcast failed")
                    .into_parts();
                push_ops.push(Box::new(FilterPushOperator::new(Box::new(
                    PredicateAdapter(predicate),
                ))));
                child
            }
            "Sort" => {
                let (child, sort_keys) = current
                    .into_any()
                    .downcast::<SortOperator>()
                    .expect("name() returned 'Sort' but downcast failed")
                    .into_parts();
                let push_keys = sort_keys.iter().map(convert_sort_key).collect::<Vec<_>>();
                push_ops.push(Box::new(SpillableSortPushOperator::with_memory_context(
                    push_keys,
                    ctx.clone(),
                )));
                child
            }
            "HashAggregate" => {
                let (child, group_columns, aggregates) = current
                    .into_any()
                    .downcast::<HashAggregateOperator>()
                    .expect("name() returned 'HashAggregate' but downcast failed")
                    .into_parts();
                push_ops.push(Box::new(
                    SpillableAggregatePushOperator::with_memory_context(
                        group_columns,
                        aggregates,
                        ctx.clone(),
                    ),
                ));
                child
            }
            "Limit" => {
                let (child, count) = current
                    .into_any()
                    .downcast::<LimitOperator>()
                    .expect("name() returned 'Limit' but downcast failed")
                    .into_parts();
                push_ops.push(Box::new(LimitPushOperator::new(count)));
                child
            }
            "Distinct" => {
                let (child, columns) = current
                    .into_any()
                    .downcast::<DistinctOperator>()
                    .expect("name() returned 'Distinct' but downcast failed")
                    .into_parts();
                let push_distinct = match columns {
                    Some(cols) => DistinctPushOperator::on_columns(cols),
                    None => DistinctPushOperator::new(),
                };
                push_ops.push(Box::new(push_distinct));
                child
            }
            // No push equivalent: this operator becomes the pipeline source.
            _ => return current,
        };
        current = child;
    }
}
179
180fn decompose_recursive(
185 op: Box<dyn Operator>,
186 push_ops: &mut Vec<Box<dyn PushOperator>>,
187) -> Box<dyn Operator> {
188 match op.name() {
189 "Filter" => {
190 let any = op.into_any();
191 let filter = any
192 .downcast::<FilterOperator>()
193 .expect("name() returned 'Filter' but downcast failed");
194 let (child, predicate) = filter.into_parts();
195 push_ops.push(Box::new(FilterPushOperator::new(Box::new(
196 PredicateAdapter(predicate),
197 ))));
198 decompose_recursive(child, push_ops)
199 }
200 "Sort" => {
207 let any = op.into_any();
208 let sort = any
209 .downcast::<SortOperator>()
210 .expect("name() returned 'Sort' but downcast failed");
211 let (child, sort_keys) = sort.into_parts();
212 let push_keys: Vec<_> = sort_keys.iter().map(convert_sort_key).collect();
213 push_ops.push(Box::new(SortPushOperator::new(push_keys)));
214 decompose_recursive(child, push_ops)
215 }
216 "HashAggregate" => {
217 let any = op.into_any();
218 let agg = any
219 .downcast::<HashAggregateOperator>()
220 .expect("name() returned 'HashAggregate' but downcast failed");
221 let (child, group_columns, aggregates) = agg.into_parts();
222 push_ops.push(Box::new(AggregatePushOperator::new(
223 group_columns,
224 aggregates,
225 )));
226 decompose_recursive(child, push_ops)
227 }
228 "Limit" => {
229 let any = op.into_any();
230 let limit = any
231 .downcast::<LimitOperator>()
232 .expect("name() returned 'Limit' but downcast failed");
233 let (child, count) = limit.into_parts();
234 push_ops.push(Box::new(LimitPushOperator::new(count)));
235 decompose_recursive(child, push_ops)
236 }
237 "Distinct" => {
238 let any = op.into_any();
239 let distinct = any
240 .downcast::<DistinctOperator>()
241 .expect("name() returned 'Distinct' but downcast failed");
242 let (child, columns) = distinct.into_parts();
243 let push_distinct = if let Some(cols) = columns {
244 DistinctPushOperator::on_columns(cols)
245 } else {
246 DistinctPushOperator::new()
247 };
248 push_ops.push(Box::new(push_distinct));
249 decompose_recursive(child, push_ops)
250 }
251 _ => op,
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::{OperatorResult, SortKey};
    use grafeo_common::types::LogicalType;

    /// Predicate that keeps every row.
    struct AlwaysTruePredicate;

    impl Predicate for AlwaysTruePredicate {
        fn evaluate(&self, _chunk: &DataChunk, _row: usize) -> bool {
            true
        }
    }

    /// Predicate that keeps only row 0. Its answer depends on `row`, so a test can
    /// tell whether an adapter forwards the row index instead of returning a constant.
    struct FirstRowOnlyPredicate;

    impl Predicate for FirstRowOnlyPredicate {
        fn evaluate(&self, _chunk: &DataChunk, row: usize) -> bool {
            row == 0
        }
    }

    /// Source operator that yields one chunk with a single Int64 column holding
    /// `1, 2, 3`, then returns `Ok(None)`.
    struct TestScanOperator {
        /// Whether the single chunk has already been returned.
        emitted: bool,
    }

    impl TestScanOperator {
        fn new() -> Self {
            Self { emitted: false }
        }
    }

    impl Operator for TestScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.emitted {
                return Ok(None);
            }
            self.emitted = true;
            let mut col = crate::execution::vector::ValueVector::with_type(LogicalType::Int64);
            col.push_int64(1);
            col.push_int64(2);
            col.push_int64(3);
            Ok(Some(DataChunk::new(vec![col])))
        }

        fn reset(&mut self) {
            self.emitted = false;
        }

        fn name(&self) -> &'static str {
            "TestScan"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    #[test]
    fn convert_bare_scan_produces_empty_pipeline() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let (source, push_ops) = convert_to_pipeline(scan);
        assert!(push_ops.is_empty());
        assert_eq!(source.name(), "TestScan");
    }

    #[test]
    fn convert_filter_scan_produces_one_push_op() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));

        let (source, push_ops) = convert_to_pipeline(filter);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(
            push_ops[0].name().contains("Filter"),
            "expected filter push op, got {}",
            push_ops[0].name()
        );
    }

    #[test]
    fn convert_limit_filter_scan_produces_two_push_ops() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
        let limit: Box<dyn Operator> =
            Box::new(LimitOperator::new(filter, 10, vec![LogicalType::Int64]));

        let (source, push_ops) = convert_to_pipeline(limit);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 2);
        // Push ops run source-to-sink: the Filter sat closest to the scan.
        assert!(push_ops[0].name().contains("Filter"));
        assert!(push_ops[1].name().contains("Limit"));
    }

    #[test]
    fn convert_sort_scan_produces_one_push_op() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let keys = vec![SortKey::ascending(0)];
        let sort: Box<dyn Operator> =
            Box::new(SortOperator::new(scan, keys, vec![LogicalType::Int64]));

        let (source, push_ops) = convert_to_pipeline(sort);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Sort"));
    }

    #[test]
    fn convert_aggregate_scan_produces_one_push_op() {
        use crate::execution::operators::{AggregateExpr, AggregateFunction};

        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let aggregates = vec![AggregateExpr {
            function: AggregateFunction::Count,
            column: None,
            column2: None,
            distinct: false,
            alias: None,
            percentile: None,
            separator: None,
        }];
        let agg: Box<dyn Operator> = Box::new(HashAggregateOperator::new(
            scan,
            vec![],
            aggregates,
            vec![LogicalType::Int64],
        ));

        let (source, push_ops) = convert_to_pipeline(agg);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Aggregate"));
    }

    #[test]
    fn convert_distinct_scan_produces_one_push_op() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let distinct: Box<dyn Operator> =
            Box::new(DistinctOperator::new(scan, vec![LogicalType::Int64]));

        let (source, push_ops) = convert_to_pipeline(distinct);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Distinct"));
    }

    #[test]
    fn convert_distinct_on_columns_scan() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let distinct: Box<dyn Operator> = Box::new(DistinctOperator::on_columns(
            scan,
            vec![0],
            vec![LogicalType::Int64],
        ));

        let (source, push_ops) = convert_to_pipeline(distinct);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Distinct"));
    }

    #[test]
    fn convert_deep_pipeline_sort_filter_limit() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
        let keys = vec![SortKey::ascending(0)];
        let sort: Box<dyn Operator> =
            Box::new(SortOperator::new(filter, keys, vec![LogicalType::Int64]));
        let limit: Box<dyn Operator> =
            Box::new(LimitOperator::new(sort, 5, vec![LogicalType::Int64]));

        let (source, push_ops) = convert_to_pipeline(limit);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 3);
        // Order mirrors the tree from the scan upward: Filter, Sort, Limit.
        assert!(push_ops[0].name().contains("Filter"));
        assert!(push_ops[1].name().contains("Sort"));
        assert!(push_ops[2].name().contains("Limit"));
    }

    #[cfg(feature = "spill")]
    #[test]
    fn convert_with_memory_without_context_matches_plain_conversion() {
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));

        // With no memory context the conversion must fall back to `convert_to_pipeline`.
        let (source, push_ops) = convert_to_pipeline_with_memory(filter, None);
        assert_eq!(source.name(), "TestScan");
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Filter"));
    }

    #[test]
    fn pipeline_roundtrip_produces_correct_results() {
        use crate::execution::pipeline::Pipeline;
        use crate::execution::sink::CollectorSink;
        use crate::execution::source::OperatorSource;

        // Sort(Filter(Scan)) with a filter that keeps every row.
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
        let keys = vec![SortKey::ascending(0)];
        let sort: Box<dyn Operator> =
            Box::new(SortOperator::new(filter, keys, vec![LogicalType::Int64]));

        let (source, push_ops) = convert_to_pipeline(sort);
        assert_eq!(push_ops.len(), 2);

        let source = Box::new(OperatorSource::new(source));
        let collector = CollectorSink::new();
        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
        pipeline.execute().unwrap();

        // The filter keeps everything and sorting drops nothing, so all 3 scan rows arrive.
        let sink_box = pipeline.into_sink();
        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
        let collector = any_sink.downcast::<CollectorSink>().unwrap();
        assert_eq!(collector.row_count(), 3);
    }

    #[test]
    fn predicate_adapter_delegates_correctly() {
        let mut col = crate::execution::vector::ValueVector::with_type(LogicalType::Int64);
        col.push_int64(42);
        col.push_int64(43);
        let chunk = DataChunk::new(vec![col]);

        let adapter = PredicateAdapter(Box::new(AlwaysTruePredicate));
        assert!(adapter.evaluate(&chunk, 0));

        // A row-dependent predicate proves the adapter forwards `row` to the inner
        // predicate instead of answering with a constant.
        let adapter = PredicateAdapter(Box::new(FirstRowOnlyPredicate));
        assert!(adapter.evaluate(&chunk, 0));
        assert!(!adapter.evaluate(&chunk, 1));
    }

    #[test]
    fn convert_sort_key_maps_directions() {
        use crate::execution::operators::{NullOrder, SortDirection};

        use crate::execution::operators::push::{
            NullOrder as PushNullOrder, SortDirection as PushSortDirection,
        };

        let asc = super::convert_sort_key(&SortKey {
            column: 3,
            direction: SortDirection::Ascending,
            null_order: NullOrder::NullsFirst,
        });
        assert_eq!(asc.column, 3);
        assert_eq!(asc.direction, PushSortDirection::Ascending);
        assert_eq!(asc.null_order, PushNullOrder::First);

        let desc = super::convert_sort_key(&SortKey {
            column: 7,
            direction: SortDirection::Descending,
            null_order: NullOrder::NullsLast,
        });
        assert_eq!(desc.column, 7);
        assert_eq!(desc.direction, PushSortDirection::Descending);
        assert_eq!(desc.null_order, PushNullOrder::Last);
    }

    #[test]
    fn test_distinct_on_columns_pipeline_execution() {
        use crate::execution::pipeline::Pipeline;
        use crate::execution::sink::CollectorSink;

        // The scan's values 1, 2, 3 are already distinct on column 0.
        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
        let distinct: Box<dyn Operator> = Box::new(DistinctOperator::on_columns(
            scan,
            vec![0],
            vec![LogicalType::Int64],
        ));

        let (source, push_ops) = convert_to_pipeline(distinct);
        assert_eq!(push_ops.len(), 1);
        assert!(push_ops[0].name().contains("Distinct"));

        let source = Box::new(crate::execution::source::OperatorSource::new(source));
        let collector = CollectorSink::new();
        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
        pipeline.execute().unwrap();

        // No duplicates exist, so every row survives.
        let sink_box = pipeline.into_sink();
        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
        let collector = any_sink.downcast::<CollectorSink>().unwrap();
        assert_eq!(collector.row_count(), 3);
    }

    #[test]
    fn test_unrecognized_operator_stays_as_source() {
        /// Operator whose name matches none of the convertible operators.
        struct CustomJoinOperator;

        impl Operator for CustomJoinOperator {
            fn next(&mut self) -> OperatorResult {
                Ok(None)
            }

            fn reset(&mut self) {}

            fn name(&self) -> &'static str {
                "CustomNestedLoopJoin"
            }

            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
                self
            }
        }

        let join: Box<dyn Operator> = Box::new(CustomJoinOperator);
        let (source, push_ops) = convert_to_pipeline(join);
        assert_eq!(source.name(), "CustomNestedLoopJoin");
        assert!(
            push_ops.is_empty(),
            "unrecognized operator should produce no push ops"
        );
    }
}