// grafeo_core/execution/pipeline_convert.rs
//! Converts pull-based operator trees into push-based pipelines.
//!
//! The converter walks the operator tree top-down, decomposing operators that
//! have push equivalents. Source operators (scan, expand, join) stay pull-based
//! and get wrapped in [`OperatorSource`](super::source::OperatorSource).
//!
//! This enables the documented push-based execution model without modifying
//! the planner, which continues to emit pull-based operator trees.

use super::chunk::DataChunk;
use super::operators::push::FilterPredicate;
use super::operators::{
    AggregatePushOperator, DistinctOperator, DistinctPushOperator, FilterOperator,
    FilterPushOperator, HashAggregateOperator, LimitOperator, LimitPushOperator, Operator,
    Predicate, SortOperator, SortPushOperator,
};
use super::pipeline::PushOperator;
21
// -------------------------------------------------------------------------
// Type adapters (bridge pull types to push types)
// -------------------------------------------------------------------------

26/// Adapts a pull-based [`Predicate`] to the push [`FilterPredicate`] trait.
27pub struct PredicateAdapter(pub Box<dyn Predicate>);
28
29impl FilterPredicate for PredicateAdapter {
30    fn evaluate(&self, chunk: &DataChunk, row: usize) -> bool {
31        self.0.evaluate(chunk, row)
32    }
33}

// NOTE: ProjectExprAdapter and is_simple_project are intentionally omitted.
// ProjectOperator carries store references, transaction context, and session
// context that cannot be transferred to push operators. Project stays pull-based.
// When a dedicated PushProjectOperator with store access is added, revisit this.

40/// Converts a pull-based sort key to the push-based equivalent.
41///
42/// Both types have identical fields but are separate types in separate modules.
43fn convert_sort_key(pull: &super::operators::SortKey) -> super::operators::push::SortKey {
44    use super::operators::{NullOrder, SortDirection};
45    super::operators::push::SortKey {
46        column: pull.column,
47        direction: match pull.direction {
48            SortDirection::Ascending => super::operators::push::SortDirection::Ascending,
49            SortDirection::Descending => super::operators::push::SortDirection::Descending,
50        },
51        null_order: match pull.null_order {
52            NullOrder::NullsFirst => super::operators::push::NullOrder::First,
53            NullOrder::NullsLast => super::operators::push::NullOrder::Last,
54        },
55    }
56}
57
// -------------------------------------------------------------------------
// Pipeline converter
// -------------------------------------------------------------------------

62/// Converts a pull-based operator tree into a source operator and a chain of push operators.
63///
64/// Walks the tree from the root, decomposing operators that have push equivalents
65/// (Filter, Sort, Aggregate, Limit, Distinct). Stops at source operators
66/// (scan, expand, join, etc.) which stay pull-based.
67///
68/// Returns `(source, push_ops)` where:
69/// - `source` is the deepest non-convertible operator (pull-based)
70/// - `push_ops` is the chain of push operators in pipeline order (source-first)
71///
72/// If the root operator has no push equivalent (e.g., a bare scan), returns
73/// an empty `push_ops` vec.
74pub fn convert_to_pipeline(
75    root: Box<dyn Operator>,
76) -> (Box<dyn Operator>, Vec<Box<dyn PushOperator>>) {
77    let mut push_ops: Vec<Box<dyn PushOperator>> = Vec::new();
78    let source = decompose_recursive(root, &mut push_ops);
79    // Push ops are collected root-first (outermost first), reverse for pipeline order
80    push_ops.reverse();
81    (source, push_ops)
82}
83
/// Converts a pull-based operator tree into a push pipeline with memory-aware spilling.
///
/// With `memory_ctx` present, Sort and Aggregate become their spillable
/// variants, which register with the `BufferManager` and spill under system
/// memory pressure. Without it, this is exactly [`convert_to_pipeline`]
/// (non-spillable operators).
#[cfg(feature = "spill")]
pub fn convert_to_pipeline_with_memory(
    root: Box<dyn Operator>,
    memory_ctx: Option<super::memory::OperatorMemoryContext>,
) -> (Box<dyn Operator>, Vec<Box<dyn PushOperator>>) {
    match memory_ctx {
        None => convert_to_pipeline(root),
        Some(ctx) => {
            let mut collected: Vec<Box<dyn PushOperator>> = Vec::new();
            let source = decompose_recursive_memory(root, &mut collected, &ctx);
            // Collected outermost-first; reverse into source-first order.
            collected.reverse();
            (source, collected)
        }
    }
}

