// alopex_dataframe/lazy/logical_plan.rs

1use std::path::PathBuf;
2
3use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
4use crate::{DataFrame, Expr};
5
/// How a projection node should be interpreted.
///
/// Derives `PartialEq`/`Eq` so that code inspecting a plan (and tests) can
/// compare kinds directly with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProjectionKind {
    /// Select columns/expressions, producing a new schema.
    Select,
    /// Add or overwrite columns, preserving existing columns.
    WithColumns,
}
14
15/// Logical query plan nodes for `LazyFrame`.
16#[derive(Debug, Clone)]
17pub enum LogicalPlan {
18    /// Scan an in-memory `DataFrame`.
19    DataFrameScan { df: DataFrame },
20    /// Scan a CSV file (predicate/projection may be pushed down).
21    CsvScan {
22        path: PathBuf,
23        predicate: Option<Expr>,
24        projection: Option<Vec<String>>,
25    },
26    /// Scan a Parquet file (predicate/projection may be pushed down).
27    ParquetScan {
28        path: PathBuf,
29        predicate: Option<Expr>,
30        projection: Option<Vec<String>>,
31    },
32    /// Projection node (select or with_columns).
33    Projection {
34        input: Box<LogicalPlan>,
35        exprs: Vec<Expr>,
36        kind: ProjectionKind,
37    },
38    /// Filter node.
39    Filter {
40        input: Box<LogicalPlan>,
41        predicate: Expr,
42    },
43    /// Aggregate node (group keys and aggregations).
44    Aggregate {
45        input: Box<LogicalPlan>,
46        group_by: Vec<Expr>,
47        aggs: Vec<Expr>,
48    },
49    /// Join two inputs.
50    Join {
51        left: Box<LogicalPlan>,
52        right: Box<LogicalPlan>,
53        keys: JoinKeys,
54        how: JoinType,
55    },
56    /// Sort input rows.
57    Sort {
58        input: Box<LogicalPlan>,
59        options: SortOptions,
60    },
61    /// Slice rows (used for head/tail).
62    Slice {
63        input: Box<LogicalPlan>,
64        offset: usize,
65        len: usize,
66        from_end: bool,
67    },
68    /// Remove duplicate rows.
69    Unique {
70        input: Box<LogicalPlan>,
71        subset: Option<Vec<String>>,
72    },
73    /// Fill nulls using a scalar or strategy.
74    FillNull {
75        input: Box<LogicalPlan>,
76        fill: FillNull,
77    },
78    /// Drop rows containing nulls.
79    DropNulls {
80        input: Box<LogicalPlan>,
81        subset: Option<Vec<String>>,
82    },
83    /// Count nulls per column.
84    NullCount { input: Box<LogicalPlan> },
85}
86
87impl LogicalPlan {
88    /// Render this plan as a readable string (used by `explain()` and tests).
89    pub fn display(&self) -> String {
90        let mut out = String::new();
91        self.fmt_into(&mut out, 0);
92        out
93    }
94
95    fn fmt_into(&self, out: &mut String, indent: usize) {
96        let pad = "  ".repeat(indent);
97        match self {
98            LogicalPlan::DataFrameScan { .. } => {
99                out.push_str(&format!("{pad}scan[dataframe]\n"));
100            }
101            LogicalPlan::CsvScan {
102                path,
103                predicate,
104                projection,
105            } => {
106                out.push_str(&format!("{pad}scan[csv path='{}']", path.display()));
107                if let Some(projection) = projection {
108                    out.push_str(&format!(" projection={:?}", projection));
109                }
110                if let Some(predicate) = predicate {
111                    out.push_str(&format!(" filters=[{}]", fmt_expr(predicate)));
112                }
113                out.push('\n');
114            }
115            LogicalPlan::ParquetScan {
116                path,
117                predicate,
118                projection,
119            } => {
120                out.push_str(&format!("{pad}scan[parquet path='{}']", path.display()));
121                if let Some(projection) = projection {
122                    out.push_str(&format!(" projection={:?}", projection));
123                }
124                if let Some(predicate) = predicate {
125                    out.push_str(&format!(" filters=[{}]", fmt_expr(predicate)));
126                }
127                out.push('\n');
128            }
129            LogicalPlan::Projection { input, exprs, kind } => {
130                let label = match kind {
131                    ProjectionKind::Select => "project",
132                    ProjectionKind::WithColumns => "with_columns",
133                };
134                out.push_str(&format!(
135                    "{pad}{label} [{}]\n",
136                    exprs.iter().map(fmt_expr).collect::<Vec<_>>().join(", ")
137                ));
138                input.fmt_into(out, indent + 1);
139            }
140            LogicalPlan::Filter { input, predicate } => {
141                out.push_str(&format!("{pad}filter [{}]\n", fmt_expr(predicate)));
142                input.fmt_into(out, indent + 1);
143            }
144            LogicalPlan::Aggregate {
145                input,
146                group_by,
147                aggs,
148            } => {
149                out.push_str(&format!(
150                    "{pad}aggregate by=[{}] aggs=[{}]\n",
151                    group_by.iter().map(fmt_expr).collect::<Vec<_>>().join(", "),
152                    aggs.iter().map(fmt_expr).collect::<Vec<_>>().join(", ")
153                ));
154                input.fmt_into(out, indent + 1);
155            }
156            LogicalPlan::Join {
157                left,
158                right,
159                keys,
160                how,
161            } => {
162                out.push_str(&format!(
163                    "{pad}join how={how:?} keys={}\n",
164                    fmt_join_keys(keys)
165                ));
166                left.fmt_into(out, indent + 1);
167                right.fmt_into(out, indent + 1);
168            }
169            LogicalPlan::Sort { input, options } => {
170                out.push_str(&format!(
171                    "{pad}sort by={:?} desc={:?} nulls_last={} stable={}\n",
172                    options.by, options.descending, options.nulls_last, options.stable
173                ));
174                input.fmt_into(out, indent + 1);
175            }
176            LogicalPlan::Slice {
177                input,
178                offset,
179                len,
180                from_end,
181            } => {
182                out.push_str(&format!(
183                    "{pad}slice offset={offset} len={len} from_end={from_end}\n"
184                ));
185                input.fmt_into(out, indent + 1);
186            }
187            LogicalPlan::Unique { input, subset } => {
188                out.push_str(&format!("{pad}unique subset={subset:?}\n"));
189                input.fmt_into(out, indent + 1);
190            }
191            LogicalPlan::FillNull { input, fill } => {
192                out.push_str(&format!("{pad}fill_null {}\n", fmt_fill_null(fill)));
193                input.fmt_into(out, indent + 1);
194            }
195            LogicalPlan::DropNulls { input, subset } => {
196                out.push_str(&format!("{pad}drop_nulls subset={subset:?}\n"));
197                input.fmt_into(out, indent + 1);
198            }
199            LogicalPlan::NullCount { input } => {
200                out.push_str(&format!("{pad}null_count\n"));
201                input.fmt_into(out, indent + 1);
202            }
203        }
204    }
205}
206
207fn fmt_join_keys(keys: &JoinKeys) -> String {
208    match keys {
209        JoinKeys::On(cols) => format!("on={cols:?}"),
210        JoinKeys::LeftRight { left_on, right_on } => {
211            format!("left_on={left_on:?} right_on={right_on:?}")
212        }
213    }
214}
215
216fn fmt_fill_null(fill: &FillNull) -> String {
217    match fill {
218        FillNull::Value(value) => format!("value={value:?}"),
219        FillNull::Strategy(strategy) => format!("strategy={strategy:?}"),
220    }
221}
222
223fn fmt_expr(expr: &Expr) -> String {
224    use crate::expr::{AggFunc, Expr as E, Operator, Scalar, UnaryOperator};
225
226    match expr {
227        E::Column(name) => format!("col({name})"),
228        E::Literal(Scalar::Null) => "lit(null)".to_string(),
229        E::Literal(Scalar::Boolean(v)) => format!("lit({v})"),
230        E::Literal(Scalar::Int64(v)) => format!("lit({v})"),
231        E::Literal(Scalar::Float64(v)) => format!("lit({v})"),
232        E::Literal(Scalar::Utf8(v)) => format!("lit({v:?})"),
233        E::Wildcard => "*".to_string(),
234        E::Alias { expr, name } => format!("{} as {name}", fmt_expr(expr)),
235        E::UnaryOp {
236            op: UnaryOperator::Not,
237            expr,
238        } => format!("not({})", fmt_expr(expr)),
239        E::BinaryOp { left, op, right } => {
240            let op_s = match op {
241                Operator::Add => "+",
242                Operator::Sub => "-",
243                Operator::Mul => "*",
244                Operator::Div => "/",
245                Operator::Eq => "==",
246                Operator::Neq => "!=",
247                Operator::Gt => ">",
248                Operator::Lt => "<",
249                Operator::Ge => ">=",
250                Operator::Le => "<=",
251                Operator::And => "and",
252                Operator::Or => "or",
253            };
254            format!("({} {op_s} {})", fmt_expr(left), fmt_expr(right))
255        }
256        E::Agg { func, expr } => {
257            let f = match func {
258                AggFunc::Sum => "sum",
259                AggFunc::Mean => "mean",
260                AggFunc::Count => "count",
261                AggFunc::Min => "min",
262                AggFunc::Max => "max",
263            };
264            format!("{f}({})", fmt_expr(expr))
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::{LogicalPlan, ProjectionKind};
    use crate::expr::{col, lit};

    /// A filter over a projection over a CSV scan renders as three lines, root
    /// first, each input indented two spaces deeper than its parent, and
    /// rendering the same plan twice yields the same text.
    #[test]
    fn display_is_readable_and_stable() {
        let plan = LogicalPlan::Filter {
            input: Box::new(LogicalPlan::Projection {
                input: Box::new(LogicalPlan::CsvScan {
                    path: "data.csv".into(),
                    predicate: None,
                    projection: Some(vec!["a".to_string(), "b".to_string()]),
                }),
                exprs: vec![col("a"), col("b").alias("bb")],
                kind: ProjectionKind::Select,
            }),
            predicate: col("a").gt(lit(1_i64)),
        };

        let s = plan.display();
        assert!(s.contains("scan[csv"));
        assert!(s.contains("project"));
        assert!(s.contains("filter"));
        assert!(s.contains("col(a)"));

        // Structure: one line per node, ordered root -> leaf, two spaces per level.
        let lines: Vec<&str> = s.lines().collect();
        assert_eq!(lines.len(), 3, "expected one line per plan node:\n{s}");
        assert!(lines[0].starts_with("filter ["), "unexpected root line: {:?}", lines[0]);
        assert!(
            lines[1].starts_with("  project ["),
            "unexpected projection line: {:?}",
            lines[1]
        );
        assert_eq!(
            lines[2],
            "    scan[csv path='data.csv'] projection=[\"a\", \"b\"]"
        );
        assert!(s.ends_with('\n'));

        // Stability: rendering is deterministic.
        assert_eq!(s, plan.display());
    }
}