1#[cfg(feature = "gpu")]
4use cudarc::driver::{LaunchConfig, PushKernelArg};
5
6#[cfg(feature = "gpu")]
7use super::data::GpuData;
8use super::manager::GpuManager;
9use crate::error::{DbxError, DbxResult};
10
11impl GpuManager {
13 pub fn group_by_sum(
16 &self,
17 table: &str,
18 group_column: &str,
19 value_column: &str,
20 ) -> DbxResult<Vec<(i32, i64, i32)>> {
21 #[cfg(not(feature = "gpu"))]
22 {
23 let _ = (table, group_column, value_column);
24 Err(DbxError::NotImplemented(
25 "GPU acceleration is not enabled".to_string(),
26 ))
27 }
28
29 #[cfg(feature = "gpu")]
30 {
31 let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
32 DbxError::Gpu(format!(
33 "Column {}.{} not found in GPU cache",
34 table, group_column
35 ))
36 })?;
37 let values_data = self.get_gpu_data(table, value_column).ok_or_else(|| {
38 DbxError::Gpu(format!(
39 "Column {}.{} not found in GPU cache",
40 table, value_column
41 ))
42 })?;
43
44 let (keys_slice, n) = match &*keys_data {
45 GpuData::Int32(slice) => (slice, slice.len()),
46 _ => {
47 return Err(DbxError::NotImplemented(
48 "GROUP BY keys must be Int32".to_string(),
49 ));
50 }
51 };
52
53 let values_slice = match &*values_data {
54 GpuData::Int64(slice) => slice,
55 _ => {
56 return Err(DbxError::NotImplemented(
57 "GROUP BY values must be Int64 for SUM".to_string(),
58 ));
59 }
60 };
61
62 let table_size = (n * 2).next_power_of_two();
64 let stream = self.device.default_stream();
65
66 let mut hash_keys = vec![-1i32; table_size];
68 let mut hash_sums = vec![0i64; table_size];
69 let mut hash_counts = vec![0i32; table_size];
70
71 let mut hash_keys_dev = stream
72 .clone_htod(&hash_keys)
73 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
74 let mut hash_sums_dev = stream
75 .clone_htod(&hash_sums)
76 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash sums: {:?}", e)))?;
77 let mut hash_counts_dev = stream
78 .clone_htod(&hash_counts)
79 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
80
81 let kernel_name = match self.hash_strategy {
83 crate::storage::gpu::GpuHashStrategy::Linear => "group_by_sum_i32",
84 crate::storage::gpu::GpuHashStrategy::Cuckoo => "group_by_sum_cuckoo_i32",
85 crate::storage::gpu::GpuHashStrategy::RobinHood => "group_by_sum_robin_hood_i32",
86 };
87
88 let func = self
89 .module
90 .load_function(kernel_name)
91 .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
92
93 let cfg = LaunchConfig::for_num_elems(n as u32);
94 let n_i32 = n as i32;
95 let table_size_i32 = table_size as i32;
96
97 let mut builder = stream.launch_builder(&func);
98 builder.arg(keys_slice);
99 builder.arg(values_slice);
100 builder.arg(&mut hash_keys_dev);
101 builder.arg(&mut hash_sums_dev);
102 builder.arg(&mut hash_counts_dev);
103 builder.arg(&n_i32);
104 builder.arg(&table_size_i32);
105 unsafe { builder.launch(cfg) }
106 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
107
108 stream
109 .synchronize()
110 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
111
112 hash_keys = stream
114 .clone_dtoh(&hash_keys_dev)
115 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
116 hash_sums = stream
117 .clone_dtoh(&hash_sums_dev)
118 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash sums: {:?}", e)))?;
119 hash_counts = stream
120 .clone_dtoh(&hash_counts_dev)
121 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
122
123 let mut results = Vec::new();
125 for i in 0..table_size {
126 if hash_keys[i] != -1 {
127 results.push((hash_keys[i], hash_sums[i], hash_counts[i]));
128 }
129 }
130
131 Ok(results)
132 }
133 }
134
135 pub fn group_by_count(&self, table: &str, group_column: &str) -> DbxResult<Vec<(i32, i32)>> {
138 #[cfg(not(feature = "gpu"))]
139 {
140 let _ = (table, group_column);
141 Err(DbxError::NotImplemented(
142 "GPU acceleration is not enabled".to_string(),
143 ))
144 }
145
146 #[cfg(feature = "gpu")]
147 {
148 let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
149 DbxError::Gpu(format!(
150 "Column {}.{} not found in GPU cache",
151 table, group_column
152 ))
153 })?;
154
155 let (keys_slice, n) = match &*keys_data {
156 GpuData::Int32(slice) => (slice, slice.len()),
157 _ => {
158 return Err(DbxError::NotImplemented(
159 "GROUP BY keys must be Int32".to_string(),
160 ));
161 }
162 };
163
164 let table_size = (n * 2).next_power_of_two();
165 let stream = self.device.default_stream();
166
167 let mut hash_keys = vec![-1i32; table_size];
168 let mut hash_counts = vec![0i32; table_size];
169
170 let mut hash_keys_dev = stream
171 .clone_htod(&hash_keys)
172 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
173 let mut hash_counts_dev = stream
174 .clone_htod(&hash_counts)
175 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
176
177 let func = self
178 .module
179 .load_function("group_by_count_i32")
180 .map_err(|_| DbxError::Gpu("Kernel group_by_count_i32 not found".to_string()))?;
181
182 let cfg = LaunchConfig::for_num_elems(n as u32);
183 let n_i32 = n as i32;
184 let table_size_i32 = table_size as i32;
185
186 let mut builder = stream.launch_builder(&func);
187 builder.arg(keys_slice);
188 builder.arg(&mut hash_keys_dev);
189 builder.arg(&mut hash_counts_dev);
190 builder.arg(&n_i32);
191 builder.arg(&table_size_i32);
192 unsafe { builder.launch(cfg) }
193 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
194
195 stream
196 .synchronize()
197 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
198
199 hash_keys = stream
200 .clone_dtoh(&hash_keys_dev)
201 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
202 hash_counts = stream
203 .clone_dtoh(&hash_counts_dev)
204 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
205
206 let mut results = Vec::new();
207 for i in 0..table_size {
208 if hash_keys[i] != -1 {
209 results.push((hash_keys[i], hash_counts[i]));
210 }
211 }
212
213 Ok(results)
214 }
215 }
216
217 pub fn group_by_min_max(
220 &self,
221 table: &str,
222 group_column: &str,
223 value_column: &str,
224 find_max: bool,
225 ) -> DbxResult<Vec<(i32, i32, i32)>> {
226 #[cfg(not(feature = "gpu"))]
227 {
228 let _ = (table, group_column, value_column, find_max);
229 Err(DbxError::NotImplemented(
230 "GPU acceleration is not enabled".to_string(),
231 ))
232 }
233
234 #[cfg(feature = "gpu")]
235 {
236 let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
237 DbxError::Gpu(format!(
238 "Column {}.{} not found in GPU cache",
239 table, group_column
240 ))
241 })?;
242 let values_data = self.get_gpu_data(table, value_column).ok_or_else(|| {
243 DbxError::Gpu(format!(
244 "Column {}.{} not found in GPU cache",
245 table, value_column
246 ))
247 })?;
248
249 let (keys_slice, n) = match &*keys_data {
250 GpuData::Int32(slice) => (slice, slice.len()),
251 _ => {
252 return Err(DbxError::NotImplemented(
253 "GROUP BY keys must be Int32".to_string(),
254 ));
255 }
256 };
257
258 let values_slice = match &*values_data {
259 GpuData::Int32(slice) => slice,
260 _ => {
261 return Err(DbxError::NotImplemented(
262 "GROUP BY values must be Int32 for MIN/MAX".to_string(),
263 ));
264 }
265 };
266
267 let table_size = (n * 2).next_power_of_two();
268 let stream = self.device.default_stream();
269
270 let initial_val = if find_max { i32::MIN } else { i32::MAX };
271 let mut hash_keys = vec![-1i32; table_size];
272 let mut hash_values = vec![initial_val; table_size];
273 let mut hash_counts = vec![0i32; table_size];
274
275 let mut hash_keys_dev = stream
276 .clone_htod(&hash_keys)
277 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
278 let mut hash_values_dev = stream
279 .clone_htod(&hash_values)
280 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash values: {:?}", e)))?;
281 let mut hash_counts_dev = stream
282 .clone_htod(&hash_counts)
283 .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
284
285 let kernel_name = if find_max {
286 "group_by_max_i32"
287 } else {
288 "group_by_min_i32"
289 };
290 let func = self
291 .module
292 .load_function(kernel_name)
293 .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
294
295 let cfg = LaunchConfig::for_num_elems(n as u32);
296 let n_i32 = n as i32;
297 let table_size_i32 = table_size as i32;
298
299 let mut builder = stream.launch_builder(&func);
300 builder.arg(keys_slice);
301 builder.arg(values_slice);
302 builder.arg(&mut hash_keys_dev);
303 builder.arg(&mut hash_values_dev);
304 builder.arg(&mut hash_counts_dev);
305 builder.arg(&n_i32);
306 builder.arg(&table_size_i32);
307 unsafe { builder.launch(cfg) }
308 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
309
310 stream
311 .synchronize()
312 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
313
314 hash_keys = stream
315 .clone_dtoh(&hash_keys_dev)
316 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
317 hash_values = stream
318 .clone_dtoh(&hash_values_dev)
319 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash values: {:?}", e)))?;
320 hash_counts = stream
321 .clone_dtoh(&hash_counts_dev)
322 .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
323
324 let mut results = Vec::new();
325 for i in 0..table_size {
326 if hash_keys[i] != -1 {
327 results.push((hash_keys[i], hash_values[i], hash_counts[i]));
328 }
329 }
330
331 Ok(results)
332 }
333 }
334}