1use std::path::Path;
2
3use crate::lazy::{LogicalPlan, Optimizer, ProjectionKind};
4use crate::ops::{FillNull, JoinKeys, JoinType, SortOptions};
5use crate::{DataFrame, Expr, Result};
6
7#[derive(Debug, Clone)]
9pub struct LazyFrame {
10 plan: LogicalPlan,
11}
12
13impl LazyFrame {
14 pub fn from_dataframe(df: DataFrame) -> Self {
16 Self {
17 plan: LogicalPlan::DataFrameScan { df },
18 }
19 }
20
21 pub fn scan_csv(path: impl AsRef<Path>) -> Result<Self> {
23 Ok(Self {
24 plan: LogicalPlan::CsvScan {
25 path: path.as_ref().to_path_buf(),
26 predicate: None,
27 projection: None,
28 },
29 })
30 }
31
32 pub fn scan_parquet(path: impl AsRef<Path>) -> Result<Self> {
34 Ok(Self {
35 plan: LogicalPlan::ParquetScan {
36 path: path.as_ref().to_path_buf(),
37 predicate: None,
38 projection: None,
39 },
40 })
41 }
42
43 pub fn select(self, exprs: Vec<Expr>) -> Self {
45 Self {
46 plan: LogicalPlan::Projection {
47 input: Box::new(self.plan),
48 exprs,
49 kind: ProjectionKind::Select,
50 },
51 }
52 }
53
54 pub fn filter(self, predicate: Expr) -> Self {
56 Self {
57 plan: LogicalPlan::Filter {
58 input: Box::new(self.plan),
59 predicate,
60 },
61 }
62 }
63
64 pub fn with_columns(self, exprs: Vec<Expr>) -> Self {
66 Self {
67 plan: LogicalPlan::Projection {
68 input: Box::new(self.plan),
69 exprs,
70 kind: ProjectionKind::WithColumns,
71 },
72 }
73 }
74
75 pub fn group_by(self, by: Vec<Expr>) -> LazyGroupBy {
77 LazyGroupBy {
78 plan: self.plan,
79 by,
80 }
81 }
82
83 pub fn join<K: Into<JoinKeys>>(self, other: LazyFrame, keys: K, how: JoinType) -> Self {
85 let keys = keys.into();
86 Self {
87 plan: LogicalPlan::Join {
88 left: Box::new(self.plan),
89 right: Box::new(other.plan),
90 keys,
91 how,
92 },
93 }
94 }
95
96 pub fn sort(self, mut options: SortOptions) -> Self {
98 options.nulls_last = true;
99 options.stable = true;
100 Self {
101 plan: LogicalPlan::Sort {
102 input: Box::new(self.plan),
103 options,
104 },
105 }
106 }
107
108 pub fn head(self, n: usize) -> Self {
110 Self {
111 plan: LogicalPlan::Slice {
112 input: Box::new(self.plan),
113 offset: 0,
114 len: n,
115 from_end: false,
116 },
117 }
118 }
119
120 pub fn tail(self, n: usize) -> Self {
122 Self {
123 plan: LogicalPlan::Slice {
124 input: Box::new(self.plan),
125 offset: 0,
126 len: n,
127 from_end: true,
128 },
129 }
130 }
131
132 pub fn unique(self, subset: Option<Vec<String>>) -> Self {
134 Self {
135 plan: LogicalPlan::Unique {
136 input: Box::new(self.plan),
137 subset,
138 },
139 }
140 }
141
142 pub fn fill_null<T: Into<FillNull>>(self, fill: T) -> Self {
144 Self {
145 plan: LogicalPlan::FillNull {
146 input: Box::new(self.plan),
147 fill: fill.into(),
148 },
149 }
150 }
151
152 pub fn drop_nulls(self, subset: Option<Vec<String>>) -> Self {
154 Self {
155 plan: LogicalPlan::DropNulls {
156 input: Box::new(self.plan),
157 subset,
158 },
159 }
160 }
161
162 pub fn null_count(self) -> Self {
164 Self {
165 plan: LogicalPlan::NullCount {
166 input: Box::new(self.plan),
167 },
168 }
169 }
170
171 pub fn collect(self) -> Result<DataFrame> {
173 let optimized = Optimizer::optimize(&self.plan);
174 let physical = crate::physical::compile(&optimized)?;
175 let batches = crate::physical::Executor::execute(physical)?;
176 DataFrame::from_batches(batches)
177 }
178
179 pub fn explain(self, optimized: bool) -> String {
183 if optimized {
184 Optimizer::optimize(&self.plan).display()
185 } else {
186 self.plan.display()
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
193pub struct LazyGroupBy {
194 by: Vec<Expr>,
195 plan: LogicalPlan,
196}
197
198impl LazyGroupBy {
199 pub fn agg(self, aggs: Vec<Expr>) -> LazyFrame {
201 LazyFrame {
202 plan: LogicalPlan::Aggregate {
203 input: Box::new(self.plan),
204 group_by: self.by,
205 aggs,
206 },
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use super::LazyFrame;
    use crate::expr::{col, lit};
    use crate::{DataFrame, Series};

    /// Three-row fixture with Int64 columns `a` = [1, 2, 3] and
    /// `b` = [10, 20, 30].
    fn df() -> DataFrame {
        let values_a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let values_b: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30]));
        let series_a = Series::from_arrow("a", vec![values_a]).unwrap();
        let series_b = Series::from_arrow("b", vec![values_b]).unwrap();
        DataFrame::new(vec![series_a, series_b]).unwrap()
    }

    /// Explaining an unoptimized CSV scan must succeed even though the file
    /// does not exist, and must mention the CSV scan node.
    #[test]
    fn explain_builds_plan_without_io() {
        let rendered = LazyFrame::scan_csv("test.csv").unwrap().explain(false);
        assert!(rendered.contains("scan[csv"));
    }

    /// `a > 1` keeps two of the three fixture rows; the aliased column `bb`
    /// must be present with the same length.
    #[test]
    fn collect_executes_filter_and_select_on_dataframe_scan() {
        let keep_rows = col("a").gt(lit(1_i64));
        let projection = vec![col("b").alias("bb")];
        let out = LazyFrame::from_dataframe(df())
            .filter(keep_rows)
            .select(projection)
            .collect()
            .unwrap();
        assert_eq!(out.height(), 2);
        let bb = out.column("bb").unwrap();
        assert_eq!(bb.len(), 2);
    }

    /// Every `a` value in the fixture is distinct, so grouping by `a` yields
    /// three groups with one key column and two aggregate columns.
    #[test]
    fn group_by_agg_executes_sum_and_count() {
        let aggregations = vec![
            col("b").sum().alias("sum_b"),
            col("b").count().alias("cnt_b"),
        ];
        let out = LazyFrame::from_dataframe(df())
            .group_by(vec![col("a")])
            .agg(aggregations)
            .collect()
            .unwrap();
        assert_eq!(out.width(), 3);
        for name in ["a", "sum_b", "cnt_b"] {
            assert_eq!(out.column(name).unwrap().len(), 3);
        }
    }
}