alopex_dataframe/physical/
executor.rs1use arrow::record_batch::RecordBatch;
2
3use crate::physical::operators;
4use crate::physical::plan::{PhysicalPlan, ScanSource};
5use crate::Result;
6
7pub struct Executor;
9
10impl Executor {
11 pub fn execute(plan: PhysicalPlan) -> Result<Vec<RecordBatch>> {
13 execute_plan(&plan)
14 }
15}
16
17fn execute_plan(plan: &PhysicalPlan) -> Result<Vec<RecordBatch>> {
18 match plan {
19 PhysicalPlan::ScanExec { source } => execute_scan(source),
20 PhysicalPlan::ProjectionExec { input, exprs, kind } => {
21 let batches = execute_plan(input)?;
22 operators::project_batches(batches, exprs, kind.clone())
23 }
24 PhysicalPlan::FilterExec { input, predicate } => {
25 let batches = execute_plan(input)?;
26 operators::filter_batches(batches, predicate)
27 }
28 PhysicalPlan::AggregateExec {
29 input,
30 group_by,
31 aggs,
32 } => {
33 let batches = execute_plan(input)?;
34 operators::aggregate_batches(batches, group_by, aggs)
35 }
36 PhysicalPlan::JoinExec {
37 left,
38 right,
39 keys,
40 how,
41 } => {
42 let left_batches = execute_plan(left)?;
43 let right_batches = execute_plan(right)?;
44 operators::join_batches(left_batches, right_batches, keys, how)
45 }
46 PhysicalPlan::SortExec { input, options } => {
47 let batches = execute_plan(input)?;
48 operators::sort_batches(batches, options)
49 }
50 PhysicalPlan::SliceExec {
51 input,
52 offset,
53 len,
54 from_end,
55 } => {
56 let batches = execute_plan(input)?;
57 operators::slice_batches(batches, *offset, *len, *from_end)
58 }
59 PhysicalPlan::UniqueExec { input, subset } => {
60 let batches = execute_plan(input)?;
61 operators::unique_batches(batches, subset.as_deref())
62 }
63 PhysicalPlan::FillNullExec { input, fill } => {
64 let batches = execute_plan(input)?;
65 operators::fill_null_batches(batches, fill)
66 }
67 PhysicalPlan::DropNullsExec { input, subset } => {
68 let batches = execute_plan(input)?;
69 operators::drop_nulls_batches(batches, subset.as_deref())
70 }
71 PhysicalPlan::NullCountExec { input } => {
72 let batches = execute_plan(input)?;
73 operators::null_count_batches(batches)
74 }
75 }
76}
77
78fn execute_scan(source: &ScanSource) -> Result<Vec<RecordBatch>> {
79 operators::scan_source(source)
80}