Struct LazyFrame

Source
pub struct LazyFrame { /* private fields */ }
Expand description

DataFrame wrapper for lazy evaluation

Implementations§

Source§

impl LazyFrame

Source

pub fn new(df: OptimizedDataFrame) -> Self

Create a new LazyFrame that wraps the given OptimizedDataFrame

Examples found in repository?
examples/optimized_stats_example.rs (line 182)
115fn optimized_regression_example() -> Result<()> {
116    println!("3. Example of Regression Analysis with Optimized Implementation");
117    println!("--------------------------");
118
119    // Create an optimized DataFrame
120    let mut opt_df = OptimizedDataFrame::new();
121
122    // Add explanatory variables as type-specialized columns
123    let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124    let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126    opt_df.add_column(
127        "x1".to_string(),
128        pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129    )?;
130    opt_df.add_column(
131        "x2".to_string(),
132        pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133    )?;
134
135    // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136    let mut y_values = Vec::with_capacity(10);
137    let mut rng = rng();
138
139    for i in 0..10 {
140        let noise = rng.random_range(-1.0..1.0);
141        let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142        y_values.push(y_val);
143    }
144
145    opt_df.add_column(
146        "y".to_string(),
147        pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148    )?;
149
150    // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151    let mut df = pandrs::DataFrame::new();
152
153    // Add x1, x2, y columns to the regular DataFrame
154    df.add_column(
155        "x1".to_string(),
156        pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157    )?;
158    df.add_column(
159        "x2".to_string(),
160        pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161    )?;
162    df.add_column(
163        "y".to_string(),
164        pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165    )?;
166
167    // Perform regression analysis
168    let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170    // Display results
171    println!(
172        "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173        model.intercept, model.coefficients[0], model.coefficients[1]
174    );
175    println!("R²: {:.4}", model.r_squared);
176    println!("Adjusted R²: {:.4}", model.adj_r_squared);
177    println!("p-values of regression coefficients: {:?}", model.p_values);
178
179    // Example using LazyFrame (leveraging lazy evaluation)
180    println!("\nOperations with LazyFrame:");
181
182    let lazy_df = LazyFrame::new(opt_df);
183
184    // Perform column selection and filtering by condition at once
185    // These operations are not executed until actually needed
186    let filtered = lazy_df
187        .select(&["x1", "x2", "y"])
188        // Filter condition for LazyFrame is specified as a simple string expression
189        .filter("x1 > 5.0")
190        .execute()?;
191
192    println!("Number of rows after filtering: {}", filtered.row_count());
193
194    // Example of simple regression
195    println!("\nSimple Regression Model (x1 only):");
196    let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197    println!(
198        "Linear Regression Model: y = {:.4} + {:.4} × x1",
199        model_simple.intercept, model_simple.coefficients[0]
200    );
201    println!("R²: {:.4}", model_simple.r_squared);
202
203    Ok(())
204}
More examples
Hide additional examples
examples/lazy_parallel_example.rs (line 120)
8fn main() -> Result<(), Box<dyn Error>> {
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display a portion of the DataFrame
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Create an age filter (30 years and older)
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Add the filter to the DataFrame
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Aggregation by department
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Construct the result
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);
72        result_counts.push(count as f64);
73    }
74
75    // Create the result DataFrame
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // First, create a boolean column for the filter
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Add the filter to the DataFrame
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Recreate the LazyFrame
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Performance comparison
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}
examples/parallel_benchmark.rs (line 121)
7fn main() {
8    println!("Parallel Processing Performance Benchmark");
9    println!("============================");
10
11    // Data size
12    const ROWS: usize = 1_000_000;
13
14    // ====================
15    // Create Large DataFrame
16    // ====================
17    println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19    // --- Data Generation ---
20    let data_gen_start = Instant::now();
21
22    let mut int_data = Vec::with_capacity(ROWS);
23    let mut float_data = Vec::with_capacity(ROWS);
24    let mut str_data = Vec::with_capacity(ROWS);
25    let mut bool_data = Vec::with_capacity(ROWS);
26
27    for i in 0..ROWS {
28        int_data.push(i as i64);
29        float_data.push(i as f64 / 100.0);
30        str_data.push(format!("value_{}", i % 1000)); // Limited string set
31        bool_data.push(i % 2 == 0);
32    }
33
34    let data_gen_time = data_gen_start.elapsed();
35    println!("Data generation time: {:?}", data_gen_time);
36
37    // --- DataFrame Construction ---
38    let df_construct_start = Instant::now();
39    let mut df = OptimizedDataFrame::new();
40
41    df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42        .unwrap();
43    df.add_column(
44        "value".to_string(),
45        Column::Float64(Float64Column::new(float_data)),
46    )
47    .unwrap();
48    df.add_column(
49        "category".to_string(),
50        Column::String(StringColumn::new(str_data)),
51    )
52    .unwrap();
53    df.add_column(
54        "flag".to_string(),
55        Column::Boolean(BooleanColumn::new(bool_data)),
56    )
57    .unwrap();
58
59    let df_construct_time = df_construct_start.elapsed();
60    println!("DataFrame construction time: {:?}", df_construct_time);
61    println!(
62        "Total DataFrame creation time: {:?}",
63        data_gen_time + df_construct_time
64    );
65
66    // ====================
67    // Serial vs Parallel Filtering
68    // ====================
69    println!("\n[2] Filtering Performance (id > 500000)");
70
71    // Add a condition column
72    let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73    df.add_column(
74        "filter_condition".to_string(),
75        Column::Boolean(BooleanColumn::new(condition_data)),
76    )
77    .unwrap();
78
79    // Serial filtering
80    let mut serial_total = std::time::Duration::new(0, 0);
81    for _ in 0..3 {
82        let start = Instant::now();
83        let filtered_df = df.filter("filter_condition").unwrap();
84        let duration = start.elapsed();
85        serial_total += duration;
86        println!("Serial filtering time (1 run): {:?}", duration);
87        println!("Filtered row count: {}", filtered_df.row_count());
88    }
89    let serial_time = serial_total / 3;
90    println!("Average serial filtering time: {:?}", serial_time);
91
92    // Parallel filtering
93    let mut parallel_total = std::time::Duration::new(0, 0);
94    for _ in 0..3 {
95        let start = Instant::now();
96        let par_filtered_df = df.par_filter("filter_condition").unwrap();
97        let duration = start.elapsed();
98        parallel_total += duration;
99        println!("Parallel filtering time (1 run): {:?}", duration);
100        println!("Filtered row count: {}", par_filtered_df.row_count());
101    }
102    let parallel_time = parallel_total / 3;
103    println!("Average parallel filtering time: {:?}", parallel_time);
104
105    println!(
106        "Speedup: {:.2}x",
107        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108    );
109
110    // ====================
111    // Grouping and Aggregation
112    // ====================
113    println!("\n[3] Grouping and Aggregation (average by category)");
114
115    // Serial grouping and aggregation
116    let small_df = df.select(&["category", "value"]).unwrap();
117
118    let mut serial_total = std::time::Duration::new(0, 0);
119    for _ in 0..3 {
120        let start = Instant::now();
121        let lazy_df = LazyFrame::new(small_df.clone());
122        let grouped_df = lazy_df
123            .aggregate(
124                vec!["category".to_string()],
125                vec![(
126                    "value".to_string(),
127                    AggregateOp::Mean,
128                    "value_mean".to_string(),
129                )],
130            )
131            .execute()
132            .unwrap();
133        let duration = start.elapsed();
134        serial_total += duration;
135        println!(
136            "Serial grouping and aggregation time (1 run): {:?}",
137            duration
138        );
139        println!("Group count: {}", grouped_df.row_count());
140    }
141    let serial_time = serial_total / 3;
142    println!(
143        "Average serial grouping and aggregation time: {:?}",
144        serial_time
145    );
146
147    // Parallel grouping and aggregation
148    let mut parallel_total = std::time::Duration::new(0, 0);
149    for _ in 0..3 {
150        let start = Instant::now();
151        let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153        let mut result_df = OptimizedDataFrame::new();
154        let mut categories = Vec::with_capacity(grouped_map.len());
155        let mut means = Vec::with_capacity(grouped_map.len());
156
157        for (category, group_df) in &grouped_map {
158            categories.push(category.clone());
159
160            let value_col = group_df.column("value").unwrap();
161            if let Some(float_col) = value_col.as_float64() {
162                let mut sum = 0.0;
163                let mut count = 0;
164
165                for i in 0..float_col.len() {
166                    if let Ok(Some(val)) = float_col.get(i) {
167                        sum += val;
168                        count += 1;
169                    }
170                }
171
172                let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173                means.push(mean);
174            }
175        }
176
177        result_df
178            .add_column(
179                "category".to_string(),
180                Column::String(StringColumn::new(categories)),
181            )
182            .unwrap();
183        result_df
184            .add_column(
185                "value_mean".to_string(),
186                Column::Float64(Float64Column::new(means)),
187            )
188            .unwrap();
189
190        let duration = start.elapsed();
191        parallel_total += duration;
192        println!(
193            "Parallel grouping and aggregation time (1 run): {:?}",
194            duration
195        );
196        println!("Group count: {}", result_df.row_count());
197    }
198    let parallel_time = parallel_total / 3;
199    println!(
200        "Average parallel grouping and aggregation time: {:?}",
201        parallel_time
202    );
203
204    println!(
205        "Speedup: {:.2}x",
206        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207    );
208
209    // ====================
210    // Computation (double all values)
211    // ====================
212    println!("\n[4] Computation (double the values in the 'value' column)");
213
214    // Serial computation
215    let start = Instant::now();
216    let mut computed_df = OptimizedDataFrame::new();
217
218    for name in df.column_names() {
219        let col_view = df.column(name).unwrap();
220
221        let new_col = if name == "value" {
222            let float_col = col_view.as_float64().unwrap();
223            let mut doubled_values = Vec::with_capacity(float_col.len());
224
225            for i in 0..float_col.len() {
226                if let Ok(Some(val)) = float_col.get(i) {
227                    doubled_values.push(val * 2.0);
228                } else {
229                    doubled_values.push(0.0);
230                }
231            }
232
233            Column::Float64(Float64Column::new(doubled_values))
234        } else {
235            col_view.into_column()
236        };
237
238        computed_df.add_column(name.to_string(), new_col).unwrap();
239    }
240
241    let serial_time = start.elapsed();
242    println!("Serial computation time: {:?}", serial_time);
243
244    // Parallel computation
245    let start = Instant::now();
246    let _par_computed_df = df
247        .par_apply(|view| {
248            if view.as_float64().is_some() {
249                if let Some(float_col) = view.as_float64() {
250                    use rayon::prelude::*;
251
252                    let values = (0..float_col.len())
253                        .into_par_iter()
254                        .map(|i| {
255                            if let Ok(Some(val)) = float_col.get(i) {
256                                val * 2.0
257                            } else {
258                                0.0
259                            }
260                        })
261                        .collect::<Vec<_>>();
262
263                    Ok(Column::Float64(Float64Column::new(values)))
264                } else {
265                    Ok(view.clone().into_column())
266                }
267            } else {
268                Ok(view.clone().into_column())
269            }
270        })
271        .unwrap();
272
273    let parallel_time = start.elapsed();
274    println!("Parallel computation time: {:?}", parallel_time);
275
276    println!(
277        "Speedup: {:.2}x",
278        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279    );
280
281    println!("\nParallel Benchmark Complete");
282}
Source

pub fn select(self, columns: &[&str]) -> Self

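Select the named columns. Consumes the LazyFrame and returns it, so calls can be chained; as the example below notes, the operation is not executed until it is actually needed (here, when `execute()` is called).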

Examples found in repository?
examples/optimized_stats_example.rs (line 187)
115fn optimized_regression_example() -> Result<()> {
116    println!("3. Example of Regression Analysis with Optimized Implementation");
117    println!("--------------------------");
118
119    // Create an optimized DataFrame
120    let mut opt_df = OptimizedDataFrame::new();
121
122    // Add explanatory variables as type-specialized columns
123    let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124    let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126    opt_df.add_column(
127        "x1".to_string(),
128        pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129    )?;
130    opt_df.add_column(
131        "x2".to_string(),
132        pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133    )?;
134
135    // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136    let mut y_values = Vec::with_capacity(10);
137    let mut rng = rng();
138
139    for i in 0..10 {
140        let noise = rng.random_range(-1.0..1.0);
141        let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142        y_values.push(y_val);
143    }
144
145    opt_df.add_column(
146        "y".to_string(),
147        pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148    )?;
149
150    // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151    let mut df = pandrs::DataFrame::new();
152
153    // Add x1, x2, y columns to the regular DataFrame
154    df.add_column(
155        "x1".to_string(),
156        pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157    )?;
158    df.add_column(
159        "x2".to_string(),
160        pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161    )?;
162    df.add_column(
163        "y".to_string(),
164        pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165    )?;
166
167    // Perform regression analysis
168    let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170    // Display results
171    println!(
172        "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173        model.intercept, model.coefficients[0], model.coefficients[1]
174    );
175    println!("R²: {:.4}", model.r_squared);
176    println!("Adjusted R²: {:.4}", model.adj_r_squared);
177    println!("p-values of regression coefficients: {:?}", model.p_values);
178
179    // Example using LazyFrame (leveraging lazy evaluation)
180    println!("\nOperations with LazyFrame:");
181
182    let lazy_df = LazyFrame::new(opt_df);
183
184    // Perform column selection and filtering by condition at once
185    // These operations are not executed until actually needed
186    let filtered = lazy_df
187        .select(&["x1", "x2", "y"])
188        // Filter condition for LazyFrame is specified as a simple string expression
189        .filter("x1 > 5.0")
190        .execute()?;
191
192    println!("Number of rows after filtering: {}", filtered.row_count());
193
194    // Example of simple regression
195    println!("\nSimple Regression Model (x1 only):");
196    let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197    println!(
198        "Linear Regression Model: y = {:.4} + {:.4} × x1",
199        model_simple.intercept, model_simple.coefficients[0]
200    );
201    println!("R²: {:.4}", model_simple.r_squared);
202
203    Ok(())
204}
Source

pub fn filter(self, condition: &str) -> Self

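Filter rows by `condition`. Consumes the LazyFrame and returns it for chaining. In the repository examples below, `condition` is either the name of a boolean column (e.g. `"Age Filter"`) or a simple string expression (e.g. `"x1 > 5.0"`).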

Examples found in repository?
examples/optimized_stats_example.rs (line 189)
115fn optimized_regression_example() -> Result<()> {
116    println!("3. Example of Regression Analysis with Optimized Implementation");
117    println!("--------------------------");
118
119    // Create an optimized DataFrame
120    let mut opt_df = OptimizedDataFrame::new();
121
122    // Add explanatory variables as type-specialized columns
123    let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124    let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126    opt_df.add_column(
127        "x1".to_string(),
128        pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129    )?;
130    opt_df.add_column(
131        "x2".to_string(),
132        pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133    )?;
134
135    // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136    let mut y_values = Vec::with_capacity(10);
137    let mut rng = rng();
138
139    for i in 0..10 {
140        let noise = rng.random_range(-1.0..1.0);
141        let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142        y_values.push(y_val);
143    }
144
145    opt_df.add_column(
146        "y".to_string(),
147        pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148    )?;
149
150    // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151    let mut df = pandrs::DataFrame::new();
152
153    // Add x1, x2, y columns to the regular DataFrame
154    df.add_column(
155        "x1".to_string(),
156        pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157    )?;
158    df.add_column(
159        "x2".to_string(),
160        pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161    )?;
162    df.add_column(
163        "y".to_string(),
164        pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165    )?;
166
167    // Perform regression analysis
168    let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170    // Display results
171    println!(
172        "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173        model.intercept, model.coefficients[0], model.coefficients[1]
174    );
175    println!("R²: {:.4}", model.r_squared);
176    println!("Adjusted R²: {:.4}", model.adj_r_squared);
177    println!("p-values of regression coefficients: {:?}", model.p_values);
178
179    // Example using LazyFrame (leveraging lazy evaluation)
180    println!("\nOperations with LazyFrame:");
181
182    let lazy_df = LazyFrame::new(opt_df);
183
184    // Perform column selection and filtering by condition at once
185    // These operations are not executed until actually needed
186    let filtered = lazy_df
187        .select(&["x1", "x2", "y"])
188        // Filter condition for LazyFrame is specified as a simple string expression
189        .filter("x1 > 5.0")
190        .execute()?;
191
192    println!("Number of rows after filtering: {}", filtered.row_count());
193
194    // Example of simple regression
195    println!("\nSimple Regression Model (x1 only):");
196    let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197    println!(
198        "Linear Regression Model: y = {:.4} + {:.4} × x1",
199        model_simple.intercept, model_simple.coefficients[0]
200    );
201    println!("R²: {:.4}", model_simple.r_squared);
202
203    Ok(())
204}
More examples
Hide additional examples
examples/lazy_parallel_example.rs (line 122)
8fn main() -> Result<(), Box<dyn Error>> {
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display a portion of the DataFrame
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Create an age filter (30 years and older)
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Add the filter to the DataFrame
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Aggregation by department
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Construct the result
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);
72        result_counts.push(count as f64);
73    }
74
75    // Create the result DataFrame
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // First, create a boolean column for the filter
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Add the filter to the DataFrame
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Recreate the LazyFrame
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Performance comparison
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}
Source

pub fn map<F>(self, f: F) -> Self
where F: Fn(&Column) -> Result<Column> + Send + Sync + 'static,

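Apply a mapping function `f`, which takes a `&Column` and returns a `Result<Column>`. Consumes the LazyFrame and returns it for chaining.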

Source

pub fn aggregate<I, J>(self, group_by: I, aggregations: J) -> Self
where I: IntoIterator<Item = String>, J: IntoIterator<Item = (String, AggregateOp, String)>,

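Perform a grouped aggregation. `group_by` yields the names of the grouping columns; each item of `aggregations` is a `(column, AggregateOp, output name)` tuple, as in the example below (e.g. `("Salary", AggregateOp::Sum, "Total Salary")`). Consumes the LazyFrame and returns it for chaining.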

Examples found in repository?
examples/lazy_parallel_example.rs (lines 123-130)
8fn main() -> Result<(), Box<dyn Error>> {
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display a portion of the DataFrame
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Create an age filter (30 years and older)
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Add the filter to the DataFrame
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Aggregation by department
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Construct the result
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);
72        result_counts.push(count as f64);
73    }
74
75    // Create the result DataFrame
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // First, create a boolean column for the filter
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Add the filter to the DataFrame
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Recreate the LazyFrame
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Performance comparison
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}
More examples
Hide additional examples
examples/parallel_benchmark.rs (lines 123-130)
7fn main() {
8    println!("Parallel Processing Performance Benchmark");
9    println!("============================");
10
11    // Data size
12    const ROWS: usize = 1_000_000;
13
14    // ====================
15    // Create Large DataFrame
16    // ====================
17    println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19    // --- Data Generation ---
20    let data_gen_start = Instant::now();
21
22    let mut int_data = Vec::with_capacity(ROWS);
23    let mut float_data = Vec::with_capacity(ROWS);
24    let mut str_data = Vec::with_capacity(ROWS);
25    let mut bool_data = Vec::with_capacity(ROWS);
26
27    for i in 0..ROWS {
28        int_data.push(i as i64);
29        float_data.push(i as f64 / 100.0);
30        str_data.push(format!("value_{}", i % 1000)); // Limited string set
31        bool_data.push(i % 2 == 0);
32    }
33
34    let data_gen_time = data_gen_start.elapsed();
35    println!("Data generation time: {:?}", data_gen_time);
36
37    // --- DataFrame Construction ---
38    let df_construct_start = Instant::now();
39    let mut df = OptimizedDataFrame::new();
40
41    df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42        .unwrap();
43    df.add_column(
44        "value".to_string(),
45        Column::Float64(Float64Column::new(float_data)),
46    )
47    .unwrap();
48    df.add_column(
49        "category".to_string(),
50        Column::String(StringColumn::new(str_data)),
51    )
52    .unwrap();
53    df.add_column(
54        "flag".to_string(),
55        Column::Boolean(BooleanColumn::new(bool_data)),
56    )
57    .unwrap();
58
59    let df_construct_time = df_construct_start.elapsed();
60    println!("DataFrame construction time: {:?}", df_construct_time);
61    println!(
62        "Total DataFrame creation time: {:?}",
63        data_gen_time + df_construct_time
64    );
65
66    // ====================
67    // Serial vs Parallel Filtering
68    // ====================
69    println!("\n[2] Filtering Performance (id > 500000)");
70
71    // Add a condition column
72    let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73    df.add_column(
74        "filter_condition".to_string(),
75        Column::Boolean(BooleanColumn::new(condition_data)),
76    )
77    .unwrap();
78
79    // Serial filtering
80    let mut serial_total = std::time::Duration::new(0, 0);
81    for _ in 0..3 {
82        let start = Instant::now();
83        let filtered_df = df.filter("filter_condition").unwrap();
84        let duration = start.elapsed();
85        serial_total += duration;
86        println!("Serial filtering time (1 run): {:?}", duration);
87        println!("Filtered row count: {}", filtered_df.row_count());
88    }
89    let serial_time = serial_total / 3;
90    println!("Average serial filtering time: {:?}", serial_time);
91
92    // Parallel filtering
93    let mut parallel_total = std::time::Duration::new(0, 0);
94    for _ in 0..3 {
95        let start = Instant::now();
96        let par_filtered_df = df.par_filter("filter_condition").unwrap();
97        let duration = start.elapsed();
98        parallel_total += duration;
99        println!("Parallel filtering time (1 run): {:?}", duration);
100        println!("Filtered row count: {}", par_filtered_df.row_count());
101    }
102    let parallel_time = parallel_total / 3;
103    println!("Average parallel filtering time: {:?}", parallel_time);
104
105    println!(
106        "Speedup: {:.2}x",
107        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108    );
109
110    // ====================
111    // Grouping and Aggregation
112    // ====================
113    println!("\n[3] Grouping and Aggregation (average by category)");
114
115    // Serial grouping and aggregation
116    let small_df = df.select(&["category", "value"]).unwrap();
117
118    let mut serial_total = std::time::Duration::new(0, 0);
119    for _ in 0..3 {
120        let start = Instant::now();
121        let lazy_df = LazyFrame::new(small_df.clone());
122        let grouped_df = lazy_df
123            .aggregate(
124                vec!["category".to_string()],
125                vec![(
126                    "value".to_string(),
127                    AggregateOp::Mean,
128                    "value_mean".to_string(),
129                )],
130            )
131            .execute()
132            .unwrap();
133        let duration = start.elapsed();
134        serial_total += duration;
135        println!(
136            "Serial grouping and aggregation time (1 run): {:?}",
137            duration
138        );
139        println!("Group count: {}", grouped_df.row_count());
140    }
141    let serial_time = serial_total / 3;
142    println!(
143        "Average serial grouping and aggregation time: {:?}",
144        serial_time
145    );
146
147    // Parallel grouping and aggregation
148    let mut parallel_total = std::time::Duration::new(0, 0);
149    for _ in 0..3 {
150        let start = Instant::now();
151        let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153        let mut result_df = OptimizedDataFrame::new();
154        let mut categories = Vec::with_capacity(grouped_map.len());
155        let mut means = Vec::with_capacity(grouped_map.len());
156
157        for (category, group_df) in &grouped_map {
158            categories.push(category.clone());
159
160            let value_col = group_df.column("value").unwrap();
161            if let Some(float_col) = value_col.as_float64() {
162                let mut sum = 0.0;
163                let mut count = 0;
164
165                for i in 0..float_col.len() {
166                    if let Ok(Some(val)) = float_col.get(i) {
167                        sum += val;
168                        count += 1;
169                    }
170                }
171
172                let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173                means.push(mean);
174            }
175        }
176
177        result_df
178            .add_column(
179                "category".to_string(),
180                Column::String(StringColumn::new(categories)),
181            )
182            .unwrap();
183        result_df
184            .add_column(
185                "value_mean".to_string(),
186                Column::Float64(Float64Column::new(means)),
187            )
188            .unwrap();
189
190        let duration = start.elapsed();
191        parallel_total += duration;
192        println!(
193            "Parallel grouping and aggregation time (1 run): {:?}",
194            duration
195        );
196        println!("Group count: {}", result_df.row_count());
197    }
198    let parallel_time = parallel_total / 3;
199    println!(
200        "Average parallel grouping and aggregation time: {:?}",
201        parallel_time
202    );
203
204    println!(
205        "Speedup: {:.2}x",
206        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207    );
208
209    // ====================
210    // Computation (double all values)
211    // ====================
212    println!("\n[4] Computation (double the values in the 'value' column)");
213
214    // Serial computation
215    let start = Instant::now();
216    let mut computed_df = OptimizedDataFrame::new();
217
218    for name in df.column_names() {
219        let col_view = df.column(name).unwrap();
220
221        let new_col = if name == "value" {
222            let float_col = col_view.as_float64().unwrap();
223            let mut doubled_values = Vec::with_capacity(float_col.len());
224
225            for i in 0..float_col.len() {
226                if let Ok(Some(val)) = float_col.get(i) {
227                    doubled_values.push(val * 2.0);
228                } else {
229                    doubled_values.push(0.0);
230                }
231            }
232
233            Column::Float64(Float64Column::new(doubled_values))
234        } else {
235            col_view.into_column()
236        };
237
238        computed_df.add_column(name.to_string(), new_col).unwrap();
239    }
240
241    let serial_time = start.elapsed();
242    println!("Serial computation time: {:?}", serial_time);
243
244    // Parallel computation
245    let start = Instant::now();
246    let _par_computed_df = df
247        .par_apply(|view| {
248            if view.as_float64().is_some() {
249                if let Some(float_col) = view.as_float64() {
250                    use rayon::prelude::*;
251
252                    let values = (0..float_col.len())
253                        .into_par_iter()
254                        .map(|i| {
255                            if let Ok(Some(val)) = float_col.get(i) {
256                                val * 2.0
257                            } else {
258                                0.0
259                            }
260                        })
261                        .collect::<Vec<_>>();
262
263                    Ok(Column::Float64(Float64Column::new(values)))
264                } else {
265                    Ok(view.clone().into_column())
266                }
267            } else {
268                Ok(view.clone().into_column())
269            }
270        })
271        .unwrap();
272
273    let parallel_time = start.elapsed();
274    println!("Parallel computation time: {:?}", parallel_time);
275
276    println!(
277        "Speedup: {:.2}x",
278        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279    );
280
281    println!("\nParallel Benchmark Complete");
282}
Source

pub fn join( self, right: OptimizedDataFrame, left_on: &str, right_on: &str, join_type: JoinType, ) -> Self

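Join with the `right` DataFrame, matching the `left_on` column against the `right_on` column according to `join_type`.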

Source

pub fn sort(self, by: &str, ascending: bool) -> Self

Sort the data by the `by` column, in ascending order when `ascending` is `true`.

Source

pub fn execute(self) -> Result<OptimizedDataFrame>

Execute the computation graph and return the resulting `OptimizedDataFrame`.

Examples found in repository?
examples/optimized_stats_example.rs (line 190)
115fn optimized_regression_example() -> Result<()> {
116    println!("3. Example of Regression Analysis with Optimized Implementation");
117    println!("--------------------------");
118
119    // Create an optimized DataFrame
120    let mut opt_df = OptimizedDataFrame::new();
121
122    // Add explanatory variables as type-specialized columns
123    let x1: Vec<f64> = (1..=10).map(|i| i as f64).collect();
124    let x2: Vec<f64> = (1..=10).map(|i| 5.0 + 3.0 * i as f64).collect();
125
126    opt_df.add_column(
127        "x1".to_string(),
128        pandrs::Column::Float64(pandrs::column::Float64Column::new(x1.clone())),
129    )?;
130    opt_df.add_column(
131        "x2".to_string(),
132        pandrs::Column::Float64(pandrs::column::Float64Column::new(x2.clone())),
133    )?;
134
135    // Dependent variable (y = 2*x1 + 1.5*x2 + 3 + noise)
136    let mut y_values = Vec::with_capacity(10);
137    let mut rng = rng();
138
139    for i in 0..10 {
140        let noise = rng.random_range(-1.0..1.0);
141        let y_val = 2.0 * x1[i] + 1.5 * x2[i] + 3.0 + noise;
142        y_values.push(y_val);
143    }
144
145    opt_df.add_column(
146        "y".to_string(),
147        pandrs::Column::Float64(pandrs::column::Float64Column::new(y_values.clone())),
148    )?;
149
150    // Convert to a regular DataFrame (to match the interface of regression analysis functions)
151    let mut df = pandrs::DataFrame::new();
152
153    // Add x1, x2, y columns to the regular DataFrame
154    df.add_column(
155        "x1".to_string(),
156        pandrs::Series::new(x1.clone(), Some("x1".to_string()))?,
157    )?;
158    df.add_column(
159        "x2".to_string(),
160        pandrs::Series::new(x2.clone(), Some("x2".to_string()))?,
161    )?;
162    df.add_column(
163        "y".to_string(),
164        pandrs::Series::new(y_values.clone(), Some("y".to_string()))?,
165    )?;
166
167    // Perform regression analysis
168    let model = pandrs::stats::linear_regression(&df, "y", &["x1", "x2"])?;
169
170    // Display results
171    println!(
172        "Linear Regression Model: y = {:.4} + {:.4} × x1 + {:.4} × x2",
173        model.intercept, model.coefficients[0], model.coefficients[1]
174    );
175    println!("R²: {:.4}", model.r_squared);
176    println!("Adjusted R²: {:.4}", model.adj_r_squared);
177    println!("p-values of regression coefficients: {:?}", model.p_values);
178
179    // Example using LazyFrame (leveraging lazy evaluation)
180    println!("\nOperations with LazyFrame:");
181
182    let lazy_df = LazyFrame::new(opt_df);
183
184    // Perform column selection and filtering by condition at once
185    // These operations are not executed until actually needed
186    let filtered = lazy_df
187        .select(&["x1", "x2", "y"])
188        // Filter condition for LazyFrame is specified as a simple string expression
189        .filter("x1 > 5.0")
190        .execute()?;
191
192    println!("Number of rows after filtering: {}", filtered.row_count());
193
194    // Example of simple regression
195    println!("\nSimple Regression Model (x1 only):");
196    let model_simple = pandrs::stats::linear_regression(&df, "y", &["x1"])?;
197    println!(
198        "Linear Regression Model: y = {:.4} + {:.4} × x1",
199        model_simple.intercept, model_simple.coefficients[0]
200    );
201    println!("R²: {:.4}", model_simple.r_squared);
202
203    Ok(())
204}
More examples
Hide additional examples
examples/lazy_parallel_example.rs (line 137)
8fn main() -> Result<(), Box<dyn Error>> {
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display a portion of the DataFrame
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Create an age filter (30 years and older)
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Add the filter to the DataFrame
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Aggregation by department
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Construct the result
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);
72        result_counts.push(count as f64);
73    }
74
75    // Create the result DataFrame
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // First, create a boolean column for the filter
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Add the filter to the DataFrame
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Recreate the LazyFrame
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Performance comparison
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}
examples/parallel_benchmark.rs (line 131)
7fn main() {
8    println!("Parallel Processing Performance Benchmark");
9    println!("============================");
10
11    // Data size
12    const ROWS: usize = 1_000_000;
13
14    // ====================
15    // Create Large DataFrame
16    // ====================
17    println!("\n[1] Create Large DataFrame ({} rows)", ROWS);
18
19    // --- Data Generation ---
20    let data_gen_start = Instant::now();
21
22    let mut int_data = Vec::with_capacity(ROWS);
23    let mut float_data = Vec::with_capacity(ROWS);
24    let mut str_data = Vec::with_capacity(ROWS);
25    let mut bool_data = Vec::with_capacity(ROWS);
26
27    for i in 0..ROWS {
28        int_data.push(i as i64);
29        float_data.push(i as f64 / 100.0);
30        str_data.push(format!("value_{}", i % 1000)); // Limited string set
31        bool_data.push(i % 2 == 0);
32    }
33
34    let data_gen_time = data_gen_start.elapsed();
35    println!("Data generation time: {:?}", data_gen_time);
36
37    // --- DataFrame Construction ---
38    let df_construct_start = Instant::now();
39    let mut df = OptimizedDataFrame::new();
40
41    df.add_column("id".to_string(), Column::Int64(Int64Column::new(int_data)))
42        .unwrap();
43    df.add_column(
44        "value".to_string(),
45        Column::Float64(Float64Column::new(float_data)),
46    )
47    .unwrap();
48    df.add_column(
49        "category".to_string(),
50        Column::String(StringColumn::new(str_data)),
51    )
52    .unwrap();
53    df.add_column(
54        "flag".to_string(),
55        Column::Boolean(BooleanColumn::new(bool_data)),
56    )
57    .unwrap();
58
59    let df_construct_time = df_construct_start.elapsed();
60    println!("DataFrame construction time: {:?}", df_construct_time);
61    println!(
62        "Total DataFrame creation time: {:?}",
63        data_gen_time + df_construct_time
64    );
65
66    // ====================
67    // Serial vs Parallel Filtering
68    // ====================
69    println!("\n[2] Filtering Performance (id > 500000)");
70
71    // Add a condition column
72    let condition_data: Vec<bool> = (0..ROWS).map(|i| i > ROWS / 2).collect();
73    df.add_column(
74        "filter_condition".to_string(),
75        Column::Boolean(BooleanColumn::new(condition_data)),
76    )
77    .unwrap();
78
79    // Serial filtering
80    let mut serial_total = std::time::Duration::new(0, 0);
81    for _ in 0..3 {
82        let start = Instant::now();
83        let filtered_df = df.filter("filter_condition").unwrap();
84        let duration = start.elapsed();
85        serial_total += duration;
86        println!("Serial filtering time (1 run): {:?}", duration);
87        println!("Filtered row count: {}", filtered_df.row_count());
88    }
89    let serial_time = serial_total / 3;
90    println!("Average serial filtering time: {:?}", serial_time);
91
92    // Parallel filtering
93    let mut parallel_total = std::time::Duration::new(0, 0);
94    for _ in 0..3 {
95        let start = Instant::now();
96        let par_filtered_df = df.par_filter("filter_condition").unwrap();
97        let duration = start.elapsed();
98        parallel_total += duration;
99        println!("Parallel filtering time (1 run): {:?}", duration);
100        println!("Filtered row count: {}", par_filtered_df.row_count());
101    }
102    let parallel_time = parallel_total / 3;
103    println!("Average parallel filtering time: {:?}", parallel_time);
104
105    println!(
106        "Speedup: {:.2}x",
107        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
108    );
109
110    // ====================
111    // Grouping and Aggregation
112    // ====================
113    println!("\n[3] Grouping and Aggregation (average by category)");
114
115    // Serial grouping and aggregation
116    let small_df = df.select(&["category", "value"]).unwrap();
117
118    let mut serial_total = std::time::Duration::new(0, 0);
119    for _ in 0..3 {
120        let start = Instant::now();
121        let lazy_df = LazyFrame::new(small_df.clone());
122        let grouped_df = lazy_df
123            .aggregate(
124                vec!["category".to_string()],
125                vec![(
126                    "value".to_string(),
127                    AggregateOp::Mean,
128                    "value_mean".to_string(),
129                )],
130            )
131            .execute()
132            .unwrap();
133        let duration = start.elapsed();
134        serial_total += duration;
135        println!(
136            "Serial grouping and aggregation time (1 run): {:?}",
137            duration
138        );
139        println!("Group count: {}", grouped_df.row_count());
140    }
141    let serial_time = serial_total / 3;
142    println!(
143        "Average serial grouping and aggregation time: {:?}",
144        serial_time
145    );
146
147    // Parallel grouping and aggregation
148    let mut parallel_total = std::time::Duration::new(0, 0);
149    for _ in 0..3 {
150        let start = Instant::now();
151        let grouped_map = small_df.par_groupby(&["category"]).unwrap();
152
153        let mut result_df = OptimizedDataFrame::new();
154        let mut categories = Vec::with_capacity(grouped_map.len());
155        let mut means = Vec::with_capacity(grouped_map.len());
156
157        for (category, group_df) in &grouped_map {
158            categories.push(category.clone());
159
160            let value_col = group_df.column("value").unwrap();
161            if let Some(float_col) = value_col.as_float64() {
162                let mut sum = 0.0;
163                let mut count = 0;
164
165                for i in 0..float_col.len() {
166                    if let Ok(Some(val)) = float_col.get(i) {
167                        sum += val;
168                        count += 1;
169                    }
170                }
171
172                let mean = if count > 0 { sum / count as f64 } else { 0.0 };
173                means.push(mean);
174            }
175        }
176
177        result_df
178            .add_column(
179                "category".to_string(),
180                Column::String(StringColumn::new(categories)),
181            )
182            .unwrap();
183        result_df
184            .add_column(
185                "value_mean".to_string(),
186                Column::Float64(Float64Column::new(means)),
187            )
188            .unwrap();
189
190        let duration = start.elapsed();
191        parallel_total += duration;
192        println!(
193            "Parallel grouping and aggregation time (1 run): {:?}",
194            duration
195        );
196        println!("Group count: {}", result_df.row_count());
197    }
198    let parallel_time = parallel_total / 3;
199    println!(
200        "Average parallel grouping and aggregation time: {:?}",
201        parallel_time
202    );
203
204    println!(
205        "Speedup: {:.2}x",
206        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
207    );
208
209    // ====================
210    // Computation (double all values)
211    // ====================
212    println!("\n[4] Computation (double the values in the 'value' column)");
213
214    // Serial computation
215    let start = Instant::now();
216    let mut computed_df = OptimizedDataFrame::new();
217
218    for name in df.column_names() {
219        let col_view = df.column(name).unwrap();
220
221        let new_col = if name == "value" {
222            let float_col = col_view.as_float64().unwrap();
223            let mut doubled_values = Vec::with_capacity(float_col.len());
224
225            for i in 0..float_col.len() {
226                if let Ok(Some(val)) = float_col.get(i) {
227                    doubled_values.push(val * 2.0);
228                } else {
229                    doubled_values.push(0.0);
230                }
231            }
232
233            Column::Float64(Float64Column::new(doubled_values))
234        } else {
235            col_view.into_column()
236        };
237
238        computed_df.add_column(name.to_string(), new_col).unwrap();
239    }
240
241    let serial_time = start.elapsed();
242    println!("Serial computation time: {:?}", serial_time);
243
244    // Parallel computation
245    let start = Instant::now();
246    let _par_computed_df = df
247        .par_apply(|view| {
248            if view.as_float64().is_some() {
249                if let Some(float_col) = view.as_float64() {
250                    use rayon::prelude::*;
251
252                    let values = (0..float_col.len())
253                        .into_par_iter()
254                        .map(|i| {
255                            if let Ok(Some(val)) = float_col.get(i) {
256                                val * 2.0
257                            } else {
258                                0.0
259                            }
260                        })
261                        .collect::<Vec<_>>();
262
263                    Ok(Column::Float64(Float64Column::new(values)))
264                } else {
265                    Ok(view.clone().into_column())
266                }
267            } else {
268                Ok(view.clone().into_column())
269            }
270        })
271        .unwrap();
272
273    let parallel_time = start.elapsed();
274    println!("Parallel computation time: {:?}", parallel_time);
275
276    println!(
277        "Speedup: {:.2}x",
278        serial_time.as_secs_f64() / parallel_time.as_secs_f64()
279    );
280
281    println!("\nParallel Benchmark Complete");
282}
Source

pub fn explain(&self) -> String

Returns a textual description of the optimized computation graph (the execution plan).

Examples found in repository?
examples/lazy_parallel_example.rs (line 134)
8fn main() -> Result<(), Box<dyn Error>> {  // Times a hand-written filter + group-by against the same query built with LazyFrame, then prints both results and the time ratio.
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame (generate_large_dataframe is defined elsewhere in the example file)
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display the DataFrame. NOTE(review): the label says "First row", but the whole frame is Debug-formatted; what gets printed depends on its Debug impl - confirm.
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach (everything from here to `standard_duration` is timed)
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Build a row mask that is true where Age >= 30; rows with a missing age, or a non-integer Age column, stay false
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Attach the mask as a boolean column on a clone, leaving `df` itself unchanged
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering (presumably keeps the rows where the named boolean column is true - confirm against the filter API)
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Department -> (running salary total, number of rows seen)
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {  // rows where either value is missing or errors are skipped
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Unpack the map into parallel column vectors; HashMap iteration order is unspecified, so row order can differ between runs
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);  // count is at least 1 for every entry that exists in the map
72        result_counts.push(count as f64);  // stored as f64 because the Count column is built as Float64 below
73    }
74
75    // Create the result DataFrame from the four column vectors
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();  // covers mask building, the clone, filtering and aggregation
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing (timer restarts here)
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // Rebuild the same Age >= 30 mask inside the timed region so both approaches pay for it
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Attach the mask as a boolean column on a fresh clone of `df`
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Wrap the frame in a LazyFrame and describe the query: filter, then aggregate grouped by Department
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![  // each tuple is presumably (input column, operation, output column name) - confirm against the aggregate API
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan returned by explain()
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute the plan; any error propagates to the caller via `?`
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();  // NOTE: this measurement also includes printing the plan above
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Ratio of the two wall-clock times; a value above 1.0 means the LazyFrame run was faster
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}

Trait Implementations§

Source§

impl Clone for LazyFrame

Source§

fn clone(&self) -> LazyFrame

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V