1use std::path::PathBuf;
2
3use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
4use crate::{DataFrame, Expr};
5
/// Tags a projection node with the flavour of projection it represents.
///
/// The plan printer in this file labels `Select` as `project` and
/// `WithColumns` as `with_columns`.
///
/// `PartialEq`/`Eq` are derived so that kinds can be compared directly
/// (for example in tests or plan-rewrite rules) instead of via `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProjectionKind {
    /// Printed as `project`.
    // NOTE(review): presumably the output holds only the listed expressions — confirm with the executor.
    Select,
    /// Printed as `with_columns`.
    // NOTE(review): presumably the expressions are added to the existing columns — confirm with the executor.
    WithColumns,
}
14
15#[derive(Debug, Clone)]
17pub enum LogicalPlan {
18 DataFrameScan { df: DataFrame },
20 CsvScan {
22 path: PathBuf,
23 predicate: Option<Expr>,
24 projection: Option<Vec<String>>,
25 },
26 ParquetScan {
28 path: PathBuf,
29 predicate: Option<Expr>,
30 projection: Option<Vec<String>>,
31 },
32 Projection {
34 input: Box<LogicalPlan>,
35 exprs: Vec<Expr>,
36 kind: ProjectionKind,
37 },
38 Filter {
40 input: Box<LogicalPlan>,
41 predicate: Expr,
42 },
43 Aggregate {
45 input: Box<LogicalPlan>,
46 group_by: Vec<Expr>,
47 aggs: Vec<Expr>,
48 },
49 Join {
51 left: Box<LogicalPlan>,
52 right: Box<LogicalPlan>,
53 keys: JoinKeys,
54 how: JoinType,
55 },
56 Sort {
58 input: Box<LogicalPlan>,
59 options: SortOptions,
60 },
61 Slice {
63 input: Box<LogicalPlan>,
64 offset: usize,
65 len: usize,
66 from_end: bool,
67 },
68 Unique {
70 input: Box<LogicalPlan>,
71 subset: Option<Vec<String>>,
72 },
73 FillNull {
75 input: Box<LogicalPlan>,
76 fill: FillNull,
77 },
78 DropNulls {
80 input: Box<LogicalPlan>,
81 subset: Option<Vec<String>>,
82 },
83 NullCount { input: Box<LogicalPlan> },
85}
86
87impl LogicalPlan {
88 pub fn display(&self) -> String {
90 let mut out = String::new();
91 self.fmt_into(&mut out, 0);
92 out
93 }
94
95 fn fmt_into(&self, out: &mut String, indent: usize) {
96 let pad = " ".repeat(indent);
97 match self {
98 LogicalPlan::DataFrameScan { .. } => {
99 out.push_str(&format!("{pad}scan[dataframe]\n"));
100 }
101 LogicalPlan::CsvScan {
102 path,
103 predicate,
104 projection,
105 } => {
106 out.push_str(&format!("{pad}scan[csv path='{}']", path.display()));
107 if let Some(projection) = projection {
108 out.push_str(&format!(" projection={:?}", projection));
109 }
110 if let Some(predicate) = predicate {
111 out.push_str(&format!(" filters=[{}]", fmt_expr(predicate)));
112 }
113 out.push('\n');
114 }
115 LogicalPlan::ParquetScan {
116 path,
117 predicate,
118 projection,
119 } => {
120 out.push_str(&format!("{pad}scan[parquet path='{}']", path.display()));
121 if let Some(projection) = projection {
122 out.push_str(&format!(" projection={:?}", projection));
123 }
124 if let Some(predicate) = predicate {
125 out.push_str(&format!(" filters=[{}]", fmt_expr(predicate)));
126 }
127 out.push('\n');
128 }
129 LogicalPlan::Projection { input, exprs, kind } => {
130 let label = match kind {
131 ProjectionKind::Select => "project",
132 ProjectionKind::WithColumns => "with_columns",
133 };
134 out.push_str(&format!(
135 "{pad}{label} [{}]\n",
136 exprs.iter().map(fmt_expr).collect::<Vec<_>>().join(", ")
137 ));
138 input.fmt_into(out, indent + 1);
139 }
140 LogicalPlan::Filter { input, predicate } => {
141 out.push_str(&format!("{pad}filter [{}]\n", fmt_expr(predicate)));
142 input.fmt_into(out, indent + 1);
143 }
144 LogicalPlan::Aggregate {
145 input,
146 group_by,
147 aggs,
148 } => {
149 out.push_str(&format!(
150 "{pad}aggregate by=[{}] aggs=[{}]\n",
151 group_by.iter().map(fmt_expr).collect::<Vec<_>>().join(", "),
152 aggs.iter().map(fmt_expr).collect::<Vec<_>>().join(", ")
153 ));
154 input.fmt_into(out, indent + 1);
155 }
156 LogicalPlan::Join {
157 left,
158 right,
159 keys,
160 how,
161 } => {
162 out.push_str(&format!(
163 "{pad}join how={how:?} keys={}\n",
164 fmt_join_keys(keys)
165 ));
166 left.fmt_into(out, indent + 1);
167 right.fmt_into(out, indent + 1);
168 }
169 LogicalPlan::Sort { input, options } => {
170 out.push_str(&format!(
171 "{pad}sort by={:?} desc={:?} nulls_last={} stable={}\n",
172 options.by, options.descending, options.nulls_last, options.stable
173 ));
174 input.fmt_into(out, indent + 1);
175 }
176 LogicalPlan::Slice {
177 input,
178 offset,
179 len,
180 from_end,
181 } => {
182 out.push_str(&format!(
183 "{pad}slice offset={offset} len={len} from_end={from_end}\n"
184 ));
185 input.fmt_into(out, indent + 1);
186 }
187 LogicalPlan::Unique { input, subset } => {
188 out.push_str(&format!("{pad}unique subset={subset:?}\n"));
189 input.fmt_into(out, indent + 1);
190 }
191 LogicalPlan::FillNull { input, fill } => {
192 out.push_str(&format!("{pad}fill_null {}\n", fmt_fill_null(fill)));
193 input.fmt_into(out, indent + 1);
194 }
195 LogicalPlan::DropNulls { input, subset } => {
196 out.push_str(&format!("{pad}drop_nulls subset={subset:?}\n"));
197 input.fmt_into(out, indent + 1);
198 }
199 LogicalPlan::NullCount { input } => {
200 out.push_str(&format!("{pad}null_count\n"));
201 input.fmt_into(out, indent + 1);
202 }
203 }
204 }
205}
206
207fn fmt_join_keys(keys: &JoinKeys) -> String {
208 match keys {
209 JoinKeys::On(cols) => format!("on={cols:?}"),
210 JoinKeys::LeftRight { left_on, right_on } => {
211 format!("left_on={left_on:?} right_on={right_on:?}")
212 }
213 }
214}
215
216fn fmt_fill_null(fill: &FillNull) -> String {
217 match fill {
218 FillNull::Value(value) => format!("value={value:?}"),
219 FillNull::Strategy(strategy) => format!("strategy={strategy:?}"),
220 }
221}
222
223fn fmt_expr(expr: &Expr) -> String {
224 use crate::expr::{AggFunc, Expr as E, Operator, Scalar, UnaryOperator};
225
226 match expr {
227 E::Column(name) => format!("col({name})"),
228 E::Literal(Scalar::Null) => "lit(null)".to_string(),
229 E::Literal(Scalar::Boolean(v)) => format!("lit({v})"),
230 E::Literal(Scalar::Int64(v)) => format!("lit({v})"),
231 E::Literal(Scalar::Float64(v)) => format!("lit({v})"),
232 E::Literal(Scalar::Utf8(v)) => format!("lit({v:?})"),
233 E::Wildcard => "*".to_string(),
234 E::Alias { expr, name } => format!("{} as {name}", fmt_expr(expr)),
235 E::UnaryOp {
236 op: UnaryOperator::Not,
237 expr,
238 } => format!("not({})", fmt_expr(expr)),
239 E::BinaryOp { left, op, right } => {
240 let op_s = match op {
241 Operator::Add => "+",
242 Operator::Sub => "-",
243 Operator::Mul => "*",
244 Operator::Div => "/",
245 Operator::Eq => "==",
246 Operator::Neq => "!=",
247 Operator::Gt => ">",
248 Operator::Lt => "<",
249 Operator::Ge => ">=",
250 Operator::Le => "<=",
251 Operator::And => "and",
252 Operator::Or => "or",
253 };
254 format!("({} {op_s} {})", fmt_expr(left), fmt_expr(right))
255 }
256 E::Agg { func, expr } => {
257 let f = match func {
258 AggFunc::Sum => "sum",
259 AggFunc::Mean => "mean",
260 AggFunc::Count => "count",
261 AggFunc::Min => "min",
262 AggFunc::Max => "max",
263 };
264 format!("{f}({})", fmt_expr(expr))
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::{LogicalPlan, ProjectionKind};
    use crate::expr::{col, lit};

    /// Builds `filter -> select -> csv scan`, the plan shared by the tests below.
    fn sample_plan() -> LogicalPlan {
        LogicalPlan::Filter {
            input: Box::new(LogicalPlan::Projection {
                input: Box::new(LogicalPlan::CsvScan {
                    path: "data.csv".into(),
                    predicate: None,
                    projection: Some(vec!["a".to_string(), "b".to_string()]),
                }),
                exprs: vec![col("a"), col("b").alias("bb")],
                kind: ProjectionKind::Select,
            }),
            predicate: col("a").gt(lit(1_i64)),
        }
    }

    #[test]
    fn display_is_readable_and_stable() {
        let s = sample_plan().display();
        assert!(s.contains("scan[csv"));
        assert!(s.contains("project"));
        assert!(s.contains("filter"));
        assert!(s.contains("col(a)"));
    }

    /// Substring checks alone would still pass if nodes were printed in the
    /// wrong order or without nesting, so pin the tree shape as well.
    #[test]
    fn display_prints_parents_before_indented_inputs() {
        let rendered = sample_plan().display();
        let lines: Vec<&str> = rendered.lines().collect();

        // One line per plan node.
        assert_eq!(lines.len(), 3, "unexpected plan rendering:\n{rendered}");

        // Root first, leaf last.
        assert!(lines[0].starts_with("filter"));
        assert!(lines[1].trim_start().starts_with("project"));
        assert!(lines[2].trim_start().starts_with("scan[csv"));

        // Each level is indented strictly deeper than its parent.
        let depth = |line: &str| line.len() - line.trim_start().len();
        assert!(depth(lines[0]) < depth(lines[1]));
        assert!(depth(lines[1]) < depth(lines[2]));
    }
}