Skip to main content

alopex_dataframe/physical/
executor.rs

1use arrow::record_batch::RecordBatch;
2
3use crate::physical::operators;
4use crate::physical::plan::{PhysicalPlan, ScanSource};
5use crate::Result;
6
/// Stateless entry point for running `PhysicalPlan` trees.
///
/// The type carries no fields; it exists to namespace the execution API that
/// turns a plan into its `RecordBatch` output.
pub struct Executor;
9
10impl Executor {
11    /// Execute a physical plan and return the resulting record batches.
12    pub fn execute(plan: PhysicalPlan) -> Result<Vec<RecordBatch>> {
13        execute_plan(&plan)
14    }
15}
16
17fn execute_plan(plan: &PhysicalPlan) -> Result<Vec<RecordBatch>> {
18    match plan {
19        PhysicalPlan::ScanExec { source } => execute_scan(source),
20        PhysicalPlan::ProjectionExec { input, exprs, kind } => {
21            let batches = execute_plan(input)?;
22            operators::project_batches(batches, exprs, kind.clone())
23        }
24        PhysicalPlan::FilterExec { input, predicate } => {
25            let batches = execute_plan(input)?;
26            operators::filter_batches(batches, predicate)
27        }
28        PhysicalPlan::AggregateExec {
29            input,
30            group_by,
31            aggs,
32        } => {
33            let batches = execute_plan(input)?;
34            operators::aggregate_batches(batches, group_by, aggs)
35        }
36        PhysicalPlan::JoinExec {
37            left,
38            right,
39            keys,
40            how,
41        } => {
42            let left_batches = execute_plan(left)?;
43            let right_batches = execute_plan(right)?;
44            operators::join_batches(left_batches, right_batches, keys, how)
45        }
46        PhysicalPlan::SortExec { input, options } => {
47            let batches = execute_plan(input)?;
48            operators::sort_batches(batches, options)
49        }
50        PhysicalPlan::SliceExec {
51            input,
52            offset,
53            len,
54            from_end,
55        } => {
56            let batches = execute_plan(input)?;
57            operators::slice_batches(batches, *offset, *len, *from_end)
58        }
59        PhysicalPlan::UniqueExec { input, subset } => {
60            let batches = execute_plan(input)?;
61            operators::unique_batches(batches, subset.as_deref())
62        }
63        PhysicalPlan::FillNullExec { input, fill } => {
64            let batches = execute_plan(input)?;
65            operators::fill_null_batches(batches, fill)
66        }
67        PhysicalPlan::DropNullsExec { input, subset } => {
68            let batches = execute_plan(input)?;
69            operators::drop_nulls_batches(batches, subset.as_deref())
70        }
71        PhysicalPlan::NullCountExec { input } => {
72            let batches = execute_plan(input)?;
73            operators::null_count_batches(batches)
74        }
75    }
76}
77
78fn execute_scan(source: &ScanSource) -> Result<Vec<RecordBatch>> {
79    operators::scan_source(source)
80}