Skip to main content

alopex_dataframe/lazy/
lazyframe.rs

1use std::path::Path;
2
3use crate::lazy::{LogicalPlan, Optimizer, ProjectionKind};
4use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
5use crate::{DataFrame, Expr, Result};
6
7/// A lazily-evaluated query backed by a `LogicalPlan`.
8#[derive(Debug, Clone)]
9pub struct LazyFrame {
10    plan: LogicalPlan,
11}
12
13impl LazyFrame {
14    /// Create a `LazyFrame` that scans an in-memory `DataFrame`.
15    pub fn from_dataframe(df: DataFrame) -> Self {
16        Self {
17            plan: LogicalPlan::DataFrameScan { df },
18        }
19    }
20
21    /// Build a CSV scan plan (no file I/O is performed until `collect()`).
22    pub fn scan_csv(path: impl AsRef<Path>) -> Result<Self> {
23        Ok(Self {
24            plan: LogicalPlan::CsvScan {
25                path: path.as_ref().to_path_buf(),
26                predicate: None,
27                projection: None,
28            },
29        })
30    }
31
32    /// Build a Parquet scan plan (no file I/O is performed until `collect()`).
33    pub fn scan_parquet(path: impl AsRef<Path>) -> Result<Self> {
34        Ok(Self {
35            plan: LogicalPlan::ParquetScan {
36                path: path.as_ref().to_path_buf(),
37                predicate: None,
38                projection: None,
39            },
40        })
41    }
42
43    /// Add a projection (`select`) node to the logical plan.
44    pub fn select(self, exprs: Vec<Expr>) -> Self {
45        Self {
46            plan: LogicalPlan::Projection {
47                input: Box::new(self.plan),
48                exprs,
49                kind: ProjectionKind::Select,
50            },
51        }
52    }
53
54    /// Add a filter node to the logical plan.
55    pub fn filter(self, predicate: Expr) -> Self {
56        Self {
57            plan: LogicalPlan::Filter {
58                input: Box::new(self.plan),
59                predicate,
60            },
61        }
62    }
63
64    /// Add a projection (`with_columns`) node to the logical plan.
65    pub fn with_columns(self, exprs: Vec<Expr>) -> Self {
66        Self {
67            plan: LogicalPlan::Projection {
68                input: Box::new(self.plan),
69                exprs,
70                kind: ProjectionKind::WithColumns,
71            },
72        }
73    }
74
75    /// Start a group-by on this `LazyFrame`.
76    pub fn group_by(self, by: Vec<Expr>) -> LazyGroupBy {
77        LazyGroupBy {
78            plan: self.plan,
79            by,
80        }
81    }
82
83    /// Join with another `LazyFrame` using provided join keys.
84    pub fn join<K: Into<JoinKeys>>(self, other: LazyFrame, keys: K, how: JoinType) -> Self {
85        let keys = keys.into();
86        Self {
87            plan: LogicalPlan::Join {
88                left: Box::new(self.plan),
89                right: Box::new(other.plan),
90                keys,
91                how,
92            },
93        }
94    }
95
96    /// Sort by one or more columns.
97    pub fn sort(self, mut options: SortOptions) -> Self {
98        options.nulls_last = true;
99        options.stable = true;
100        Self {
101            plan: LogicalPlan::Sort {
102                input: Box::new(self.plan),
103                options,
104            },
105        }
106    }
107
108    /// Return the first `n` rows.
109    pub fn head(self, n: usize) -> Self {
110        Self {
111            plan: LogicalPlan::Slice {
112                input: Box::new(self.plan),
113                offset: 0,
114                len: n,
115                from_end: false,
116            },
117        }
118    }
119
120    /// Return the last `n` rows.
121    pub fn tail(self, n: usize) -> Self {
122        Self {
123            plan: LogicalPlan::Slice {
124                input: Box::new(self.plan),
125                offset: 0,
126                len: n,
127                from_end: true,
128            },
129        }
130    }
131
132    /// Remove duplicate rows.
133    pub fn unique(self, subset: Option<Vec<String>>) -> Self {
134        Self {
135            plan: LogicalPlan::Unique {
136                input: Box::new(self.plan),
137                subset,
138            },
139        }
140    }
141
142    /// Fill null values using a scalar or strategy.
143    pub fn fill_null<T: Into<FillNull>>(self, fill: T) -> Self {
144        Self {
145            plan: LogicalPlan::FillNull {
146                input: Box::new(self.plan),
147                fill: fill.into(),
148            },
149        }
150    }
151
152    /// Drop rows containing null values.
153    pub fn drop_nulls(self, subset: Option<Vec<String>>) -> Self {
154        Self {
155            plan: LogicalPlan::DropNulls {
156                input: Box::new(self.plan),
157                subset,
158            },
159        }
160    }
161
162    /// Count null values per column.
163    pub fn null_count(self) -> Self {
164        Self {
165            plan: LogicalPlan::NullCount {
166                input: Box::new(self.plan),
167            },
168        }
169    }
170
171    /// Optimize, compile, and execute this `LazyFrame` into an eager `DataFrame`.
172    pub fn collect(self) -> Result<DataFrame> {
173        let optimized = Optimizer::optimize(&self.plan);
174        let physical = crate::physical::compile(&optimized)?;
175        let batches = crate::physical::Executor::execute(physical)?;
176        DataFrame::from_batches(batches)
177    }
178
179    /// Render the logical plan as a human-readable string.
180    ///
181    /// If `optimized` is `true`, includes optimizer rewrites such as pushdowns.
182    pub fn explain(self, optimized: bool) -> String {
183        if optimized {
184            Optimizer::optimize(&self.plan).display()
185        } else {
186            self.plan.display()
187        }
188    }
189}
190
191/// Group-by builder for `LazyFrame`.
192#[derive(Debug, Clone)]
193pub struct LazyGroupBy {
194    by: Vec<Expr>,
195    plan: LogicalPlan,
196}
197
198impl LazyGroupBy {
199    /// Add an aggregate node to the logical plan.
200    pub fn agg(self, aggs: Vec<Expr>) -> LazyFrame {
201        LazyFrame {
202            plan: LogicalPlan::Aggregate {
203                input: Box::new(self.plan),
204                group_by: self.by,
205                aggs,
206            },
207        }
208    }
209}
210
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use super::LazyFrame;
    use crate::expr::{col, lit};
    use crate::{DataFrame, Series};

    /// Three-row fixture: `a = [1, 2, 3]`, `b = [10, 20, 30]`.
    fn df() -> DataFrame {
        let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let b: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30]));
        DataFrame::new(vec![
            Series::from_arrow("a", vec![a]).unwrap(),
            Series::from_arrow("b", vec![b]).unwrap(),
        ])
        .unwrap()
    }

    #[test]
    fn explain_builds_plan_without_io() {
        let lf = LazyFrame::scan_csv("test.csv").unwrap();
        let s = lf.explain(false);
        assert!(s.contains("scan[csv"));
    }

    #[test]
    fn collect_executes_filter_and_select_on_dataframe_scan() {
        let lf = LazyFrame::from_dataframe(df())
            .filter(col("a").gt(lit(1_i64)))
            .select(vec![col("b").alias("bb")]);
        let out = lf.collect().unwrap();
        assert_eq!(out.height(), 2);
        let bb = out.column("bb").unwrap();
        assert_eq!(bb.len(), 2);
    }

    #[test]
    fn group_by_agg_executes_sum_and_count() {
        let lf = LazyFrame::from_dataframe(df())
            .group_by(vec![col("a")])
            .agg(vec![
                col("b").sum().alias("sum_b"),
                col("b").count().alias("cnt_b"),
            ]);
        let out = lf.collect().unwrap();
        assert_eq!(out.width(), 3);
        assert_eq!(out.column("a").unwrap().len(), 3);
        assert_eq!(out.column("sum_b").unwrap().len(), 3);
        assert_eq!(out.column("cnt_b").unwrap().len(), 3);
    }

    // Covers the `Slice` plan node in both directions (`from_end` false/true).
    #[test]
    fn head_and_tail_limit_row_count() {
        let head = LazyFrame::from_dataframe(df()).head(2).collect().unwrap();
        assert_eq!(head.height(), 2);

        let tail = LazyFrame::from_dataframe(df()).tail(1).collect().unwrap();
        assert_eq!(tail.height(), 1);
    }
}