alopex_dataframe/physical/
plan.rs1use std::path::PathBuf;
2
3use crate::lazy::{LogicalPlan, ProjectionKind};
4use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
5use crate::{DataFrame, Expr, Result};
6
7#[derive(Debug, Clone)]
9pub enum ScanSource {
10 DataFrame(DataFrame),
12 Csv {
14 path: PathBuf,
15 predicate: Option<Expr>,
16 projection: Option<Vec<String>>,
17 },
18 Parquet {
20 path: PathBuf,
21 predicate: Option<Expr>,
22 projection: Option<Vec<String>>,
23 },
24}
25
26#[derive(Debug, Clone)]
28pub enum PhysicalPlan {
29 ScanExec { source: ScanSource },
31 ProjectionExec {
33 input: Box<PhysicalPlan>,
34 exprs: Vec<Expr>,
35 kind: ProjectionKind,
36 },
37 FilterExec {
39 input: Box<PhysicalPlan>,
40 predicate: Expr,
41 },
42 AggregateExec {
44 input: Box<PhysicalPlan>,
45 group_by: Vec<Expr>,
46 aggs: Vec<Expr>,
47 },
48 JoinExec {
50 left: Box<PhysicalPlan>,
51 right: Box<PhysicalPlan>,
52 keys: JoinKeys,
53 how: JoinType,
54 },
55 SortExec {
57 input: Box<PhysicalPlan>,
58 options: SortOptions,
59 },
60 SliceExec {
62 input: Box<PhysicalPlan>,
63 offset: usize,
64 len: usize,
65 from_end: bool,
66 },
67 UniqueExec {
69 input: Box<PhysicalPlan>,
70 subset: Option<Vec<String>>,
71 },
72 FillNullExec {
74 input: Box<PhysicalPlan>,
75 fill: FillNull,
76 },
77 DropNullsExec {
79 input: Box<PhysicalPlan>,
80 subset: Option<Vec<String>>,
81 },
82 NullCountExec { input: Box<PhysicalPlan> },
84}
85
86pub fn compile(logical: &LogicalPlan) -> Result<PhysicalPlan> {
88 let plan = match logical {
89 LogicalPlan::DataFrameScan { df } => PhysicalPlan::ScanExec {
90 source: ScanSource::DataFrame(df.clone()),
91 },
92 LogicalPlan::CsvScan {
93 path,
94 predicate,
95 projection,
96 } => PhysicalPlan::ScanExec {
97 source: ScanSource::Csv {
98 path: path.clone(),
99 predicate: predicate.clone(),
100 projection: projection.clone(),
101 },
102 },
103 LogicalPlan::ParquetScan {
104 path,
105 predicate,
106 projection,
107 } => PhysicalPlan::ScanExec {
108 source: ScanSource::Parquet {
109 path: path.clone(),
110 predicate: predicate.clone(),
111 projection: projection.clone(),
112 },
113 },
114 LogicalPlan::Projection { input, exprs, kind } => PhysicalPlan::ProjectionExec {
115 input: Box::new(compile(input)?),
116 exprs: exprs.clone(),
117 kind: kind.clone(),
118 },
119 LogicalPlan::Filter { input, predicate } => PhysicalPlan::FilterExec {
120 input: Box::new(compile(input)?),
121 predicate: predicate.clone(),
122 },
123 LogicalPlan::Aggregate {
124 input,
125 group_by,
126 aggs,
127 } => PhysicalPlan::AggregateExec {
128 input: Box::new(compile(input)?),
129 group_by: group_by.clone(),
130 aggs: aggs.clone(),
131 },
132 LogicalPlan::Join {
133 left,
134 right,
135 keys,
136 how,
137 } => PhysicalPlan::JoinExec {
138 left: Box::new(compile(left)?),
139 right: Box::new(compile(right)?),
140 keys: keys.clone(),
141 how: *how,
142 },
143 LogicalPlan::Sort { input, options } => PhysicalPlan::SortExec {
144 input: Box::new(compile(input)?),
145 options: options.clone(),
146 },
147 LogicalPlan::Slice {
148 input,
149 offset,
150 len,
151 from_end,
152 } => PhysicalPlan::SliceExec {
153 input: Box::new(compile(input)?),
154 offset: *offset,
155 len: *len,
156 from_end: *from_end,
157 },
158 LogicalPlan::Unique { input, subset } => PhysicalPlan::UniqueExec {
159 input: Box::new(compile(input)?),
160 subset: subset.clone(),
161 },
162 LogicalPlan::FillNull { input, fill } => PhysicalPlan::FillNullExec {
163 input: Box::new(compile(input)?),
164 fill: fill.clone(),
165 },
166 LogicalPlan::DropNulls { input, subset } => PhysicalPlan::DropNullsExec {
167 input: Box::new(compile(input)?),
168 subset: subset.clone(),
169 },
170 LogicalPlan::NullCount { input } => PhysicalPlan::NullCountExec {
171 input: Box::new(compile(input)?),
172 },
173 };
174
175 Ok(plan)
176}