/// Recursively decomposes operators, substituting memory-aware spillable
/// variants for Sort and Aggregate; the remaining convertible operators get
/// their regular push equivalents. Returns the pull-based source operator.
#[cfg(feature = "spill")]
fn decompose_recursive_memory(
    op: Box<dyn Operator>,
    push_ops: &mut Vec<Box<dyn PushOperator>>,
    ctx: &super::memory::OperatorMemoryContext,
) -> Box<dyn Operator> {
    use super::operators::{SpillableAggregatePushOperator, SpillableSortPushOperator};

    match op.name() {
        "Filter" => {
            let filter = op
                .into_any()
                .downcast::<FilterOperator>()
                .expect("name() returned 'Filter' but downcast failed");
            let (child, predicate) = filter.into_parts();
            let adapter = Box::new(PredicateAdapter(predicate));
            push_ops.push(Box::new(FilterPushOperator::new(adapter)));
            decompose_recursive_memory(child, push_ops, ctx)
        }
        "Sort" => {
            let sort = op
                .into_any()
                .downcast::<SortOperator>()
                .expect("name() returned 'Sort' but downcast failed");
            let (child, keys) = sort.into_parts();
            let push_keys: Vec<_> = keys.iter().map(convert_sort_key).collect();
            // Spillable variant registers with the BufferManager via `ctx`.
            push_ops.push(Box::new(SpillableSortPushOperator::with_memory_context(
                push_keys,
                ctx.clone(),
            )));
            decompose_recursive_memory(child, push_ops, ctx)
        }
        "HashAggregate" => {
            let agg = op
                .into_any()
                .downcast::<HashAggregateOperator>()
                .expect("name() returned 'HashAggregate' but downcast failed");
            let (child, group_columns, aggregates) = agg.into_parts();
            let spillable = SpillableAggregatePushOperator::with_memory_context(
                group_columns,
                aggregates,
                ctx.clone(),
            );
            push_ops.push(Box::new(spillable));
            decompose_recursive_memory(child, push_ops, ctx)
        }
        "Limit" => {
            let limit = op
                .into_any()
                .downcast::<LimitOperator>()
                .expect("name() returned 'Limit' but downcast failed");
            let (child, count) = limit.into_parts();
            push_ops.push(Box::new(LimitPushOperator::new(count)));
            decompose_recursive_memory(child, push_ops, ctx)
        }
        "Distinct" => {
            let distinct = op
                .into_any()
                .downcast::<DistinctOperator>()
                .expect("name() returned 'Distinct' but downcast failed");
            let (child, columns) = distinct.into_parts();
            let push_distinct = match columns {
                Some(cols) => DistinctPushOperator::on_columns(cols),
                None => DistinctPushOperator::new(),
            };
            push_ops.push(Box::new(push_distinct));
            decompose_recursive_memory(child, push_ops, ctx)
        }
        // No push equivalent: this becomes the pipeline's pull-based source.
        _ => op,
    }
}

