Skip to main content

dbx_core/storage/gpu/
aggregation.rs

1//! GPU aggregation operations - SUM, COUNT, MIN/MAX, FILTER.
2
3#[cfg(feature = "gpu")]
4use cudarc::driver::{LaunchConfig, PushKernelArg};
5
6#[cfg(feature = "gpu")]
7use super::data::GpuData;
8use super::manager::GpuManager;
9use crate::error::{DbxError, DbxResult};
10
11/// Aggregation operations impl block
12impl GpuManager {
13    /// SUM aggregation on GPU with configurable reduction strategy.
14    pub fn sum(&self, table: &str, column: &str) -> DbxResult<i64> {
15        #[cfg(not(feature = "gpu"))]
16        {
17            let _ = (table, column);
18            Err(DbxError::NotImplemented(
19                "GPU acceleration is not enabled".to_string(),
20            ))
21        }
22
23        #[cfg(feature = "gpu")]
24        {
25            tracing::debug!(target: "gpu", table = %table, column = %column, "GPU sum start");
26            let start = std::time::Instant::now();
27
28            let data = self.get_gpu_data(table, column).ok_or_else(|| {
29                DbxError::Gpu(format!(
30                    "Column {}.{} not found in GPU cache",
31                    table, column
32                ))
33            })?;
34
35            match &*data {
36                GpuData::Int32(slice) => {
37                    let n = slice.len() as i32;
38                    let stream = self.device.default_stream();
39
40                    // Choose reduction strategy
41                    let strategy = self.reduction_strategy.choose_for_sum(slice.len());
42
43                    match strategy {
44                        GpuReductionStrategy::Histogram => {
45                            // Histogram-based aggregation for low cardinality data
46                            // Step 1: Copy data to CPU to detect cardinality (find min/max)
47                            let slice_host = stream.clone_dtoh(slice).map_err(|e| {
48                                DbxError::Gpu(format!("Failed to copy slice to host: {:?}", e))
49                            })?;
50
51                            let min_val = slice_host.iter().min().copied().unwrap_or(0);
52                            let max_val = slice_host.iter().max().copied().unwrap_or(0);
53                            let num_bins = (max_val - min_val + 1).min(1000) as usize;
54
55                            // Only use histogram if cardinality is reasonable
56                            if num_bins > 1000 || num_bins == 0 {
57                                // Fall back to SinglePass for high cardinality
58                                return Err(DbxError::Gpu(
59                                    "Cardinality too high for histogram, use SinglePass"
60                                        .to_string(),
61                                ));
62                            }
63
64                            // Allocate histogram buffer
65                            let mut histogram_dev =
66                                stream.alloc_zeros::<i64>(num_bins).map_err(|e| {
67                                    DbxError::Gpu(format!("Failed to alloc histogram: {:?}", e))
68                                })?;
69
70                            // Load histogram kernel
71                            let func =
72                                self.module
73                                    .load_function("histogram_sum_i32")
74                                    .map_err(|_| {
75                                        DbxError::Gpu(
76                                            "Kernel histogram_sum_i32 not found".to_string(),
77                                        )
78                                    })?;
79
80                            // Launch histogram kernel
81                            let cfg = LaunchConfig::for_num_elems(n as u32);
82                            let shared_mem_bytes = (num_bins * std::mem::size_of::<i64>()) as u32;
83                            let cfg_with_shared = LaunchConfig {
84                                shared_mem_bytes,
85                                ..cfg
86                            };
87
88                            let num_bins_i32 = num_bins as i32;
89                            let mut builder = stream.launch_builder(&func);
90                            builder.arg(slice);
91                            builder.arg(slice); // keys = values for simple SUM
92                            builder.arg(&mut histogram_dev);
93                            builder.arg(&n);
94                            builder.arg(&num_bins_i32);
95                            unsafe { builder.launch(cfg_with_shared) }.map_err(|e| {
96                                DbxError::Gpu(format!("Histogram kernel launch failed: {:?}", e))
97                            })?;
98
99                            // Synchronize and sum histogram
100                            stream.synchronize().map_err(|e| {
101                                DbxError::Gpu(format!("Stream sync failed: {:?}", e))
102                            })?;
103
104                            let histogram_host =
105                                stream.clone_dtoh(&histogram_dev).map_err(|e| {
106                                    DbxError::Gpu(format!("Failed to copy histogram: {:?}", e))
107                                })?;
108
109                            let result = histogram_host.iter().sum();
110                            tracing::debug!(target: "gpu", table = %table, column = %column, strategy = "Histogram", elapsed_us = start.elapsed().as_micros(), "GPU sum complete");
111                            Ok(result)
112                        }
113                        GpuReductionStrategy::SinglePass => {
114                            // Single-pass with atomic operations
115                            let mut result_dev = stream.alloc_zeros::<i64>(1).map_err(|e| {
116                                DbxError::Gpu(format!("Failed to alloc result: {:?}", e))
117                            })?;
118
119                            let func = self.module.load_function("sum_i32").map_err(|_| {
120                                DbxError::Gpu("Kernel sum_i32 not found".to_string())
121                            })?;
122
123                            let cfg = LaunchConfig::for_num_elems(n as u32);
124                            let mut builder = stream.launch_builder(&func);
125                            builder.arg(slice);
126                            builder.arg(&mut result_dev);
127                            builder.arg(&n);
128                            unsafe { builder.launch(cfg) }.map_err(|e| {
129                                DbxError::Gpu(format!("Kernel launch failed: {:?}", e))
130                            })?;
131
132                            stream.synchronize().map_err(|e| {
133                                DbxError::Gpu(format!("Stream sync failed: {:?}", e))
134                            })?;
135                            let result_host = stream.clone_dtoh(&result_dev).map_err(|e| {
136                                DbxError::Gpu(format!("Failed to copy result: {:?}", e))
137                            })?;
138
139                            tracing::debug!(target: "gpu", table = %table, column = %column, strategy = "SinglePass", elapsed_us = start.elapsed().as_micros(), "GPU sum complete");
140                            Ok(result_host[0])
141                        }
142                        GpuReductionStrategy::MultiPass => {
143                            // Multi-pass reduction (eliminates atomic contention)
144                            let cfg = LaunchConfig::for_num_elems(n as u32);
145                            let num_blocks = cfg.grid_dim.0 as usize;
146
147                            // Allocate intermediate buffer for block partial sums
148                            let mut block_sums_dev =
149                                stream.alloc_zeros::<i64>(num_blocks).map_err(|e| {
150                                    DbxError::Gpu(format!("Failed to alloc block_sums: {:?}", e))
151                                })?;
152
153                            // Pass 1: Compute block partial sums
154                            let func_pass1 =
155                                self.module.load_function("sum_i32_pass1").map_err(|_| {
156                                    DbxError::Gpu("Kernel sum_i32_pass1 not found".to_string())
157                                })?;
158
159                            let mut builder = stream.launch_builder(&func_pass1);
160                            builder.arg(slice);
161                            builder.arg(&mut block_sums_dev);
162                            builder.arg(&n);
163                            unsafe { builder.launch(cfg) }.map_err(|e| {
164                                DbxError::Gpu(format!("Pass1 kernel launch failed: {:?}", e))
165                            })?;
166
167                            // Pass 2: Final reduction of block sums
168                            let mut result_dev = stream.alloc_zeros::<i64>(1).map_err(|e| {
169                                DbxError::Gpu(format!("Failed to alloc result: {:?}", e))
170                            })?;
171
172                            let func_pass2 =
173                                self.module.load_function("sum_i32_pass2").map_err(|_| {
174                                    DbxError::Gpu("Kernel sum_i32_pass2 not found".to_string())
175                                })?;
176
177                            // Use single block for pass2 (num_blocks is usually small)
178                            let cfg_pass2 = LaunchConfig {
179                                grid_dim: (1, 1, 1),
180                                block_dim: (256, 1, 1),
181                                shared_mem_bytes: 0,
182                            };
183
184                            let mut builder2 = stream.launch_builder(&func_pass2);
185                            builder2.arg(&block_sums_dev);
186                            builder2.arg(&mut result_dev);
187                            let num_blocks_i32 = num_blocks as i32;
188                            builder2.arg(&num_blocks_i32);
189                            unsafe { builder2.launch(cfg_pass2) }.map_err(|e| {
190                                DbxError::Gpu(format!("Pass2 kernel launch failed: {:?}", e))
191                            })?;
192
193                            stream.synchronize().map_err(|e| {
194                                DbxError::Gpu(format!("Stream sync failed: {:?}", e))
195                            })?;
196                            let result_host = stream.clone_dtoh(&result_dev).map_err(|e| {
197                                DbxError::Gpu(format!("Failed to copy result: {:?}", e))
198                            })?;
199
200                            Ok(result_host[0])
201                        }
202                        GpuReductionStrategy::Auto => {
203                            unreachable!("Auto should be resolved by choose_for_sum")
204                        }
205                    }
206                }
207                GpuData::PinnedInt32(_) => {
208                    return Err(DbxError::NotImplemented(
209                        "SUM for PinnedInt32 not implemented yet".to_string(),
210                    ));
211                }
212                _ => Err(DbxError::NotImplemented(
213                    "GPU SUM only supported for Int32 for now".to_string(),
214                )),
215            }
216        }
217    }
218
219    /// COUNT aggregation on GPU (single-pass for simplicity).
220    pub fn count(&self, table: &str, column: &str) -> DbxResult<u64> {
221        #[cfg(not(feature = "gpu"))]
222        {
223            let _ = (table, column);
224            Err(DbxError::NotImplemented(
225                "GPU acceleration is not enabled".to_string(),
226            ))
227        }
228
229        #[cfg(feature = "gpu")]
230        {
231            let data = self.get_gpu_data(table, column).ok_or_else(|| {
232                DbxError::Gpu(format!(
233                    "Column {}.{} not found in GPU cache",
234                    table, column
235                ))
236            })?;
237
238            let n = data.len() as i32;
239            let stream = self.device.default_stream();
240            let mut result_dev = stream
241                .alloc_zeros::<i64>(1)
242                .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
243
244            let func = self
245                .module
246                .load_function("count_all")
247                .map_err(|_| DbxError::Gpu("Kernel count_all not found".to_string()))?;
248
249            let cfg = LaunchConfig::for_num_elems(n as u32);
250
251            let mut builder = stream.launch_builder(&func);
252            match &*data {
253                GpuData::Int32(s) => {
254                    builder.arg(s);
255                    builder.arg(&mut result_dev);
256                    builder.arg(&n);
257                }
258                GpuData::Int64(s) => {
259                    builder.arg(s);
260                    builder.arg(&mut result_dev);
261                    builder.arg(&n);
262                }
263                GpuData::Float64(s) => {
264                    builder.arg(s);
265                    builder.arg(&mut result_dev);
266                    builder.arg(&n);
267                }
268                GpuData::Raw(s) => {
269                    builder.arg(s);
270                    builder.arg(&mut result_dev);
271                    builder.arg(&n);
272                }
273                GpuData::PinnedInt32(_) => {
274                    return Err(DbxError::NotImplemented(
275                        "COUNT for PinnedInt32 not implemented yet".to_string(),
276                    ));
277                }
278            }
279            unsafe { builder.launch(cfg) }
280                .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
281
282            stream
283                .synchronize()
284                .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
285            let result_host = stream
286                .clone_dtoh(&result_dev)
287                .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
288
289            Ok(result_host[0] as u64)
290        }
291    }
292
293    /// MIN/MAX aggregation on GPU.
294    pub fn min_max(&self, table: &str, column: &str, find_max: bool) -> DbxResult<i32> {
295        #[cfg(not(feature = "gpu"))]
296        {
297            let _ = (table, column, find_max);
298            Err(DbxError::NotImplemented(
299                "GPU acceleration is not enabled".to_string(),
300            ))
301        }
302
303        #[cfg(feature = "gpu")]
304        {
305            let data = self.get_gpu_data(table, column).ok_or_else(|| {
306                DbxError::Gpu(format!(
307                    "Column {}.{} not found in GPU cache",
308                    table, column
309                ))
310            })?;
311
312            match &*data {
313                GpuData::Int32(slice) => {
314                    let n = slice.len() as i32;
315                    let initial_val = if find_max { i32::MIN } else { i32::MAX };
316                    let stream = self.device.default_stream();
317                    let mut result_dev = stream
318                        .clone_htod(&[initial_val])
319                        .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
320
321                    let kernel_name = if find_max { "max_i32" } else { "min_i32" };
322                    let func = self
323                        .module
324                        .load_function(kernel_name)
325                        .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
326
327                    let cfg = LaunchConfig::for_num_elems(n as u32);
328                    let mut builder = stream.launch_builder(&func);
329                    builder.arg(slice);
330                    builder.arg(&mut result_dev);
331                    builder.arg(&n);
332                    unsafe { builder.launch(cfg) }
333                        .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
334
335                    stream
336                        .synchronize()
337                        .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
338                    let result_host = stream
339                        .clone_dtoh(&result_dev)
340                        .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
341
342                    Ok(result_host[0])
343                }
344                GpuData::PinnedInt32(_) => {
345                    return Err(DbxError::NotImplemented(
346                        "MIN/MAX for PinnedInt32 not implemented yet".to_string(),
347                    ));
348                }
349                _ => Err(DbxError::NotImplemented(
350                    "GPU MIN/MAX only supported for Int32 for now".to_string(),
351                )),
352            }
353        }
354    }
355
356    /// Filter GT on GPU. Returns a bitmask (`Vec<u8>` where 1 means true).
357    pub fn filter_gt(&self, table: &str, column: &str, threshold: i32) -> DbxResult<Vec<u8>> {
358        #[cfg(not(feature = "gpu"))]
359        {
360            let _ = (table, column, threshold);
361            Err(DbxError::NotImplemented(
362                "GPU acceleration is not enabled".to_string(),
363            ))
364        }
365
366        #[cfg(feature = "gpu")]
367        {
368            let data = self.get_gpu_data(table, column).ok_or_else(|| {
369                DbxError::Gpu(format!(
370                    "Column {}.{} not found in GPU cache",
371                    table, column
372                ))
373            })?;
374
375            match &*data {
376                GpuData::Int32(slice) => {
377                    let n = slice.len() as i32;
378                    let stream = self.device.default_stream();
379                    let mut mask_dev = stream
380                        .alloc_zeros::<u8>(n as usize)
381                        .map_err(|e| DbxError::Gpu(format!("Failed to alloc mask: {:?}", e)))?;
382
383                    let func = self
384                        .module
385                        .load_function("filter_gt_i32")
386                        .map_err(|_| DbxError::Gpu("Kernel filter_gt_i32 not found".to_string()))?;
387
388                    let cfg = LaunchConfig::for_num_elems(n as u32);
389                    let mut builder = stream.launch_builder(&func);
390                    builder.arg(slice);
391                    builder.arg(&threshold);
392                    builder.arg(&mut mask_dev);
393                    builder.arg(&n);
394                    unsafe { builder.launch(cfg) }
395                        .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
396
397                    stream
398                        .synchronize()
399                        .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
400                    let mask_host = stream
401                        .clone_dtoh(&mask_dev)
402                        .map_err(|e| DbxError::Gpu(format!("Failed to copy mask: {:?}", e)))?;
403
404                    Ok(mask_host)
405                }
406                GpuData::PinnedInt32(_) => {
407                    return Err(DbxError::NotImplemented(
408                        "FILTER for PinnedInt32 not implemented yet".to_string(),
409                    ));
410                }
411                _ => Err(DbxError::NotImplemented(
412                    "GPU FILTER only supported for Int32 for now".to_string(),
413                )),
414            }
415        }
416    }
417
418    #[cfg(feature = "gpu")]
419    /// SUM aggregation across two tiers (e.g. Delta and ROS) using a single merge kernel.
420    pub fn merge_sum(
421        &self,
422        _table: &str,
423        _column: &str,
424        delta_data: &super::data::GpuData,
425        ros_data: &super::data::GpuData,
426    ) -> DbxResult<i64> {
427        match (delta_data, ros_data) {
428            (GpuData::Int32(delta_slice), GpuData::Int32(ros_slice)) => {
429                let delta_n = delta_slice.len() as i32;
430                let ros_n = ros_slice.len() as i32;
431                let stream = self.device.default_stream();
432
433                let mut result_dev = stream
434                    .alloc_zeros::<i64>(1)
435                    .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
436
437                let func = self
438                    .module
439                    .load_function("merge_sum_i32")
440                    .map_err(|_| DbxError::Gpu("Kernel merge_sum_i32 not found".to_string()))?;
441
442                // Configure based on the larger dataset
443                let cfg = LaunchConfig::for_num_elems(std::cmp::max(delta_n, ros_n) as u32);
444
445                let mut builder = stream.launch_builder(&func);
446                builder.arg(delta_slice);
447                builder.arg(&delta_n);
448                builder.arg(ros_slice);
449                builder.arg(&ros_n);
450                builder.arg(&mut result_dev);
451
452                unsafe { builder.launch(cfg) }
453                    .map_err(|e| DbxError::Gpu(format!("Merge kernel launch failed: {:?}", e)))?;
454
455                stream
456                    .synchronize()
457                    .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
458                let result_host = stream
459                    .clone_dtoh(&result_dev)
460                    .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
461
462                Ok(result_host[0])
463            }
464            _ => Err(DbxError::NotImplemented(
465                "GPU Merge SUM only supported for Int32".to_string(),
466            )),
467        }
468    }
469}