1#[cfg(feature = "gpu")]
4use cudarc::driver::{LaunchConfig, PushKernelArg};
5
6#[cfg(feature = "gpu")]
7use super::data::GpuData;
8use super::manager::GpuManager;
9use crate::error::{DbxError, DbxResult};
10
11impl GpuManager {
13 pub fn sum(&self, table: &str, column: &str) -> DbxResult<i64> {
15 #[cfg(not(feature = "gpu"))]
16 {
17 let _ = (table, column);
18 Err(DbxError::NotImplemented(
19 "GPU acceleration is not enabled".to_string(),
20 ))
21 }
22
23 #[cfg(feature = "gpu")]
24 {
25 tracing::debug!(target: "gpu", table = %table, column = %column, "GPU sum start");
26 let start = std::time::Instant::now();
27
28 let data = self.get_gpu_data(table, column).ok_or_else(|| {
29 DbxError::Gpu(format!(
30 "Column {}.{} not found in GPU cache",
31 table, column
32 ))
33 })?;
34
35 match &*data {
36 GpuData::Int32(slice) => {
37 let n = slice.len() as i32;
38 let stream = self.device.default_stream();
39
40 let strategy = self.reduction_strategy.choose_for_sum(slice.len());
42
43 match strategy {
44 GpuReductionStrategy::Histogram => {
45 let slice_host = stream.clone_dtoh(slice).map_err(|e| {
48 DbxError::Gpu(format!("Failed to copy slice to host: {:?}", e))
49 })?;
50
51 let min_val = slice_host.iter().min().copied().unwrap_or(0);
52 let max_val = slice_host.iter().max().copied().unwrap_or(0);
53 let num_bins = (max_val - min_val + 1).min(1000) as usize;
54
55 if num_bins > 1000 || num_bins == 0 {
57 return Err(DbxError::Gpu(
59 "Cardinality too high for histogram, use SinglePass"
60 .to_string(),
61 ));
62 }
63
64 let mut histogram_dev =
66 stream.alloc_zeros::<i64>(num_bins).map_err(|e| {
67 DbxError::Gpu(format!("Failed to alloc histogram: {:?}", e))
68 })?;
69
70 let func =
72 self.module
73 .load_function("histogram_sum_i32")
74 .map_err(|_| {
75 DbxError::Gpu(
76 "Kernel histogram_sum_i32 not found".to_string(),
77 )
78 })?;
79
80 let cfg = LaunchConfig::for_num_elems(n as u32);
82 let shared_mem_bytes = (num_bins * std::mem::size_of::<i64>()) as u32;
83 let cfg_with_shared = LaunchConfig {
84 shared_mem_bytes,
85 ..cfg
86 };
87
88 let num_bins_i32 = num_bins as i32;
89 let mut builder = stream.launch_builder(&func);
90 builder.arg(slice);
91 builder.arg(slice); builder.arg(&mut histogram_dev);
93 builder.arg(&n);
94 builder.arg(&num_bins_i32);
95 unsafe { builder.launch(cfg_with_shared) }.map_err(|e| {
96 DbxError::Gpu(format!("Histogram kernel launch failed: {:?}", e))
97 })?;
98
99 stream.synchronize().map_err(|e| {
101 DbxError::Gpu(format!("Stream sync failed: {:?}", e))
102 })?;
103
104 let histogram_host =
105 stream.clone_dtoh(&histogram_dev).map_err(|e| {
106 DbxError::Gpu(format!("Failed to copy histogram: {:?}", e))
107 })?;
108
109 let result = histogram_host.iter().sum();
110 tracing::debug!(target: "gpu", table = %table, column = %column, strategy = "Histogram", elapsed_us = start.elapsed().as_micros(), "GPU sum complete");
111 Ok(result)
112 }
113 GpuReductionStrategy::SinglePass => {
114 let mut result_dev = stream.alloc_zeros::<i64>(1).map_err(|e| {
116 DbxError::Gpu(format!("Failed to alloc result: {:?}", e))
117 })?;
118
119 let func = self.module.load_function("sum_i32").map_err(|_| {
120 DbxError::Gpu("Kernel sum_i32 not found".to_string())
121 })?;
122
123 let cfg = LaunchConfig::for_num_elems(n as u32);
124 let mut builder = stream.launch_builder(&func);
125 builder.arg(slice);
126 builder.arg(&mut result_dev);
127 builder.arg(&n);
128 unsafe { builder.launch(cfg) }.map_err(|e| {
129 DbxError::Gpu(format!("Kernel launch failed: {:?}", e))
130 })?;
131
132 stream.synchronize().map_err(|e| {
133 DbxError::Gpu(format!("Stream sync failed: {:?}", e))
134 })?;
135 let result_host = stream.clone_dtoh(&result_dev).map_err(|e| {
136 DbxError::Gpu(format!("Failed to copy result: {:?}", e))
137 })?;
138
139 tracing::debug!(target: "gpu", table = %table, column = %column, strategy = "SinglePass", elapsed_us = start.elapsed().as_micros(), "GPU sum complete");
140 Ok(result_host[0])
141 }
142 GpuReductionStrategy::MultiPass => {
143 let cfg = LaunchConfig::for_num_elems(n as u32);
145 let num_blocks = cfg.grid_dim.0 as usize;
146
147 let mut block_sums_dev =
149 stream.alloc_zeros::<i64>(num_blocks).map_err(|e| {
150 DbxError::Gpu(format!("Failed to alloc block_sums: {:?}", e))
151 })?;
152
153 let func_pass1 =
155 self.module.load_function("sum_i32_pass1").map_err(|_| {
156 DbxError::Gpu("Kernel sum_i32_pass1 not found".to_string())
157 })?;
158
159 let mut builder = stream.launch_builder(&func_pass1);
160 builder.arg(slice);
161 builder.arg(&mut block_sums_dev);
162 builder.arg(&n);
163 unsafe { builder.launch(cfg) }.map_err(|e| {
164 DbxError::Gpu(format!("Pass1 kernel launch failed: {:?}", e))
165 })?;
166
167 let mut result_dev = stream.alloc_zeros::<i64>(1).map_err(|e| {
169 DbxError::Gpu(format!("Failed to alloc result: {:?}", e))
170 })?;
171
172 let func_pass2 =
173 self.module.load_function("sum_i32_pass2").map_err(|_| {
174 DbxError::Gpu("Kernel sum_i32_pass2 not found".to_string())
175 })?;
176
177 let cfg_pass2 = LaunchConfig {
179 grid_dim: (1, 1, 1),
180 block_dim: (256, 1, 1),
181 shared_mem_bytes: 0,
182 };
183
184 let mut builder2 = stream.launch_builder(&func_pass2);
185 builder2.arg(&block_sums_dev);
186 builder2.arg(&mut result_dev);
187 let num_blocks_i32 = num_blocks as i32;
188 builder2.arg(&num_blocks_i32);
189 unsafe { builder2.launch(cfg_pass2) }.map_err(|e| {
190 DbxError::Gpu(format!("Pass2 kernel launch failed: {:?}", e))
191 })?;
192
193 stream.synchronize().map_err(|e| {
194 DbxError::Gpu(format!("Stream sync failed: {:?}", e))
195 })?;
196 let result_host = stream.clone_dtoh(&result_dev).map_err(|e| {
197 DbxError::Gpu(format!("Failed to copy result: {:?}", e))
198 })?;
199
200 Ok(result_host[0])
201 }
202 GpuReductionStrategy::Auto => {
203 unreachable!("Auto should be resolved by choose_for_sum")
204 }
205 }
206 }
207 GpuData::PinnedInt32(_) => {
208 return Err(DbxError::NotImplemented(
209 "SUM for PinnedInt32 not implemented yet".to_string(),
210 ));
211 }
212 _ => Err(DbxError::NotImplemented(
213 "GPU SUM only supported for Int32 for now".to_string(),
214 )),
215 }
216 }
217 }
218
219 pub fn count(&self, table: &str, column: &str) -> DbxResult<u64> {
221 #[cfg(not(feature = "gpu"))]
222 {
223 let _ = (table, column);
224 Err(DbxError::NotImplemented(
225 "GPU acceleration is not enabled".to_string(),
226 ))
227 }
228
229 #[cfg(feature = "gpu")]
230 {
231 let data = self.get_gpu_data(table, column).ok_or_else(|| {
232 DbxError::Gpu(format!(
233 "Column {}.{} not found in GPU cache",
234 table, column
235 ))
236 })?;
237
238 let n = data.len() as i32;
239 let stream = self.device.default_stream();
240 let mut result_dev = stream
241 .alloc_zeros::<i64>(1)
242 .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
243
244 let func = self
245 .module
246 .load_function("count_all")
247 .map_err(|_| DbxError::Gpu("Kernel count_all not found".to_string()))?;
248
249 let cfg = LaunchConfig::for_num_elems(n as u32);
250
251 let mut builder = stream.launch_builder(&func);
252 match &*data {
253 GpuData::Int32(s) => {
254 builder.arg(s);
255 builder.arg(&mut result_dev);
256 builder.arg(&n);
257 }
258 GpuData::Int64(s) => {
259 builder.arg(s);
260 builder.arg(&mut result_dev);
261 builder.arg(&n);
262 }
263 GpuData::Float64(s) => {
264 builder.arg(s);
265 builder.arg(&mut result_dev);
266 builder.arg(&n);
267 }
268 GpuData::Raw(s) => {
269 builder.arg(s);
270 builder.arg(&mut result_dev);
271 builder.arg(&n);
272 }
273 GpuData::PinnedInt32(_) => {
274 return Err(DbxError::NotImplemented(
275 "COUNT for PinnedInt32 not implemented yet".to_string(),
276 ));
277 }
278 }
279 unsafe { builder.launch(cfg) }
280 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
281
282 stream
283 .synchronize()
284 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
285 let result_host = stream
286 .clone_dtoh(&result_dev)
287 .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
288
289 Ok(result_host[0] as u64)
290 }
291 }
292
293 pub fn min_max(&self, table: &str, column: &str, find_max: bool) -> DbxResult<i32> {
295 #[cfg(not(feature = "gpu"))]
296 {
297 let _ = (table, column, find_max);
298 Err(DbxError::NotImplemented(
299 "GPU acceleration is not enabled".to_string(),
300 ))
301 }
302
303 #[cfg(feature = "gpu")]
304 {
305 let data = self.get_gpu_data(table, column).ok_or_else(|| {
306 DbxError::Gpu(format!(
307 "Column {}.{} not found in GPU cache",
308 table, column
309 ))
310 })?;
311
312 match &*data {
313 GpuData::Int32(slice) => {
314 let n = slice.len() as i32;
315 let initial_val = if find_max { i32::MIN } else { i32::MAX };
316 let stream = self.device.default_stream();
317 let mut result_dev = stream
318 .clone_htod(&[initial_val])
319 .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
320
321 let kernel_name = if find_max { "max_i32" } else { "min_i32" };
322 let func = self
323 .module
324 .load_function(kernel_name)
325 .map_err(|_| DbxError::Gpu(format!("Kernel {} not found", kernel_name)))?;
326
327 let cfg = LaunchConfig::for_num_elems(n as u32);
328 let mut builder = stream.launch_builder(&func);
329 builder.arg(slice);
330 builder.arg(&mut result_dev);
331 builder.arg(&n);
332 unsafe { builder.launch(cfg) }
333 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
334
335 stream
336 .synchronize()
337 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
338 let result_host = stream
339 .clone_dtoh(&result_dev)
340 .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
341
342 Ok(result_host[0])
343 }
344 GpuData::PinnedInt32(_) => {
345 return Err(DbxError::NotImplemented(
346 "MIN/MAX for PinnedInt32 not implemented yet".to_string(),
347 ));
348 }
349 _ => Err(DbxError::NotImplemented(
350 "GPU MIN/MAX only supported for Int32 for now".to_string(),
351 )),
352 }
353 }
354 }
355
356 pub fn filter_gt(&self, table: &str, column: &str, threshold: i32) -> DbxResult<Vec<u8>> {
358 #[cfg(not(feature = "gpu"))]
359 {
360 let _ = (table, column, threshold);
361 Err(DbxError::NotImplemented(
362 "GPU acceleration is not enabled".to_string(),
363 ))
364 }
365
366 #[cfg(feature = "gpu")]
367 {
368 let data = self.get_gpu_data(table, column).ok_or_else(|| {
369 DbxError::Gpu(format!(
370 "Column {}.{} not found in GPU cache",
371 table, column
372 ))
373 })?;
374
375 match &*data {
376 GpuData::Int32(slice) => {
377 let n = slice.len() as i32;
378 let stream = self.device.default_stream();
379 let mut mask_dev = stream
380 .alloc_zeros::<u8>(n as usize)
381 .map_err(|e| DbxError::Gpu(format!("Failed to alloc mask: {:?}", e)))?;
382
383 let func = self
384 .module
385 .load_function("filter_gt_i32")
386 .map_err(|_| DbxError::Gpu("Kernel filter_gt_i32 not found".to_string()))?;
387
388 let cfg = LaunchConfig::for_num_elems(n as u32);
389 let mut builder = stream.launch_builder(&func);
390 builder.arg(slice);
391 builder.arg(&threshold);
392 builder.arg(&mut mask_dev);
393 builder.arg(&n);
394 unsafe { builder.launch(cfg) }
395 .map_err(|e| DbxError::Gpu(format!("Kernel launch failed: {:?}", e)))?;
396
397 stream
398 .synchronize()
399 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
400 let mask_host = stream
401 .clone_dtoh(&mask_dev)
402 .map_err(|e| DbxError::Gpu(format!("Failed to copy mask: {:?}", e)))?;
403
404 Ok(mask_host)
405 }
406 GpuData::PinnedInt32(_) => {
407 return Err(DbxError::NotImplemented(
408 "FILTER for PinnedInt32 not implemented yet".to_string(),
409 ));
410 }
411 _ => Err(DbxError::NotImplemented(
412 "GPU FILTER only supported for Int32 for now".to_string(),
413 )),
414 }
415 }
416 }
417
418 #[cfg(feature = "gpu")]
419 pub fn merge_sum(
421 &self,
422 _table: &str,
423 _column: &str,
424 delta_data: &super::data::GpuData,
425 ros_data: &super::data::GpuData,
426 ) -> DbxResult<i64> {
427 match (delta_data, ros_data) {
428 (GpuData::Int32(delta_slice), GpuData::Int32(ros_slice)) => {
429 let delta_n = delta_slice.len() as i32;
430 let ros_n = ros_slice.len() as i32;
431 let stream = self.device.default_stream();
432
433 let mut result_dev = stream
434 .alloc_zeros::<i64>(1)
435 .map_err(|e| DbxError::Gpu(format!("Failed to alloc result: {:?}", e)))?;
436
437 let func = self
438 .module
439 .load_function("merge_sum_i32")
440 .map_err(|_| DbxError::Gpu("Kernel merge_sum_i32 not found".to_string()))?;
441
442 let cfg = LaunchConfig::for_num_elems(std::cmp::max(delta_n, ros_n) as u32);
444
445 let mut builder = stream.launch_builder(&func);
446 builder.arg(delta_slice);
447 builder.arg(&delta_n);
448 builder.arg(ros_slice);
449 builder.arg(&ros_n);
450 builder.arg(&mut result_dev);
451
452 unsafe { builder.launch(cfg) }
453 .map_err(|e| DbxError::Gpu(format!("Merge kernel launch failed: {:?}", e)))?;
454
455 stream
456 .synchronize()
457 .map_err(|e| DbxError::Gpu(format!("Stream sync failed: {:?}", e)))?;
458 let result_host = stream
459 .clone_dtoh(&result_dev)
460 .map_err(|e| DbxError::Gpu(format!("Failed to copy result: {:?}", e)))?;
461
462 Ok(result_host[0])
463 }
464 _ => Err(DbxError::NotImplemented(
465 "GPU Merge SUM only supported for Int32".to_string(),
466 )),
467 }
468 }
469}