Skip to main content

dbx_core/storage/gpu/
group_by.rs

1//! GPU GROUP BY operations - SUM, COUNT, MIN/MAX.
2
3#[cfg(feature = "gpu")]
4use cudarc::driver::{LaunchConfig, PushKernelArg};
5
6#[cfg(feature = "gpu")]
7use super::data::GpuData;
8use super::manager::GpuManager;
9use crate::error::{DbxError, DbxResult};
10
11/// GROUP BY operations impl block
12impl GpuManager {
13    /// GROUP BY with SUM aggregation on GPU.
14    /// Returns Vec<(group_key, sum_value, count)>
15    pub fn group_by_sum(
16        &self,
17        table: &str,
18        group_column: &str,
19        value_column: &str,
20    ) -> DbxResult<Vec<(i32, i64, i32)>> {
21        #[cfg(not(feature = "gpu"))]
22        {
23            let _ = (table, group_column, value_column);
24            Err(DbxError::NotImplemented(
25                "GPU acceleration is not enabled".to_string(),
26            ))
27        }
28
29        #[cfg(feature = "gpu")]
30        {
31            let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
32                DbxError::Gpu(format!(
33                    "Column {}.{} not found in GPU cache",
34                    table, group_column
35                ))
36            })?;
37            let values_data = self.get_gpu_data(table, value_column).ok_or_else(|| {
38                DbxError::Gpu(format!(
39                    "Column {}.{} not found in GPU cache",
40                    table, value_column
41                ))
42            })?;
43
44            let (keys_slice, n) = match &*keys_data {
45                GpuData::Int32(slice) => (slice, slice.len()),
46                _ => {
47                    return Err(DbxError::NotImplemented(
48                        "GROUP BY keys must be Int32".to_string(),
49                    ));
50                }
51            };
52
53            let values_slice = match &*values_data {
54                GpuData::Int64(slice) => slice,
55                _ => {
56                    return Err(DbxError::NotImplemented(
57                        "GROUP BY values must be Int64 for SUM".to_string(),
58                    ));
59                }
60            };
61
62            // Hash table size: ~2x input size for good performance
63            let table_size = (n * 2).next_power_of_two();
64            let stream = self.device.default_stream();
65
66            // Allocate hash table (initialized to -1 for keys, 0 for sums/counts)
67            let mut hash_keys = vec![-1i32; table_size];
68            let mut hash_sums = vec![0i64; table_size];
69            let mut hash_counts = vec![0i32; table_size];
70
71            let mut hash_keys_dev = stream
72                .clone_htod(&hash_keys)
73                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
74            let mut hash_sums_dev = stream
75                .clone_htod(&hash_sums)
76                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash sums: {:?}", e)))?;
77            let mut hash_counts_dev = stream
78                .clone_htod(&hash_counts)
79                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
80
81            // Launch kernel based on selected strategy
82            let kernel_name = match self.hash_strategy {
83                crate::storage::gpu::GpuHashStrategy::Linear => "group_by_sum_i32",
84                crate::storage::gpu::GpuHashStrategy::Cuckoo => "group_by_sum_cuckoo_i32",
85                crate::storage::gpu::GpuHashStrategy::RobinHood => "group_by_sum_robin_hood_i32",
86            };
87
88            let func = self
89                .module
90                .load_function(kernel_name)
91                .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
92
93            let cfg = LaunchConfig::for_num_elems(n as u32);
94            let n_i32 = n as i32;
95            let table_size_i32 = table_size as i32;
96
97            let mut builder = stream.launch_builder(&func);
98            builder.arg(keys_slice);
99            builder.arg(values_slice);
100            builder.arg(&mut hash_keys_dev);
101            builder.arg(&mut hash_sums_dev);
102            builder.arg(&mut hash_counts_dev);
103            builder.arg(&n_i32);
104            builder.arg(&table_size_i32);
105            unsafe { builder.launch(cfg) }
106                .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
107
108            stream
109                .synchronize()
110                .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
111
112            // Copy results back
113            hash_keys = stream
114                .clone_dtoh(&hash_keys_dev)
115                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
116            hash_sums = stream
117                .clone_dtoh(&hash_sums_dev)
118                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash sums: {:?}", e)))?;
119            hash_counts = stream
120                .clone_dtoh(&hash_counts_dev)
121                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
122
123            // Extract non-empty groups
124            let mut results = Vec::new();
125            for i in 0..table_size {
126                if hash_keys[i] != -1 {
127                    results.push((hash_keys[i], hash_sums[i], hash_counts[i]));
128                }
129            }
130
131            Ok(results)
132        }
133    }
134
135    /// GROUP BY with COUNT aggregation on GPU.
136    /// Returns Vec<(group_key, count)>
137    pub fn group_by_count(&self, table: &str, group_column: &str) -> DbxResult<Vec<(i32, i32)>> {
138        #[cfg(not(feature = "gpu"))]
139        {
140            let _ = (table, group_column);
141            Err(DbxError::NotImplemented(
142                "GPU acceleration is not enabled".to_string(),
143            ))
144        }
145
146        #[cfg(feature = "gpu")]
147        {
148            let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
149                DbxError::Gpu(format!(
150                    "Column {}.{} not found in GPU cache",
151                    table, group_column
152                ))
153            })?;
154
155            let (keys_slice, n) = match &*keys_data {
156                GpuData::Int32(slice) => (slice, slice.len()),
157                _ => {
158                    return Err(DbxError::NotImplemented(
159                        "GROUP BY keys must be Int32".to_string(),
160                    ));
161                }
162            };
163
164            let table_size = (n * 2).next_power_of_two();
165            let stream = self.device.default_stream();
166
167            let mut hash_keys = vec![-1i32; table_size];
168            let mut hash_counts = vec![0i32; table_size];
169
170            let mut hash_keys_dev = stream
171                .clone_htod(&hash_keys)
172                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
173            let mut hash_counts_dev = stream
174                .clone_htod(&hash_counts)
175                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
176
177            let func = self
178                .module
179                .load_function("group_by_count_i32")
180                .map_err(|_| DbxError::Gpu("Kernel group_by_count_i32 not found".to_string()))?;
181
182            let cfg = LaunchConfig::for_num_elems(n as u32);
183            let n_i32 = n as i32;
184            let table_size_i32 = table_size as i32;
185
186            let mut builder = stream.launch_builder(&func);
187            builder.arg(keys_slice);
188            builder.arg(&mut hash_keys_dev);
189            builder.arg(&mut hash_counts_dev);
190            builder.arg(&n_i32);
191            builder.arg(&table_size_i32);
192            unsafe { builder.launch(cfg) }
193                .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
194
195            stream
196                .synchronize()
197                .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
198
199            hash_keys = stream
200                .clone_dtoh(&hash_keys_dev)
201                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
202            hash_counts = stream
203                .clone_dtoh(&hash_counts_dev)
204                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
205
206            let mut results = Vec::new();
207            for i in 0..table_size {
208                if hash_keys[i] != -1 {
209                    results.push((hash_keys[i], hash_counts[i]));
210                }
211            }
212
213            Ok(results)
214        }
215    }
216
217    /// GROUP BY with MIN/MAX aggregation on GPU.
218    /// Returns Vec<(group_key, min_or_max_value, count)>
219    pub fn group_by_min_max(
220        &self,
221        table: &str,
222        group_column: &str,
223        value_column: &str,
224        find_max: bool,
225    ) -> DbxResult<Vec<(i32, i32, i32)>> {
226        #[cfg(not(feature = "gpu"))]
227        {
228            let _ = (table, group_column, value_column, find_max);
229            Err(DbxError::NotImplemented(
230                "GPU acceleration is not enabled".to_string(),
231            ))
232        }
233
234        #[cfg(feature = "gpu")]
235        {
236            let keys_data = self.get_gpu_data(table, group_column).ok_or_else(|| {
237                DbxError::Gpu(format!(
238                    "Column {}.{} not found in GPU cache",
239                    table, group_column
240                ))
241            })?;
242            let values_data = self.get_gpu_data(table, value_column).ok_or_else(|| {
243                DbxError::Gpu(format!(
244                    "Column {}.{} not found in GPU cache",
245                    table, value_column
246                ))
247            })?;
248
249            let (keys_slice, n) = match &*keys_data {
250                GpuData::Int32(slice) => (slice, slice.len()),
251                _ => {
252                    return Err(DbxError::NotImplemented(
253                        "GROUP BY keys must be Int32".to_string(),
254                    ));
255                }
256            };
257
258            let values_slice = match &*values_data {
259                GpuData::Int32(slice) => slice,
260                _ => {
261                    return Err(DbxError::NotImplemented(
262                        "GROUP BY values must be Int32 for MIN/MAX".to_string(),
263                    ));
264                }
265            };
266
267            let table_size = (n * 2).next_power_of_two();
268            let stream = self.device.default_stream();
269
270            let initial_val = if find_max { i32::MIN } else { i32::MAX };
271            let mut hash_keys = vec![-1i32; table_size];
272            let mut hash_values = vec![initial_val; table_size];
273            let mut hash_counts = vec![0i32; table_size];
274
275            let mut hash_keys_dev = stream
276                .clone_htod(&hash_keys)
277                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash keys: {:?}", e)))?;
278            let mut hash_values_dev = stream
279                .clone_htod(&hash_values)
280                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash values: {:?}", e)))?;
281            let mut hash_counts_dev = stream
282                .clone_htod(&hash_counts)
283                .map_err(|e| DbxError::Gpu(format!("Failed to alloc hash counts: {:?}", e)))?;
284
285            let kernel_name = if find_max {
286                "group_by_max_i32"
287            } else {
288                "group_by_min_i32"
289            };
290            let func = self
291                .module
292                .load_function(kernel_name)
293                .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
294
295            let cfg = LaunchConfig::for_num_elems(n as u32);
296            let n_i32 = n as i32;
297            let table_size_i32 = table_size as i32;
298
299            let mut builder = stream.launch_builder(&func);
300            builder.arg(keys_slice);
301            builder.arg(values_slice);
302            builder.arg(&mut hash_keys_dev);
303            builder.arg(&mut hash_values_dev);
304            builder.arg(&mut hash_counts_dev);
305            builder.arg(&n_i32);
306            builder.arg(&table_size_i32);
307            unsafe { builder.launch(cfg) }
308                .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
309
310            stream
311                .synchronize()
312                .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
313
314            hash_keys = stream
315                .clone_dtoh(&hash_keys_dev)
316                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash keys: {:?}", e)))?;
317            hash_values = stream
318                .clone_dtoh(&hash_values_dev)
319                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash values: {:?}", e)))?;
320            hash_counts = stream
321                .clone_dtoh(&hash_counts_dev)
322                .map_err(|e| DbxError::Gpu(format!("Failed to copy hash counts: {:?}", e)))?;
323
324            let mut results = Vec::new();
325            for i in 0..table_size {
326                if hash_keys[i] != -1 {
327                    results.push((hash_keys[i], hash_values[i], hash_counts[i]));
328                }
329            }
330
331            Ok(results)
332        }
333    }
334}