use crate::{
    StorageError, StorageResult, Event, EventData, Topic, Partition, Offset, EventId,
    security::{SecurityManager, SecurityConfig, SecurityError, GPUTimeoutManager},
};
use std::{
    sync::{Arc, atomic::{AtomicU64, AtomicUsize, Ordering}},
    time::{Duration, Instant},
};
use futures::future::join_all;

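/// GPU backend used for event processing; `CPUSimulation` is the
/// host-only fallback when no GPU is available.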
#[derive(Debug, Clone, PartialEq)]
pub enum GPUAccelerationType {
    CUDA,
    OpenCL,
    WebGPU,
    CPUSimulation,
}

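/// Hybrid CPU/GPU batch-processing engine with security validation,
/// timeout enforcement, and adaptive workload balancing.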
pub struct GPUAcceleratedEngine {
    acceleration_type: GPUAccelerationType,
    gpu_context: Arc<GPUContext>,
    stream_count: usize,
    gpu_buffers: Vec<GPUBuffer>,
    gpu_metrics: GPUMetrics,
    hybrid_config: HybridConfig,
    security_manager: Arc<SecurityManager>,
    gpu_timeout_manager: Arc<GPUTimeoutManager>,
}

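/// Capabilities of the (simulated) GPU device backing the engine.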
pub struct GPUContext {
    device_name: String,
    compute_units: u32,
    memory_size: u64,
    max_work_group_size: usize,
    parallel_executor: ParallelExecutor,
}

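/// A pre-allocated staging buffer associated with one GPU stream.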
pub struct GPUBuffer {
    buffer_id: u32,
    size: usize,
    data: Vec<u8>,
    in_use: bool,
}

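/// Atomic counters tracking throughput, resource usage, and security events.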
#[derive(Debug, Default)]
pub struct GPUMetrics {
    pub total_processed: AtomicU64,
    pub peak_throughput: AtomicU64,
    pub memory_usage_bytes: AtomicU64,
    pub active_streams: AtomicUsize,
    pub gpu_utilization: AtomicU64,
    pub security_violations: AtomicU64,
    pub resource_failures: AtomicU64,
    pub timeout_count: AtomicU64,
    pub rejected_requests: AtomicU64,
}

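/// How work is split between CPU and GPU; the two ratios should sum to 1.0.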
#[derive(Debug, Clone)]
pub struct HybridConfig {
    pub cpu_ratio: f32,
    pub gpu_ratio: f32,
    pub distribution_strategy: DistributionStrategy,
    pub adaptive_balancing: bool,
}

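/// Strategy for assigning events to the CPU or GPU path.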
#[derive(Debug, Clone, PartialEq)]
pub enum DistributionStrategy {
    SizeBased,
    ParallelismBased,
    Adaptive,
    GPUFirst,
}

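/// Host-side executor used to emulate GPU-style parallelism.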
pub struct ParallelExecutor {
    worker_count: usize,
    simd_width: usize,
}

impl GPUContext {
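    /// Builds a context with simulated device capabilities for the chosen backend.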
    pub fn new(acceleration_type: GPUAccelerationType) -> StorageResult<Self> {
        let (device_name, compute_units, memory_size) = match acceleration_type {
            GPUAccelerationType::CUDA => ("CUDA Device".to_string(), 2048, 8 * 1024 * 1024 * 1024),
            GPUAccelerationType::OpenCL => ("OpenCL Device".to_string(), 1024, 4 * 1024 * 1024 * 1024),
            GPUAccelerationType::WebGPU => ("WebGPU Device".to_string(), 512, 2 * 1024 * 1024 * 1024),
            GPUAccelerationType::CPUSimulation => ("CPU Simulation".to_string(), 8, 16 * 1024 * 1024 * 1024),
        };

        Ok(Self {
            device_name,
            compute_units,
            memory_size,
            max_work_group_size: 1024,
            parallel_executor: ParallelExecutor {
                worker_count: compute_units as usize,
                simd_width: 256,
            },
        })
    }
}

impl GPUAcceleratedEngine {
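    /// Creates an engine with an explicit security configuration.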
    pub fn new_secure(
        acceleration_type: GPUAccelerationType,
        hybrid_config: HybridConfig,
        security_config: SecurityConfig,
    ) -> StorageResult<Self> {
        let gpu_context = Arc::new(GPUContext::new(acceleration_type.clone())?);
        let stream_count = gpu_context.compute_units as usize * 4;
        let gpu_buffers = (0..stream_count)
            .map(|i| GPUBuffer {
                buffer_id: i as u32,
                size: 1024 * 1024, // 1 MiB per stream buffer
                data: Vec::new(),
                in_use: false,
            })
            .collect();

        tracing::info!(
            "Secure GPUAcceleratedEngine initialized: type={:?}, streams={}, security_timeout={}ms",
            acceleration_type,
            stream_count,
            security_config.gpu_timeout_ms
        );

        Ok(Self {
            acceleration_type,
            gpu_context,
            stream_count,
            gpu_buffers,
            gpu_metrics: GPUMetrics::default(),
            hybrid_config,
            security_manager: Arc::new(SecurityManager::new(security_config.clone())),
            gpu_timeout_manager: Arc::new(GPUTimeoutManager::new(
                Duration::from_millis(security_config.gpu_timeout_ms),
            )),
        })
    }

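    /// Creates an engine with the default security configuration.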
    pub fn new(
        acceleration_type: GPUAccelerationType,
        hybrid_config: HybridConfig,
    ) -> StorageResult<Self> {
        Self::new_secure(acceleration_type, hybrid_config, SecurityConfig::default())
    }

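    /// Validates a batch against security limits, checks GPU resource
    /// availability, then processes the events under timeout supervision.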
    pub async fn secure_gpu_accelerated_process(
        &mut self,
        events: Vec<Event>,
    ) -> StorageResult<Vec<Offset>> {
        let start_time = Instant::now();
        let event_count = events.len();

        if events.is_empty() {
            return Ok(Vec::new());
        }

        let total_size = events.iter()
            .map(|e| e.data.0.len())
            .sum::<usize>();

        if let Err(security_error) = self.security_manager
            .validate_batch_request(event_count, total_size)
            .await
        {
            tracing::warn!(
                "GPU Security validation failed: {:?}, rejecting {} events",
                security_error,
                event_count
            );

            self.gpu_metrics.security_violations.fetch_add(1, Ordering::Relaxed);

            return Err(StorageError::internal(format!(
                "GPU Security validation failed: {:?}",
                security_error
            )));
        }

        if !self.check_gpu_availability().await {
            tracing::warn!("GPU resources unavailable, rejecting {} events", event_count);
            self.gpu_metrics.resource_failures.fetch_add(1, Ordering::Relaxed);
            return Err(StorageError::internal("GPU resources unavailable"));
        }

        tracing::debug!(
            "Secure GPU Batch: {} events, size={} bytes, gpu_type={:?}",
            event_count,
            total_size,
            self.acceleration_type
        );

        let result = self.security_manager.secure_gpu_process(
            &format!("{:?}", self.acceleration_type),
            event_count,
            self.gpu_accelerated_process_internal(events),
        ).await;

        let duration = start_time.elapsed();
        let throughput = event_count as f64 / duration.as_secs_f64();

        match result {
            Ok(offsets) => {
                self.gpu_metrics.total_processed.fetch_add(event_count as u64, Ordering::Relaxed);

                let current_peak = self.gpu_metrics.peak_throughput.load(Ordering::Relaxed);
                if throughput as u64 > current_peak {
                    self.gpu_metrics.peak_throughput.store(throughput as u64, Ordering::Relaxed);
                }

                tracing::info!(
                    "Secure GPU Batch completed: {} events in {:?} ({:.0} events/sec)",
                    event_count,
                    duration,
                    throughput
                );

                if throughput > 10_000_000.0 {
                    tracing::warn!(
                        "🚀🚀🚀 10M+ BREAKTHROUGH: {:.0} events/sec with GPU security! 🚀🚀🚀",
                        throughput
                    );
                } else if throughput > 5_000_000.0 {
                    tracing::warn!(
                        "🚀 GPU ACCELERATION SUCCESS: {:.0} events/sec with security! 🚀",
                        throughput
                    );
                }

                Ok(offsets)
            }
            Err(security_error) => {
                // Match by reference so `security_error` stays whole for the
                // final error message below.
                match &security_error {
                    SecurityError::GPUTimeout { task_id, duration } => {
                        self.gpu_metrics.timeout_count.fetch_add(1, Ordering::Relaxed);
                        tracing::error!(
                            "GPU task timeout: task_id={}, duration={:?}, events={}",
                            task_id,
                            duration,
                            event_count
                        );
                    }
                    _ => {
                        self.gpu_metrics.security_violations.fetch_add(1, Ordering::Relaxed);
                    }
                }

                Err(StorageError::internal(format!("Secure GPU processing failed: {:?}", security_error)))
            }
        }
    }

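    /// Splits a batch across the CPU and GPU paths and joins the results.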
    async fn gpu_accelerated_process_internal(
        &self,
        events: Vec<Event>,
    ) -> StorageResult<Vec<Offset>> {
        let event_count = events.len();

        let (cpu_events, gpu_events) = self.distribute_workload(events)?;

        let mut tasks = Vec::new();

        if !cpu_events.is_empty() {
            let cpu_count = cpu_events.len();
            let cpu_task: tokio::task::JoinHandle<StorageResult<Vec<Offset>>> = tokio::spawn(async move {
                // Simulated processing: assign sequential offsets.
                Ok(cpu_events.into_iter().enumerate().map(|(i, _)| Offset::new(i as u64)).collect())
            });
            tasks.push(cpu_task);

            tracing::debug!("CPU fallback processing: {} events", cpu_count);
        }

        if !gpu_events.is_empty() {
            let gpu_count = gpu_events.len();
            // Not used by the simulated path below; kept for a future real GPU dispatch.
            let _gpu_context = Arc::clone(&self.gpu_context);

            let gpu_task: tokio::task::JoinHandle<StorageResult<Vec<Offset>>> = tokio::spawn(async move {
                // Simulated processing: assign sequential offsets.
                Ok(gpu_events.into_iter().enumerate().map(|(i, _)| Offset::new(i as u64)).collect())
            });
            tasks.push(gpu_task);

            tracing::debug!("GPU processing: {} events", gpu_count);
        }

        let mut all_offsets = Vec::new();
        for task in tasks {
            let offsets: Vec<Offset> = task.await
                .map_err(|e| StorageError::internal(format!("Task failed: {}", e)))??;
            all_offsets.extend(offsets);
        }

        tracing::debug!(
            "GPU batch processing completed: {} events -> {} offsets",
            event_count,
            all_offsets.len()
        );

        Ok(all_offsets)
    }

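    /// Checks memory and concurrency limits before admitting a batch.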
    async fn check_gpu_availability(&self) -> bool {
        let gpu_memory_usage = self.gpu_metrics.memory_usage_bytes.load(Ordering::Relaxed);
        let max_gpu_memory: u64 = 8 * 1024 * 1024 * 1024; // 8 GiB limit

        if gpu_memory_usage > max_gpu_memory {
            tracing::warn!(
                "GPU memory exhausted: {} bytes > {} bytes limit",
                gpu_memory_usage,
                max_gpu_memory
            );
            return false;
        }

        // Placeholder: active-task tracking is not wired up yet, so this
        // concurrency check always passes.
        let active_tasks = 0;
        let max_concurrent_gpu_tasks = 32;

        if active_tasks >= max_concurrent_gpu_tasks {
            tracing::warn!(
                "Too many active GPU tasks: {} >= {} limit",
                active_tasks,
                max_concurrent_gpu_tasks
            );
            return false;
        }

        true
    }

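    /// Processes events without security validation; prefer
    /// `secure_gpu_accelerated_process` for untrusted input.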
    pub async fn gpu_accelerated_process(
        &self,
        events: Vec<Event>,
    ) -> StorageResult<Vec<Offset>> {
        self.gpu_accelerated_process_internal(events).await
    }

    pub fn get_security_metrics(&self) -> crate::security::SecurityMetrics {
        self.security_manager.get_security_metrics()
    }

    pub fn get_gpu_metrics(&self) -> &GPUMetrics {
        &self.gpu_metrics
    }

    pub fn update_security_config(&mut self, config: SecurityConfig) -> StorageResult<()> {
        self.security_manager = Arc::new(SecurityManager::new(config.clone()));
        self.gpu_timeout_manager = Arc::new(GPUTimeoutManager::new(
            Duration::from_millis(config.gpu_timeout_ms),
        ));
        tracing::info!("GPU Security configuration updated");
        Ok(())
    }

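    /// High-throughput batch entry point: distributes work, runs both paths,
    /// records metrics, and optionally rebalances the CPU/GPU split.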
    pub async fn gpu_ultra_batch_process(
        &mut self,
        events: Vec<Event>,
    ) -> StorageResult<Vec<Offset>> {
        let start_time = Instant::now();
        let event_count = events.len();

        if events.is_empty() {
            return Ok(Vec::new());
        }

        tracing::debug!(
            "GPU Ultra Batch: {} events, acceleration={:?}",
            event_count,
            self.acceleration_type
        );

        let (cpu_events, gpu_events) = self.distribute_workload(events)?;

        let cpu_results = self.process_cpu_batch(cpu_events).await?;
        let gpu_results = self.process_gpu_batch_parallel(gpu_events).await?;

        let mut all_offsets = cpu_results;
        all_offsets.extend(gpu_results);

        let duration = start_time.elapsed();
        let throughput = event_count as f64 / duration.as_secs_f64();

        self.gpu_metrics.total_processed.fetch_add(event_count as u64, Ordering::Relaxed);
        let current_peak = self.gpu_metrics.peak_throughput.load(Ordering::Relaxed);
        if throughput as u64 > current_peak {
            self.gpu_metrics.peak_throughput.store(throughput as u64, Ordering::Relaxed);
        }

        tracing::info!(
            "GPU Ultra Batch completed: {} events in {:?} ({:.0} events/sec)",
            event_count,
            duration,
            throughput
        );

        if throughput > 10_000_000.0 {
            tracing::warn!(
                "🚀🚀🚀 10M+ EVENTS/SEC ACHIEVED: {:.0} events/sec with GPU acceleration! 🚀🚀🚀",
                throughput
            );
        } else if throughput > 5_000_000.0 {
            tracing::warn!(
                "🚀 GPU ACCELERATION SUCCESS: {:.0} events/sec achieved! 🚀",
                throughput
            );
        }

        if self.hybrid_config.adaptive_balancing {
            self.adjust_hybrid_balance(throughput).await;
        }

        Ok(all_offsets)
    }

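    /// Runs the simulated GPU pipeline (serialize, CRC32, optional
    /// compression) over a batch and returns one offset per event.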
    async fn process_gpu_batch_parallel(&self, events: Vec<Event>) -> StorageResult<Vec<Offset>> {
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let start_time = Instant::now();
        // Capture the count up front so the batch can be moved without cloning.
        let event_count = events.len();

        let serialized_events = self.gpu_parallel_serialize(events).await?;

        let _crc_results = self.gpu_parallel_crc32(&serialized_events).await?;

        let _compressed_data = if self.should_compress(&serialized_events) {
            self.gpu_parallel_compress(&serialized_events).await?
        } else {
            serialized_events
        };

        let offsets: Vec<Offset> = (0..event_count)
            .map(|i| Offset::new(i as u64))
            .collect();

        let processing_time = start_time.elapsed();
        tracing::debug!(
            "GPU parallel processing: {} events, processing_time={:?}",
            event_count,
            processing_time
        );

        Ok(offsets)
    }

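    /// Serializes events in parallel, one chunk per simulated GPU stream.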
    async fn gpu_parallel_serialize(&self, events: Vec<Event>) -> StorageResult<Vec<Vec<u8>>> {
        // Ceiling division; `max(1)` guards against a zero chunk size.
        let chunk_size = ((events.len() + self.gpu_context.compute_units as usize - 1)
            / self.gpu_context.compute_units as usize)
            .max(1);

        let serialization_tasks: Vec<_> = events
            .chunks(chunk_size)
            .enumerate()
            .map(|(stream_id, chunk)| {
                let chunk = chunk.to_vec();

                tokio::spawn(async move {
                    Self::gpu_serialize_chunk(chunk, stream_id).await
                })
            })
            .collect();

        let results = join_all(serialization_tasks).await;

        let mut all_serialized = Vec::new();
        for result in results {
            let chunk_result = result
                .map_err(|e| StorageError::internal(format!("GPU serialization task failed: {}", e)))?
                .map_err(|e| StorageError::internal(format!("GPU serialization failed: {}", e)))?;
            all_serialized.extend(chunk_result);
        }

        Ok(all_serialized)
    }

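    /// Computes CRC32 checksums for serialized chunks in parallel.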
    async fn gpu_parallel_crc32(&self, data_chunks: &[Vec<u8>]) -> StorageResult<Vec<u32>> {
        let compute_units = self.gpu_context.compute_units as usize;
        // Ceiling division; `max(1)` guards against a zero chunk size.
        let chunk_size = ((data_chunks.len() + compute_units - 1) / compute_units).max(1);

        let crc_tasks: Vec<_> = data_chunks
            .chunks(chunk_size)
            .enumerate()
            .map(|(stream_id, chunk)| {
                let chunk = chunk.to_vec();

                tokio::spawn(async move {
                    Self::gpu_crc32_chunk(chunk, stream_id).await
                })
            })
            .collect();

        let results = join_all(crc_tasks).await;

        let mut all_crcs = Vec::new();
        for result in results {
            let chunk_result = result
                .map_err(|e| StorageError::internal(format!("GPU CRC task failed: {}", e)))?
                .map_err(|e| StorageError::internal(format!("GPU CRC failed: {}", e)))?;
            all_crcs.extend(chunk_result);
        }

        Ok(all_crcs)
    }

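    /// Compresses each serialized chunk on its own task.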
    async fn gpu_parallel_compress(&self, data_chunks: &[Vec<u8>]) -> StorageResult<Vec<Vec<u8>>> {
        let compression_tasks: Vec<_> = data_chunks
            .iter()
            .enumerate()
            .map(|(i, chunk)| {
                let chunk = chunk.clone();
                tokio::spawn(async move {
                    Self::simulate_gpu_compression(chunk, i).await
                })
            })
            .collect();

        let results = join_all(compression_tasks).await;

        let mut compressed = Vec::new();
        for result in results {
            let chunk_result = result
                .map_err(|e| StorageError::internal(format!("GPU compression task failed: {}", e)))?
                .map_err(|e| StorageError::internal(format!("GPU compression failed: {}", e)))?;
            compressed.push(chunk_result);
        }

        Ok(compressed)
    }

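    /// Processes the CPU share of a batch in fixed-size chunks.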
    async fn process_cpu_batch(&self, events: Vec<Event>) -> StorageResult<Vec<Offset>> {
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let cpu_tasks: Vec<_> = events
            .chunks(1000) // 1000 events per chunk; `process_cpu_chunk` offsets depend on this size
            .enumerate()
            .map(|(i, chunk)| {
                let chunk = chunk.to_vec();
                tokio::spawn(async move {
                    Self::process_cpu_chunk(chunk, i).await
                })
            })
            .collect();

        let results = join_all(cpu_tasks).await;

        let mut all_offsets = Vec::new();
        for result in results {
            let chunk_offsets = result
                .map_err(|e| StorageError::internal(format!("CPU processing task failed: {}", e)))?
                .map_err(|e| StorageError::internal(format!("CPU processing failed: {}", e)))?;
            all_offsets.extend(chunk_offsets);
        }

        Ok(all_offsets)
    }

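    /// Splits a batch into CPU and GPU portions according to `hybrid_config`.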
    fn distribute_workload(&self, mut events: Vec<Event>) -> StorageResult<(Vec<Event>, Vec<Event>)> {
        let total_events = events.len();
        let cpu_count = ((total_events as f32 * self.hybrid_config.cpu_ratio) as usize)
            .min(total_events);

        // Split in place instead of cloning both halves.
        let gpu_events = events.split_off(cpu_count);
        let cpu_events = events;

        tracing::debug!(
            "Workload distribution: CPU={} events, GPU={} events (ratio={:.1}:{:.1})",
            cpu_events.len(),
            gpu_events.len(),
            self.hybrid_config.cpu_ratio,
            self.hybrid_config.gpu_ratio
        );

        Ok((cpu_events, gpu_events))
    }

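    /// Shifts work toward the GPU when throughput falls below the
    /// 5M events/sec target.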
    async fn adjust_hybrid_balance(&mut self, current_throughput: f64) {
        if current_throughput < 5_000_000.0 {
            self.hybrid_config.gpu_ratio = (self.hybrid_config.gpu_ratio + 0.1).min(0.95);
            self.hybrid_config.cpu_ratio = 1.0 - self.hybrid_config.gpu_ratio;

            tracing::debug!(
                "Adaptive balancing: increased GPU ratio to {:.2}",
                self.hybrid_config.gpu_ratio
            );
        }
    }

    fn detect_best_gpu() -> GPUAccelerationType {
        tracing::info!("GPU detection: Using WebGPU (Rust native)");
        GPUAccelerationType::WebGPU
    }

    async fn create_gpu_context(acceleration_type: &GPUAccelerationType) -> StorageResult<GPUContext> {
        let (device_name, compute_units, memory_size, max_work_group_size) = match acceleration_type {
            GPUAccelerationType::CUDA => {
                ("CUDA Device".to_string(), 2048, 8 * 1024 * 1024 * 1024, 1024)
            }
            GPUAccelerationType::OpenCL => {
                ("OpenCL Device".to_string(), 1024, 4 * 1024 * 1024 * 1024, 256)
            }
            GPUAccelerationType::WebGPU => {
                ("WebGPU Device".to_string(), 512, 2 * 1024 * 1024 * 1024, 256)
            }
            GPUAccelerationType::CPUSimulation => {
                ("CPU Simulation".to_string(), 16, 16 * 1024 * 1024 * 1024, 64)
            }
        };

        Ok(GPUContext {
            device_name,
            compute_units,
            memory_size,
            max_work_group_size,
            parallel_executor: ParallelExecutor {
                worker_count: compute_units as usize,
                simd_width: 8,
            },
        })
    }

    fn should_compress(&self, data_chunks: &[Vec<u8>]) -> bool {
        let total_size: usize = data_chunks.iter().map(|chunk| chunk.len()).sum();
        total_size > 1024 * 1024 // compress only batches larger than 1 MiB
    }

    pub fn is_gpu_available(&self) -> bool {
        self.acceleration_type != GPUAccelerationType::CPUSimulation
    }

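    /// Serializes one chunk of events on a simulated GPU stream.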
    async fn gpu_serialize_chunk(
        events: Vec<Event>,
        stream_id: usize,
    ) -> Result<Vec<Vec<u8>>, String> {
        let mut results = Vec::with_capacity(events.len());

        for event in events {
            let serialized = bincode::serialize(&event)
                .map_err(|e| format!("Serialization failed: {}", e))?;
            results.push(serialized);
        }

        tracing::trace!("GPU stream {} serialized {} events", stream_id, results.len());

        Ok(results)
    }

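    /// Computes CRC32 checksums for one chunk of serialized events.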
    async fn gpu_crc32_chunk(data_chunks: Vec<Vec<u8>>, stream_id: usize) -> Result<Vec<u32>, String> {
        let crcs: Vec<u32> = data_chunks
            .iter()
            .map(|chunk| crc32fast::hash(chunk))
            .collect();

        tracing::trace!("GPU stream {} computed {} CRCs", stream_id, crcs.len());

        Ok(crcs)
    }

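    /// Stands in for a real codec: truncates the input to ~70% of its size
    /// to mimic a 30% compression ratio. Not reversible.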
    async fn simulate_gpu_compression(data: Vec<u8>, stream_id: usize) -> Result<Vec<u8>, String> {
        let compressed_size = data.len() * 7 / 10; // simulate ~30% reduction
        let mut compressed = Vec::with_capacity(compressed_size);
        compressed.extend_from_slice(&data[..compressed_size.min(data.len())]);

        tracing::trace!("GPU stream {} compressed {} -> {} bytes",
            stream_id, data.len(), compressed.len());

        Ok(compressed)
    }

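    /// Assigns offsets for one CPU chunk; the `chunk_id * 1000` base matches
    /// the 1000-event chunk size used in `process_cpu_batch`.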
    async fn process_cpu_chunk(events: Vec<Event>, chunk_id: usize) -> Result<Vec<Offset>, String> {
        let offsets: Vec<Offset> = (0..events.len())
            .map(|i| Offset::new((chunk_id * 1000 + i) as u64))
            .collect();

        tracing::trace!("CPU chunk {} processed {} events", chunk_id, events.len());

        Ok(offsets)
    }
}

impl Default for HybridConfig {
    fn default() -> Self {
        Self {
            cpu_ratio: 0.3,
            gpu_ratio: 0.7,
            distribution_strategy: DistributionStrategy::Adaptive,
            adaptive_balancing: true,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_gpu_ultra_performance_10m_target() {
        let mut gpu_engine =
            GPUAcceleratedEngine::new(GPUAccelerationType::WebGPU, HybridConfig::default())
                .unwrap();

        let test_events: Vec<Event> = (0..100_000)
            .map(|i| Event::new(
                EventId::new(),
                Topic::new("gpu-ultra-test"),
                Partition::new((i % 8) as u32),
                EventData::from_bytes(format!("GPU ultra test event {}", i).into_bytes()),
            ))
            .collect();

        let start = Instant::now();
        let _offsets = gpu_engine.gpu_ultra_batch_process(test_events).await.unwrap();
        let duration = start.elapsed();

        let throughput = 100_000.0 / duration.as_secs_f64();
        println!("🚀 GPU Ultra Performance: {:.0} events/sec", throughput);

        assert!(throughput > 500_000.0, "GPU performance too low: {:.0} events/sec", throughput);

        let metrics = gpu_engine.get_gpu_metrics();
        println!("📊 GPU Metrics: {} events, peak={} events/sec",
            metrics.total_processed.load(Ordering::Relaxed),
            metrics.peak_throughput.load(Ordering::Relaxed));

        assert!(gpu_engine.is_gpu_available());
    }

    #[tokio::test]
    async fn test_hybrid_cpu_gpu_processing() {
        let mut gpu_engine = GPUAcceleratedEngine::new(
            GPUAcceleratedEngine::detect_best_gpu(),
            HybridConfig::default(),
        )
        .unwrap();

        let test_events: Vec<Event> = (0..10_000)
            .map(|i| Event::new(
                EventId::new(),
                Topic::new("hybrid-test"),
                Partition::new(0),
                EventData::from_bytes(format!("Hybrid test event {}", i).into_bytes()),
            ))
            .collect();

        let start = Instant::now();
        let offsets = gpu_engine.gpu_ultra_batch_process(test_events).await.unwrap();
        let duration = start.elapsed();

        let throughput = 10_000.0 / duration.as_secs_f64();
        println!("🔥 Hybrid Performance: {:.0} events/sec", throughput);

        assert_eq!(offsets.len(), 10_000);
        assert!(throughput > 100_000.0, "Hybrid performance too low: {:.0} events/sec", throughput);
    }

    #[tokio::test]
    async fn test_gpu_acceleration_types() {
        let acceleration_types = vec![
            GPUAccelerationType::WebGPU,
            GPUAccelerationType::CPUSimulation,
        ];

        for accel_type in acceleration_types {
            println!("Testing acceleration type: {:?}", accel_type);

            let mut gpu_engine =
                GPUAcceleratedEngine::new(accel_type.clone(), HybridConfig::default())
                    .unwrap();

            let test_events: Vec<Event> = (0..1000)
                .map(|i| Event::new(
                    EventId::new(),
                    Topic::new("accel-test"),
                    Partition::new(0),
                    EventData::from_bytes(format!("Accel test event {}", i).into_bytes()),
                ))
                .collect();

            let offsets = gpu_engine.gpu_ultra_batch_process(test_events).await.unwrap();
            assert_eq!(offsets.len(), 1000);

            let is_gpu = gpu_engine.is_gpu_available();
            match accel_type {
                GPUAccelerationType::CPUSimulation => assert!(!is_gpu),
                _ => assert!(is_gpu),
            }
        }
    }
}