Skip to main content

alopex_dataframe/physical/
plan.rs

1use std::path::PathBuf;
2
3use crate::lazy::{LogicalPlan, ProjectionKind};
4use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
5use crate::{DataFrame, Expr, Result};
6
7/// Source for a physical scan operator.
8#[derive(Debug, Clone)]
9pub enum ScanSource {
10    /// In-memory scan of a `DataFrame`.
11    DataFrame(DataFrame),
12    /// CSV file scan with optional predicate/projection pushdown.
13    Csv {
14        path: PathBuf,
15        predicate: Option<Expr>,
16        projection: Option<Vec<String>>,
17    },
18    /// Parquet file scan with optional predicate/projection pushdown.
19    Parquet {
20        path: PathBuf,
21        predicate: Option<Expr>,
22        projection: Option<Vec<String>>,
23    },
24}
25
26/// Physical execution plan produced from a `LogicalPlan`.
27#[derive(Debug, Clone)]
28pub enum PhysicalPlan {
29    /// Scan operator.
30    ScanExec { source: ScanSource },
31    /// Projection operator.
32    ProjectionExec {
33        input: Box<PhysicalPlan>,
34        exprs: Vec<Expr>,
35        kind: ProjectionKind,
36    },
37    /// Filter operator.
38    FilterExec {
39        input: Box<PhysicalPlan>,
40        predicate: Expr,
41    },
42    /// Aggregate operator.
43    AggregateExec {
44        input: Box<PhysicalPlan>,
45        group_by: Vec<Expr>,
46        aggs: Vec<Expr>,
47    },
48    /// Join operator.
49    JoinExec {
50        left: Box<PhysicalPlan>,
51        right: Box<PhysicalPlan>,
52        keys: JoinKeys,
53        how: JoinType,
54    },
55    /// Sort operator.
56    SortExec {
57        input: Box<PhysicalPlan>,
58        options: SortOptions,
59    },
60    /// Slice operator (head/tail).
61    SliceExec {
62        input: Box<PhysicalPlan>,
63        offset: usize,
64        len: usize,
65        from_end: bool,
66    },
67    /// Unique operator.
68    UniqueExec {
69        input: Box<PhysicalPlan>,
70        subset: Option<Vec<String>>,
71    },
72    /// Fill-null operator.
73    FillNullExec {
74        input: Box<PhysicalPlan>,
75        fill: FillNull,
76    },
77    /// Drop-nulls operator.
78    DropNullsExec {
79        input: Box<PhysicalPlan>,
80        subset: Option<Vec<String>>,
81    },
82    /// Null-count operator.
83    NullCountExec { input: Box<PhysicalPlan> },
84}
85
86/// Compile a `LogicalPlan` into a `PhysicalPlan`.
87pub fn compile(logical: &LogicalPlan) -> Result<PhysicalPlan> {
88    let plan = match logical {
89        LogicalPlan::DataFrameScan { df } => PhysicalPlan::ScanExec {
90            source: ScanSource::DataFrame(df.clone()),
91        },
92        LogicalPlan::CsvScan {
93            path,
94            predicate,
95            projection,
96        } => PhysicalPlan::ScanExec {
97            source: ScanSource::Csv {
98                path: path.clone(),
99                predicate: predicate.clone(),
100                projection: projection.clone(),
101            },
102        },
103        LogicalPlan::ParquetScan {
104            path,
105            predicate,
106            projection,
107        } => PhysicalPlan::ScanExec {
108            source: ScanSource::Parquet {
109                path: path.clone(),
110                predicate: predicate.clone(),
111                projection: projection.clone(),
112            },
113        },
114        LogicalPlan::Projection { input, exprs, kind } => PhysicalPlan::ProjectionExec {
115            input: Box::new(compile(input)?),
116            exprs: exprs.clone(),
117            kind: kind.clone(),
118        },
119        LogicalPlan::Filter { input, predicate } => PhysicalPlan::FilterExec {
120            input: Box::new(compile(input)?),
121            predicate: predicate.clone(),
122        },
123        LogicalPlan::Aggregate {
124            input,
125            group_by,
126            aggs,
127        } => PhysicalPlan::AggregateExec {
128            input: Box::new(compile(input)?),
129            group_by: group_by.clone(),
130            aggs: aggs.clone(),
131        },
132        LogicalPlan::Join {
133            left,
134            right,
135            keys,
136            how,
137        } => PhysicalPlan::JoinExec {
138            left: Box::new(compile(left)?),
139            right: Box::new(compile(right)?),
140            keys: keys.clone(),
141            how: *how,
142        },
143        LogicalPlan::Sort { input, options } => PhysicalPlan::SortExec {
144            input: Box::new(compile(input)?),
145            options: options.clone(),
146        },
147        LogicalPlan::Slice {
148            input,
149            offset,
150            len,
151            from_end,
152        } => PhysicalPlan::SliceExec {
153            input: Box::new(compile(input)?),
154            offset: *offset,
155            len: *len,
156            from_end: *from_end,
157        },
158        LogicalPlan::Unique { input, subset } => PhysicalPlan::UniqueExec {
159            input: Box::new(compile(input)?),
160            subset: subset.clone(),
161        },
162        LogicalPlan::FillNull { input, fill } => PhysicalPlan::FillNullExec {
163            input: Box::new(compile(input)?),
164            fill: fill.clone(),
165        },
166        LogicalPlan::DropNulls { input, subset } => PhysicalPlan::DropNullsExec {
167            input: Box::new(compile(input)?),
168            subset: subset.clone(),
169        },
170        LogicalPlan::NullCount { input } => PhysicalPlan::NullCountExec {
171            input: Box::new(compile(input)?),
172        },
173    };
174
175    Ok(plan)
176}