1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
//! JIT support for GroupBy operations
//!
//! This module provides JIT compilation capabilities for GroupBy operations,
//! allowing for accelerated custom aggregations similar to pandas with Numba.
//!
//! Both single-threaded and parallel JIT functions are supported, with
//! parallel functions automatically used when appropriate based on data size.
use std::sync::Arc;
use crate::core::error::Result;
use crate::optimized::dataframe::OptimizedDataFrame;
use crate::optimized::split_dataframe::group::{AggregateOp, GroupBy};
use super::jit_core::{JitCompilable, JitFunction};
use super::parallel::{ParallelConfig, parallel_sum_f64, parallel_mean_f64,
parallel_std_f64, parallel_min_f64, parallel_max_f64};
/// JIT-backed aggregation methods layered on top of a grouped frame.
///
/// Each method names the column to aggregate and the output column
/// (`alias`), and yields a `Result<OptimizedDataFrame>`.
pub trait GroupByJitExt<'a> {
    /// Runs one caller-supplied JIT-compilable aggregation.
    ///
    /// # Arguments
    /// * `column` - Name of the column to aggregate
    /// * `jit_fn` - Function reducing a `Vec<f64>` of values to one `f64`
    /// * `alias` - Name of the resulting column
    ///
    /// # Returns
    /// * `Result<OptimizedDataFrame>` - DataFrame holding the aggregation results
    fn aggregate_jit<J>(&self, column: &str, jit_fn: J, alias: &str) -> Result<OptimizedDataFrame>
    where
        J: JitCompilable<Vec<f64>, f64> + 'static;

    /// Sum of `column`, stored under `alias`, using JIT compilation.
    fn sum_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Mean of `column`, stored under `alias`, using JIT compilation.
    fn mean_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Standard deviation of `column`, stored under `alias`, using JIT compilation.
    fn std_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Variance of `column`, stored under `alias`, using JIT compilation.
    fn var_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Minimum of `column`, stored under `alias`, using JIT compilation.
    fn min_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Maximum of `column`, stored under `alias`, using JIT compilation.
    fn max_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Median of `column`, stored under `alias`, using JIT compilation.
    fn median_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame>;

    /// Runs several JIT-compilable aggregations in one call.
    ///
    /// # Arguments
    /// * `aggregations` - Sequence of `(column, JIT function, alias)` tuples;
    ///   all functions share the single type `J`
    ///
    /// # Returns
    /// * `Result<OptimizedDataFrame>` - DataFrame holding the aggregation results
    fn aggregate_multi_jit<I, J>(&self, aggregations: I) -> Result<OptimizedDataFrame>
    where
        I: IntoIterator<Item = (String, J, String)>,
        J: JitCompilable<Vec<f64>, f64> + 'static;

    /// Parallel variant of the sum aggregation.
    ///
    /// `config` carries the optional parallel-execution settings.
    fn parallel_sum_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame>;

    /// Parallel variant of the mean aggregation.
    ///
    /// `config` carries the optional parallel-execution settings.
    fn parallel_mean_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame>;

    /// Parallel variant of the standard-deviation aggregation.
    ///
    /// `config` carries the optional parallel-execution settings.
    fn parallel_std_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame>;

    /// Parallel variant of the min aggregation.
    ///
    /// `config` carries the optional parallel-execution settings.
    fn parallel_min_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame>;

    /// Parallel variant of the max aggregation.
    ///
    /// `config` carries the optional parallel-execution settings.
    fn parallel_max_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame>;
}
/// `GroupByJitExt` for `GroupBy`: every method funnels into `aggregate_custom`.
impl<'a> GroupByJitExt<'a> for GroupBy<'a> {
    /// Runs a single JIT aggregation over `column`, naming the result `alias`.
    fn aggregate_jit<J>(&self, column: &str, jit_fn: J, alias: &str) -> Result<OptimizedDataFrame>
    where
        J: JitCompilable<Vec<f64>, f64> + 'static,
    {
        // A lone aggregation is the one-element case of the multi variant,
        // which builds the same `CustomAggregation` descriptor.
        let single = std::iter::once((column.to_string(), jit_fn, alias.to_string()));
        self.aggregate_multi_jit(single)
    }

    /// Sum aggregation backed by `array_ops::sum()`.
    fn sum_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, super::array_ops::sum(), alias)
    }

    /// Mean aggregation backed by `array_ops::mean()`.
    fn mean_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, super::array_ops::mean(), alias)
    }

    /// Standard-deviation aggregation backed by `array_ops::std(1)`.
    fn std_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        // The argument 1 selects one degree of freedom (n-1), i.e. the sample
        // standard deviation.
        self.aggregate_jit(column, super::array_ops::std(1), alias)
    }

    /// Variance aggregation backed by `array_ops::var(1)`.
    fn var_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        // The argument 1 selects one degree of freedom (n-1), i.e. the sample
        // variance.
        self.aggregate_jit(column, super::array_ops::var(1), alias)
    }

    /// Min aggregation backed by `array_ops::min()`.
    fn min_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, super::array_ops::min(), alias)
    }

    /// Max aggregation backed by `array_ops::max()`.
    fn max_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, super::array_ops::max(), alias)
    }

    /// Median aggregation backed by `array_ops::median()`.
    fn median_jit(&self, column: &str, alias: &str) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, super::array_ops::median(), alias)
    }

    /// Turns each `(column, JIT function, alias)` tuple into a custom
    /// aggregation and hands the whole list to `aggregate_custom`.
    fn aggregate_multi_jit<I, J>(&self, aggregations: I) -> Result<OptimizedDataFrame>
    where
        I: IntoIterator<Item = (String, J, String)>,
        J: JitCompilable<Vec<f64>, f64> + 'static,
    {
        let mut specs = Vec::new();
        for (column, jit_fn, alias) in aggregations {
            specs.push(CustomAggregation {
                column,
                op: AggregateOp::Custom,
                alias,
                // The JIT function takes an owned `Vec<f64>`, so the borrowed
                // slice is copied on every invocation.
                custom_fn: Some(Arc::new(move |values: &[f64]| -> f64 {
                    jit_fn.execute(values.to_vec())
                })),
            });
        }
        self.aggregate_custom(specs)
    }

    /// Sum aggregation using the function returned by `parallel_sum_f64(config)`.
    fn parallel_sum_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, parallel_sum_f64(config), alias)
    }

    /// Mean aggregation using the function returned by `parallel_mean_f64(config)`.
    fn parallel_mean_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, parallel_mean_f64(config), alias)
    }

    /// Standard-deviation aggregation using the function returned by
    /// `parallel_std_f64(config)`.
    fn parallel_std_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, parallel_std_f64(config), alias)
    }

    /// Min aggregation using the function returned by `parallel_min_f64(config)`.
    fn parallel_min_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, parallel_min_f64(config), alias)
    }

    /// Max aggregation using the function returned by `parallel_max_f64(config)`.
    fn parallel_max_jit(
        &self,
        column: &str,
        alias: &str,
        config: Option<ParallelConfig>,
    ) -> Result<OptimizedDataFrame> {
        self.aggregate_jit(column, parallel_max_f64(config), alias)
    }
}
// Descriptor for one custom aggregation; the `GroupByJitExt` impl in this
// file builds these and passes them to `aggregate_custom`.
// NOTE(review): the original comment describes this as mirroring a struct from
// the existing codebase, duplicated for documentation purposes. Being declared
// here, it is a separate local type -- confirm `aggregate_custom` accepts it.
struct CustomAggregation {
// Name of the column to aggregate.
column: String,
// Aggregation kind; this file always sets `AggregateOp::Custom`.
op: AggregateOp,
// Name of the resulting column.
alias: String,
// Optional shared callback reducing a slice of values to a single `f64`;
// this file always supplies `Some(..)` wrapping a JIT function.
custom_fn: Option<Arc<dyn Fn(&[f64]) -> f64 + Send + Sync>>,
}