lazy_parallel_example/
lazy_parallel_example.rs

1use std::error::Error;
2use std::time::Instant;
3
4use pandrs::{
5    AggregateOp, Column, Float64Column, Int64Column, LazyFrame, OptimizedDataFrame, StringColumn,
6};
7
8fn main() -> Result<(), Box<dyn Error>> {
9    println!("Performance Evaluation with Lazy Evaluation and Parallel Processing\n");
10
11    // Generate a large DataFrame
12    println!("Generating a large dataset...");
13    let rows = 100_000;
14    let df = generate_large_dataframe(rows)?;
15    println!("Generated {} rows of data", rows);
16
17    // Display a portion of the DataFrame
18    println!("\nFirst row of data:");
19    println!("{:?}\n", df);
20
21    // Filtering and aggregation with the standard approach
22    println!("Executing data processing with the standard approach...");
23    let start = Instant::now();
24
25    // Create an age filter (30 years and older)
26    let age_col = df.column("Age")?;
27    let mut age_filter = vec![false; df.row_count()];
28    if let Some(int_col) = age_col.as_int64() {
29        for i in 0..df.row_count() {
30            if let Ok(Some(age)) = int_col.get(i) {
31                age_filter[i] = age >= 30;
32            }
33        }
34    }
35
36    // Add the filter to the DataFrame
37    let bool_data = pandrs::BooleanColumn::new(age_filter);
38    let mut df_with_filter = df.clone();
39    df_with_filter.add_column("30 and older", Column::Boolean(bool_data))?;
40
41    // Execute filtering
42    let filtered_df = df_with_filter.filter("30 and older")?;
43
44    // Manually aggregate by department
45    let dept_col = filtered_df.column("Department")?;
46    let salary_col = filtered_df.column("Salary")?;
47
48    // Aggregation by department
49    let mut dept_totals: std::collections::HashMap<String, (f64, i32)> =
50        std::collections::HashMap::new();
51
52    if let (Some(str_col), Some(float_col)) = (dept_col.as_string(), salary_col.as_float64()) {
53        for i in 0..filtered_df.row_count() {
54            if let (Ok(Some(dept)), Ok(Some(salary))) = (str_col.get(i), float_col.get(i)) {
55                let entry = dept_totals.entry(dept.to_string()).or_insert((0.0, 0));
56                entry.0 += salary;
57                entry.1 += 1;
58            }
59        }
60    }
61
62    // Construct the result
63    let mut result_depts = Vec::new();
64    let mut result_totals = Vec::new();
65    let mut result_avgs = Vec::new();
66    let mut result_counts = Vec::new();
67
68    for (dept, (total, count)) in dept_totals {
69        result_depts.push(dept);
70        result_totals.push(total);
71        result_avgs.push(total / count as f64);
72        result_counts.push(count as f64);
73    }
74
75    // Create the result DataFrame
76    let mut result_df = OptimizedDataFrame::new();
77    result_df.add_column(
78        "Department",
79        Column::String(StringColumn::new(result_depts)),
80    )?;
81    result_df.add_column(
82        "Total Salary",
83        Column::Float64(Float64Column::new(result_totals)),
84    )?;
85    result_df.add_column(
86        "Average Salary",
87        Column::Float64(Float64Column::new(result_avgs)),
88    )?;
89    result_df.add_column("Count", Column::Float64(Float64Column::new(result_counts)))?;
90
91    let standard_duration = start.elapsed();
92    println!(
93        "Processing time with the standard approach: {:?}",
94        standard_duration
95    );
96    println!("\nResults of the standard approach:");
97    println!("{:?}\n", result_df);
98
99    // Approach using LazyFrame and parallel processing
100    println!("Approach using LazyFrame and parallel processing...");
101    let start = Instant::now();
102
103    // First, create a boolean column for the filter
104    let age_col = df.column("Age")?;
105    let mut age_filter = vec![false; df.row_count()];
106    if let Some(int_col) = age_col.as_int64() {
107        for i in 0..df.row_count() {
108            if let Ok(Some(age)) = int_col.get(i) {
109                age_filter[i] = age >= 30;
110            }
111        }
112    }
113
114    // Add the filter to the DataFrame
115    let mut df_with_age_filter = df.clone();
116    let bool_data = pandrs::BooleanColumn::new(age_filter);
117    df_with_age_filter.add_column("Age Filter", Column::Boolean(bool_data))?;
118
119    // Recreate the LazyFrame
120    let lazy_df = LazyFrame::new(df_with_age_filter);
121    let result_lazy = lazy_df
122        .filter("Age Filter") // Use the newly added boolean column for filtering
123        .aggregate(
124            vec!["Department".to_string()],
125            vec![
126                ("Salary".to_string(), AggregateOp::Sum, "Total Salary".to_string()),
127                ("Salary".to_string(), AggregateOp::Mean, "Average Salary".to_string()),
128                ("Salary".to_string(), AggregateOp::Count, "Count".to_string()),
129            ]
130        );
131
132    // Display the execution plan
133    println!("\nExecution Plan:");
134    println!("{}", result_lazy.explain());
135
136    // Execute
137    let lazy_result = result_lazy.execute()?;
138
139    let lazy_duration = start.elapsed();
140    println!(
141        "Processing time with the LazyFrame approach: {:?}",
142        lazy_duration
143    );
144    println!("\nResults of the LazyFrame approach:");
145    println!("{:?}\n", lazy_result);
146
147    // Performance comparison
148    let speedup = standard_duration.as_secs_f64() / lazy_duration.as_secs_f64();
149    println!(
150        "The LazyFrame approach is {:.2} times faster than the standard approach",
151        speedup
152    );
153
154    Ok(())
155}
156
157// Function to generate a large DataFrame
158fn generate_large_dataframe(rows: usize) -> Result<OptimizedDataFrame, Box<dyn Error>> {
159    use rand::rngs::StdRng;
160    use rand::{Rng, SeedableRng};
161
162    let mut rng = StdRng::seed_from_u64(42); // Fixed seed for reproducibility
163
164    // Generate data
165    let mut ids = Vec::with_capacity(rows);
166    let mut ages = Vec::with_capacity(rows);
167    let mut depts = Vec::with_capacity(rows);
168    let mut salaries = Vec::with_capacity(rows);
169
170    // List of departments
171    let departments = vec![
172        "Sales".to_string(),
173        "Development".to_string(),
174        "HR".to_string(),
175        "Finance".to_string(),
176        "Marketing".to_string(),
177    ];
178
179    for i in 0..rows {
180        ids.push(i as i64 + 1000); // ID
181        ages.push(rng.random_range(20..60)); // Age
182        depts.push(departments[rng.random_range(0..departments.len())].clone()); // Department
183
184        // Salary (generated based on department and age)
185        let base_salary = match depts.last().unwrap().as_str() {
186            "Sales" => 350_000.0,
187            "Development" => 400_000.0,
188            "HR" => 320_000.0,
189            "Finance" => 380_000.0,
190            "Marketing" => 360_000.0,
191            _ => 300_000.0,
192        };
193
194        let age_factor = *ages.last().unwrap() as f64 / 30.0;
195        let variation = rng.random_range(0.8..1.2);
196
197        salaries.push(base_salary * age_factor * variation);
198    }
199
200    // Create the DataFrame
201    let mut df = OptimizedDataFrame::new();
202    df.add_column("ID", Column::Int64(Int64Column::new(ids)))?;
203    df.add_column("Age", Column::Int64(Int64Column::new(ages)))?;
204    df.add_column("Department", Column::String(StringColumn::new(depts)))?;
205    df.add_column("Salary", Column::Float64(Float64Column::new(salaries)))?;
206
207    Ok(df)
208}