180/// Recursively decomposes operators, collecting push equivalents.
181///
182/// Uses `name()` to identify the operator type, then `into_any()` + downcast
183/// to decompose it into child + push operator.
184fn decompose_recursive(
185    op: Box<dyn Operator>,
186    push_ops: &mut Vec<Box<dyn PushOperator>>,
187) -> Box<dyn Operator> {
188    match op.name() {
189        "Filter" => {
190            let any = op.into_any();
191            let filter = any
192                .downcast::<FilterOperator>()
193                .expect("name() returned 'Filter' but downcast failed");
194            let (child, predicate) = filter.into_parts();
195            push_ops.push(Box::new(FilterPushOperator::new(Box::new(
196                PredicateAdapter(predicate),
197            ))));
198            decompose_recursive(child, push_ops)
199        }
200        // Project is NOT decomposed because it often holds store references,
201        // transaction context, and session context that cannot be transferred
202        // to push operators. Treat as a source operator boundary.
203        //
204        // TODO: when a dedicated PushProjectOperator with store access exists,
205        // revisit this decision.
206        "Sort" => {
207            let any = op.into_any();
208            let sort = any
209                .downcast::<SortOperator>()
210                .expect("name() returned 'Sort' but downcast failed");
211            let (child, sort_keys) = sort.into_parts();
212            let push_keys: Vec<_> = sort_keys.iter().map(convert_sort_key).collect();
213            push_ops.push(Box::new(SortPushOperator::new(push_keys)));
214            decompose_recursive(child, push_ops)
215        }
216        "HashAggregate" => {
217            let any = op.into_any();
218            let agg = any
219                .downcast::<HashAggregateOperator>()
220                .expect("name() returned 'HashAggregate' but downcast failed");
221            let (child, group_columns, aggregates) = agg.into_parts();
222            push_ops.push(Box::new(AggregatePushOperator::new(
223                group_columns,
224                aggregates,
225            )));
226            decompose_recursive(child, push_ops)
227        }
228        "Limit" => {
229            let any = op.into_any();
230            let limit = any
231                .downcast::<LimitOperator>()
232                .expect("name() returned 'Limit' but downcast failed");
233            let (child, count) = limit.into_parts();
234            push_ops.push(Box::new(LimitPushOperator::new(count)));
235            decompose_recursive(child, push_ops)
236        }
237        "Distinct" => {
238            let any = op.into_any();
239            let distinct = any
240                .downcast::<DistinctOperator>()
241                .expect("name() returned 'Distinct' but downcast failed");
242            let (child, columns) = distinct.into_parts();
243            let push_distinct = if let Some(cols) = columns {
244                DistinctPushOperator::on_columns(cols)
245            } else {
246                DistinctPushOperator::new()
247            };
248            push_ops.push(Box::new(push_distinct));
249            decompose_recursive(child, push_ops)
250        }
251        // Not convertible: this is a source operator (scan, expand, join, etc.)
252        _ => op,
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use crate::execution::operators::{OperatorResult, SortKey};
260    use grafeo_common::types::LogicalType;
261
262    /// A trivial predicate that always returns true (for testing decomposition only).
263    struct AlwaysTruePredicate;
264
265    impl Predicate for AlwaysTruePredicate {
266        fn evaluate(&self, _chunk: &DataChunk, _row: usize) -> bool {
267            true
268        }
269    }
270
271    /// A minimal test operator that produces one chunk.
272    struct TestScanOperator {
273        emitted: bool,
274    }
275
276    impl TestScanOperator {
277        fn new() -> Self {
278            Self { emitted: false }
279        }
280    }
281
282    impl Operator for TestScanOperator {
283        fn next(&mut self) -> OperatorResult {
284            if self.emitted {
285                return Ok(None);
286            }
287            self.emitted = true;
288            let mut col = crate::execution::vector::ValueVector::with_type(LogicalType::Int64);
289            col.push_int64(1);
290            col.push_int64(2);
291            col.push_int64(3);
292            Ok(Some(DataChunk::new(vec![col])))
293        }
294
295        fn reset(&mut self) {
296            self.emitted = false;
297        }
298
299        fn name(&self) -> &'static str {
300            "TestScan"
301        }
302
303        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
304            self
305        }
306    }
307
308    #[test]
309    fn convert_bare_scan_produces_empty_pipeline() {
310        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
311        let (source, push_ops) = convert_to_pipeline(scan);
312        assert!(push_ops.is_empty());
313        assert_eq!(source.name(), "TestScan");
314    }
315
316    #[test]
317    fn convert_filter_scan_produces_one_push_op() {
318        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
319        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
320        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
321
322        let (source, push_ops) = convert_to_pipeline(filter);
323        assert_eq!(source.name(), "TestScan");
324        assert_eq!(push_ops.len(), 1);
325        assert_eq!(push_ops.len(), 1);
326        // Push operators have their own naming convention
327        assert!(
328            push_ops[0].name().contains("Filter"),
329            "expected filter push op, got {}",
330            push_ops[0].name()
331        );
332    }
333
334    #[test]
335    fn convert_limit_filter_scan_produces_two_push_ops() {
336        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
337        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
338        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
339        let limit: Box<dyn Operator> =
340            Box::new(LimitOperator::new(filter, 10, vec![LogicalType::Int64]));
341
342        let (source, push_ops) = convert_to_pipeline(limit);
343        assert_eq!(source.name(), "TestScan");
344        assert_eq!(push_ops.len(), 2);
345        // Pipeline order: filter first, then limit
346        assert!(push_ops[0].name().contains("Filter"));
347        assert!(push_ops[1].name().contains("Limit"));
348    }
349
350    #[test]
351    fn convert_sort_scan_produces_one_push_op() {
352        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
353        let keys = vec![SortKey::ascending(0)];
354        let sort: Box<dyn Operator> =
355            Box::new(SortOperator::new(scan, keys, vec![LogicalType::Int64]));
356
357        let (source, push_ops) = convert_to_pipeline(sort);
358        assert_eq!(source.name(), "TestScan");
359        assert_eq!(push_ops.len(), 1);
360        assert!(push_ops[0].name().contains("Sort"));
361    }
362
363    #[test]
364    fn convert_aggregate_scan_produces_one_push_op() {
365        use crate::execution::operators::{AggregateExpr, AggregateFunction};
366
367        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
368        let aggregates = vec![AggregateExpr {
369            function: AggregateFunction::Count,
370            column: None,
371            column2: None,
372            distinct: false,
373            alias: None,
374            percentile: None,
375            separator: None,
376        }];
377        let agg: Box<dyn Operator> = Box::new(HashAggregateOperator::new(
378            scan,
379            vec![],
380            aggregates,
381            vec![LogicalType::Int64],
382        ));
383
384        let (source, push_ops) = convert_to_pipeline(agg);
385        assert_eq!(source.name(), "TestScan");
386        assert_eq!(push_ops.len(), 1);
387        assert!(push_ops[0].name().contains("Aggregate"));
388    }
389
390    #[test]
391    fn convert_distinct_scan_produces_one_push_op() {
392        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
393        let distinct: Box<dyn Operator> =
394            Box::new(DistinctOperator::new(scan, vec![LogicalType::Int64]));
395
396        let (source, push_ops) = convert_to_pipeline(distinct);
397        assert_eq!(source.name(), "TestScan");
398        assert_eq!(push_ops.len(), 1);
399        assert!(push_ops[0].name().contains("Distinct"));
400    }
401
402    #[test]
403    fn convert_distinct_on_columns_scan() {
404        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
405        let distinct: Box<dyn Operator> = Box::new(DistinctOperator::on_columns(
406            scan,
407            vec![0],
408            vec![LogicalType::Int64],
409        ));
410
411        let (source, push_ops) = convert_to_pipeline(distinct);
412        assert_eq!(source.name(), "TestScan");
413        assert_eq!(push_ops.len(), 1);
414        assert!(push_ops[0].name().contains("Distinct"));
415    }
416
417    #[test]
418    fn convert_deep_pipeline_sort_filter_limit() {
419        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
420        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
421        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
422        let keys = vec![SortKey::ascending(0)];
423        let sort: Box<dyn Operator> =
424            Box::new(SortOperator::new(filter, keys, vec![LogicalType::Int64]));
425        let limit: Box<dyn Operator> =
426            Box::new(LimitOperator::new(sort, 5, vec![LogicalType::Int64]));
427
428        let (source, push_ops) = convert_to_pipeline(limit);
429        assert_eq!(source.name(), "TestScan");
430        assert_eq!(push_ops.len(), 3);
431        // Pipeline order: filter, sort, limit (source-first)
432        assert!(push_ops[0].name().contains("Filter"));
433        assert!(push_ops[1].name().contains("Sort"));
434        assert!(push_ops[2].name().contains("Limit"));
435    }
436
437    #[test]
438    fn pipeline_roundtrip_produces_correct_results() {
439        use crate::execution::pipeline::Pipeline;
440        use crate::execution::sink::CollectorSink;
441        use crate::execution::source::OperatorSource;
442
443        // Build: Scan -> Filter(always true) -> Sort(col 0 ASC)
444        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
445        let predicate: Box<dyn Predicate> = Box::new(AlwaysTruePredicate);
446        let filter: Box<dyn Operator> = Box::new(FilterOperator::new(scan, predicate));
447        let keys = vec![SortKey::ascending(0)];
448        let sort: Box<dyn Operator> =
449            Box::new(SortOperator::new(filter, keys, vec![LogicalType::Int64]));
450
451        // Convert to pipeline
452        let (source, push_ops) = convert_to_pipeline(sort);
453        assert_eq!(push_ops.len(), 2); // Filter + Sort
454
455        // Execute the pipeline
456        let source = Box::new(OperatorSource::new(source));
457        let collector = CollectorSink::new();
458        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
459        pipeline.execute().unwrap();
460
461        // Extract results
462        let sink_box = pipeline.into_sink();
463        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
464        let collector = any_sink.downcast::<CollectorSink>().unwrap();
465        assert_eq!(collector.row_count(), 3);
466    }
467
468    #[test]
469    fn predicate_adapter_delegates_correctly() {
470        let mut col = crate::execution::vector::ValueVector::with_type(LogicalType::Int64);
471        col.push_int64(42);
472        let chunk = DataChunk::new(vec![col]);
473
474        let adapter = PredicateAdapter(Box::new(AlwaysTruePredicate));
475        assert!(adapter.evaluate(&chunk, 0));
476    }
477
478    #[test]
479    fn convert_sort_key_maps_directions() {
480        use crate::execution::operators::{NullOrder, SortDirection};
481
482        use crate::execution::operators::push::{
483            NullOrder as PushNullOrder, SortDirection as PushSortDirection,
484        };
485
486        let asc = super::convert_sort_key(&SortKey {
487            column: 3,
488            direction: SortDirection::Ascending,
489            null_order: NullOrder::NullsFirst,
490        });
491        assert_eq!(asc.column, 3);
492        assert_eq!(asc.direction, PushSortDirection::Ascending);
493        assert_eq!(asc.null_order, PushNullOrder::First);
494
495        let desc = super::convert_sort_key(&SortKey {
496            column: 7,
497            direction: SortDirection::Descending,
498            null_order: NullOrder::NullsLast,
499        });
500        assert_eq!(desc.column, 7);
501        assert_eq!(desc.direction, PushSortDirection::Descending);
502        assert_eq!(desc.null_order, PushNullOrder::Last);
503    }
504
505    #[test]
506    fn test_distinct_on_columns_pipeline_execution() {
507        use crate::execution::pipeline::Pipeline;
508        use crate::execution::sink::CollectorSink;
509
510        // Build: Scan -> Distinct(on column 0)
511        let scan: Box<dyn Operator> = Box::new(TestScanOperator::new());
512        let distinct: Box<dyn Operator> = Box::new(DistinctOperator::on_columns(
513            scan,
514            vec![0],
515            vec![LogicalType::Int64],
516        ));
517
518        let (source, push_ops) = convert_to_pipeline(distinct);
519        assert_eq!(push_ops.len(), 1);
520        assert!(push_ops[0].name().contains("Distinct"));
521
522        // Execute the pipeline and verify results
523        let source = Box::new(crate::execution::source::OperatorSource::new(source));
524        let collector = CollectorSink::new();
525        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
526        pipeline.execute().unwrap();
527
528        let sink_box = pipeline.into_sink();
529        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
530        let collector = any_sink.downcast::<CollectorSink>().unwrap();
531        // TestScan produces [1, 2, 3], all distinct, so 3 rows
532        assert_eq!(collector.row_count(), 3);
533    }
534
535    #[test]
536    fn test_unrecognized_operator_stays_as_source() {
537        /// A custom operator with an unrecognized name.
538        struct CustomJoinOperator;
539
540        impl Operator for CustomJoinOperator {
541            fn next(&mut self) -> OperatorResult {
542                Ok(None)
543            }
544
545            fn reset(&mut self) {}
546
547            fn name(&self) -> &'static str {
548                "CustomNestedLoopJoin"
549            }
550
551            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
552                self
553            }
554        }
555
556        let join: Box<dyn Operator> = Box::new(CustomJoinOperator);
557        let (source, push_ops) = convert_to_pipeline(join);
558        assert_eq!(source.name(), "CustomNestedLoopJoin");
559        assert!(
560            push_ops.is_empty(),
561            "unrecognized operator should produce no push ops"
562        );
563    }
564}