ringkernel_procint/cuda/executor.rs

//! CUDA kernel execution manager.
//!
//! Manages GPU kernel compilation and execution with CPU fallback.
//! Uses ringkernel-cuda patterns for proper GPU integration.
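//!
//! A minimal usage sketch (not compiled as a doctest; it assumes the sibling
//! `codegen::generate_dfg_kernel` helper that the tests below also use):
//!
//! ```ignore
//! let mut executor = KernelExecutor::new();
//! let source = crate::cuda::codegen::generate_dfg_kernel();
//! let kernel = executor.compile(&source).expect("kernel compilation failed");
//! assert!(kernel.is_compiled);
//! println!("execution backend: {}", executor.gpu_status().as_str());
//! ```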

use super::{KernelSource, KernelType};

#[allow(unused_imports)]
use super::LaunchConfig;

#[cfg(feature = "cuda")]
use cudarc::driver::{CudaDevice, LaunchAsync, LaunchConfig as CudaLaunchConfig};
#[cfg(feature = "cuda")]
use std::sync::Arc;

/// Kernel execution result.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Execution time in microseconds.
    pub execution_time_us: u64,
    /// Number of elements processed.
    pub elements_processed: u64,
    /// Throughput in elements per second.
    pub throughput: f64,
    /// Whether GPU was used.
    pub used_gpu: bool,
}

impl Default for ExecutionResult {
    fn default() -> Self {
        Self {
            execution_time_us: 0,
            elements_processed: 0,
            throughput: 0.0,
            used_gpu: false,
        }
    }
}

impl ExecutionResult {
    /// Create a new execution result.
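    ///
    /// Throughput is derived as `elements_processed * 1_000_000 / execution_time_us`,
    /// so, for example, 1_000 elements processed in 2_000 us yields 500_000 elements/s;
    /// a zero execution time reports a throughput of 0.0.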
    pub fn new(execution_time_us: u64, elements_processed: u64, used_gpu: bool) -> Self {
        let throughput = if execution_time_us > 0 {
            elements_processed as f64 * 1_000_000.0 / execution_time_us as f64
        } else {
            0.0
        };
        Self {
            execution_time_us,
            elements_processed,
            throughput,
            used_gpu,
        }
    }
}

/// Compiled kernel handle.
#[derive(Debug)]
pub struct CompiledKernel {
    /// Kernel name.
    pub name: String,
    /// Entry point.
    pub entry_point: String,
    /// Is compiled.
    pub is_compiled: bool,
    /// Kernel type.
    pub kernel_type: KernelType,
    /// CUDA source code.
    pub source: String,
}

/// GPU execution backend status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuStatus {
    /// No GPU available, using CPU fallback.
    CpuFallback,
    /// CUDA GPU available and initialized.
    CudaReady,
    /// GPU available but not initialized.
    CudaPending,
    /// GPU initialization failed.
    CudaError,
    /// CUDA feature not compiled in.
    CudaNotCompiled,
}

impl GpuStatus {
    /// Human-readable status string.
    pub fn as_str(&self) -> &'static str {
        match self {
            GpuStatus::CpuFallback => "CPU",
            GpuStatus::CudaReady => "CUDA",
            GpuStatus::CudaPending => "CUDA (init)",
            GpuStatus::CudaError => "CUDA (err)",
            GpuStatus::CudaNotCompiled => "CPU (no CUDA)",
        }
    }

    /// Check if CUDA feature is compiled.
    pub fn is_cuda_compiled() -> bool {
        cfg!(feature = "cuda")
    }
}

/// GPU usage statistics.
#[derive(Debug, Clone, Default)]
pub struct GpuStats {
    /// Total kernel launches.
    pub kernel_launches: u64,
    /// Total GPU execution time in microseconds.
    pub total_gpu_time_us: u64,
    /// Total elements processed on GPU.
    pub total_elements_gpu: u64,
    /// Total bytes transferred to GPU.
    pub bytes_to_gpu: u64,
    /// Total bytes transferred from GPU.
    pub bytes_from_gpu: u64,
}

impl GpuStats {
    /// Record a kernel execution.
    pub fn record(&mut self, result: &ExecutionResult, bytes_in: u64, bytes_out: u64) {
        if result.used_gpu {
            self.kernel_launches += 1;
            self.total_gpu_time_us += result.execution_time_us;
            self.total_elements_gpu += result.elements_processed;
            self.bytes_to_gpu += bytes_in;
            self.bytes_from_gpu += bytes_out;
        }
    }

    /// Get average kernel time in microseconds.
    pub fn avg_kernel_time_us(&self) -> f64 {
        if self.kernel_launches > 0 {
            self.total_gpu_time_us as f64 / self.kernel_launches as f64
        } else {
            0.0
        }
    }

    /// Get throughput in elements per second.
    pub fn throughput(&self) -> f64 {
        if self.total_gpu_time_us > 0 {
            self.total_elements_gpu as f64 * 1_000_000.0 / self.total_gpu_time_us as f64
        } else {
            0.0
        }
    }
}

/// Kernel executor for GPU operations.
pub struct KernelExecutor {
    /// GPU status.
    gpu_status: GpuStatus,
    /// Compiled kernel cache.
    kernel_cache: std::collections::HashMap<String, CompiledKernel>,
    /// GPU usage statistics.
    pub stats: GpuStats,
    /// CUDA device handle.
    #[cfg(feature = "cuda")]
    device: Option<Arc<CudaDevice>>,
}

impl std::fmt::Debug for KernelExecutor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KernelExecutor")
            .field("gpu_status", &self.gpu_status)
            .field("kernel_count", &self.kernel_cache.len())
            .field("stats", &self.stats)
            .finish()
    }
}

impl Default for KernelExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl KernelExecutor {
    /// Create a new kernel executor.
    pub fn new() -> Self {
        #[cfg(feature = "cuda")]
        {
            match CudaDevice::new(0) {
                Ok(device) => {
                    log::info!("CUDA device initialized successfully");
                    Self {
                        gpu_status: GpuStatus::CudaReady,
                        kernel_cache: std::collections::HashMap::new(),
                        stats: GpuStats::default(),
                        device: Some(device),
                    }
                }
                Err(e) => {
                    log::warn!("CUDA device initialization failed: {}", e);
                    Self {
                        gpu_status: GpuStatus::CudaError,
                        kernel_cache: std::collections::HashMap::new(),
                        stats: GpuStats::default(),
                        device: None,
                    }
                }
            }
        }

        #[cfg(not(feature = "cuda"))]
        {
            Self {
                gpu_status: GpuStatus::CudaNotCompiled,
                kernel_cache: std::collections::HashMap::new(),
                stats: GpuStats::default(),
            }
        }
    }

    /// Get GPU status.
    pub fn gpu_status(&self) -> GpuStatus {
        self.gpu_status
    }

    /// Check if CUDA is available.
    pub fn is_cuda_available(&self) -> bool {
        self.gpu_status == GpuStatus::CudaReady
    }

    /// Get CUDA device reference.
    #[cfg(feature = "cuda")]
    pub fn device(&self) -> Option<&Arc<CudaDevice>> {
        self.device.as_ref()
    }

    /// Compile a kernel from source using NVRTC.
    pub fn compile(&mut self, source: &KernelSource) -> Result<&CompiledKernel, String> {
        // Clone the name first to avoid lifetime issues
        let kernel_name = source.name.clone();
        let entry_point = source.entry_point.clone();
        let cuda_source = source.source.clone();

        if self.kernel_cache.contains_key(&kernel_name) {
            return Ok(self.kernel_cache.get(&kernel_name).unwrap());
        }

        #[cfg(feature = "cuda")]
        if let Some(device) = &self.device {
            // Compile CUDA C to PTX using NVRTC
            let ptx = cudarc::nvrtc::compile_ptx(&cuda_source)
                .map_err(|e| format!("NVRTC compilation failed for {}: {}", kernel_name, e))?;

            // Load the PTX module into the device
            // load_ptx requires 'static strings, so we use leaked Box<str>
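            // The leak is bounded: the cache check above ensures each unique
            // kernel name is compiled (and therefore leaked) at most once.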
            let module_name: &'static str = Box::leak(kernel_name.clone().into_boxed_str());
            let func_name: &'static str = Box::leak(entry_point.clone().into_boxed_str());

            device
                .load_ptx(ptx, module_name, &[func_name])
                .map_err(|e| {
                    format!(
                        "Failed to load PTX for {} (func: {}): {}",
                        kernel_name, func_name, e
                    )
                })?;

            log::info!(
                "Compiled and loaded CUDA kernel: {} (entry: {})",
                kernel_name,
                entry_point
            );
        }

        let compiled = CompiledKernel {
            name: kernel_name.clone(),
            entry_point,
            is_compiled: true,
            kernel_type: source.kernel_type,
            source: cuda_source,
        };

        self.kernel_cache.insert(kernel_name.clone(), compiled);
        Ok(self.kernel_cache.get(&kernel_name).unwrap())
    }

    /// Get compiled kernel count.
    pub fn kernel_count(&self) -> usize {
        self.kernel_cache.len()
    }

    /// Execute DFG construction kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_dfg_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
    ) -> Result<(Vec<crate::models::GpuDFGEdge>, ExecutionResult), String> {
        use crate::models::GpuDFGEdge;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        let n = events.len();
        if n < 2 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract activity pairs from consecutive events per case
        let mut source_activities: Vec<u32> = Vec::new();
        let mut target_activities: Vec<u32> = Vec::new();
        let mut durations: Vec<u32> = Vec::new();

        // Group by case and extract transitions
        let mut case_events: std::collections::HashMap<u64, Vec<&crate::models::GpuObjectEvent>> =
            std::collections::HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for case_evts in case_events.values() {
            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            for window in sorted.windows(2) {
                source_activities.push(window[0].activity_id);
                target_activities.push(window[1].activity_id);
                durations.push(window[0].duration_ms);
            }
        }

        let pair_count = source_activities.len();
        if pair_count == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Max activities for edge matrix (32x32)
        let max_activities = 32usize;
        let edge_count = max_activities * max_activities;

        // Host-to-Device transfers
        let d_sources = device
            .htod_sync_copy(&source_activities)
            .map_err(|e| format!("HtoD sources failed: {}", e))?;
        let d_targets = device
            .htod_sync_copy(&target_activities)
            .map_err(|e| format!("HtoD targets failed: {}", e))?;
        let d_durations = device
            .htod_sync_copy(&durations)
            .map_err(|e| format!("HtoD durations failed: {}", e))?;

        // Allocate output buffers (initialized to zeros)
        let edge_frequencies = vec![0u32; edge_count];
        let edge_durations = vec![0u64; edge_count];

        let d_edge_freq = device
            .htod_sync_copy(&edge_frequencies)
            .map_err(|e| format!("HtoD edge_freq failed: {}", e))?;
        let d_edge_dur = device
            .htod_sync_copy(&edge_durations)
            .map_err(|e| format!("HtoD edge_dur failed: {}", e))?;

        // Get the compiled kernel function
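        // The module and function names here must match the KernelSource name
        // and entry point that compile() previously passed to load_ptx.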
        let func = device
            .get_func("dfg_construction", "dfg_construction_kernel")
            .ok_or("DFG kernel not loaded - call compile() with generate_dfg_kernel() first")?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (pair_count as u32 + block_size - 1) / block_size;

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_sources,
                    &d_targets,
                    &d_durations,
                    &d_edge_freq,
                    &d_edge_dur,
                    max_activities as i32,
                    pair_count as i32,
                ),
            )
            .map_err(|e| format!("Kernel launch failed: {}", e))?;
        }

        // Synchronize - wait for kernel completion
        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // Device-to-Host transfers
        let mut result_frequencies = vec![0u32; edge_count];
        let mut result_durations = vec![0u64; edge_count];

        device
            .dtoh_sync_copy_into(&d_edge_freq, &mut result_frequencies)
            .map_err(|e| format!("DtoH frequencies failed: {}", e))?;
        device
            .dtoh_sync_copy_into(&d_edge_dur, &mut result_durations)
            .map_err(|e| format!("DtoH durations failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build edge results from frequency matrix
        let mut edges = Vec::new();
        for src in 0..max_activities {
            for tgt in 0..max_activities {
                let idx = src * max_activities + tgt;
                let freq = result_frequencies[idx];
                if freq > 0 {
                    let total_dur = result_durations[idx];
                    let avg_dur = total_dur as f32 / freq as f32;

                    let mut edge = GpuDFGEdge::default();
                    edge.source_activity = src as u32;
                    edge.target_activity = tgt as u32;
                    edge.frequency = freq;
                    edge.avg_duration_ms = avg_dur;
                    edges.push(edge);
                }
            }
        }

        // Record stats
        let bytes_in = (pair_count * 3 * 4) as u64;
        let bytes_out = (edge_count * 12) as u64;
        let result = ExecutionResult::new(elapsed, pair_count as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "DFG GPU kernel: {} pairs -> {} edges in {}us",
            pair_count,
            edges.len(),
            elapsed
        );

        Ok((edges, result))
    }

    /// Execute pattern detection kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_pattern_gpu(
        &mut self,
        nodes: &[crate::models::GpuDFGNode],
        bottleneck_threshold: f32,
        duration_threshold: f32,
    ) -> Result<(Vec<crate::models::GpuPatternMatch>, ExecutionResult), String> {
        use crate::models::{GpuPatternMatch, PatternSeverity, PatternType};

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        let n = nodes.len();
        if n == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract node data into Structure-of-Arrays format for GPU
        let event_counts: Vec<u32> = nodes.iter().map(|n| n.event_count).collect();
        let avg_durations: Vec<f32> = nodes.iter().map(|n| n.avg_duration_ms).collect();
        let incoming_counts: Vec<u16> = nodes.iter().map(|n| n.incoming_count).collect();
        let outgoing_counts: Vec<u16> = nodes.iter().map(|n| n.outgoing_count).collect();

        // HtoD transfers
        let d_event_counts = device
            .htod_sync_copy(&event_counts)
            .map_err(|e| format!("HtoD event_counts failed: {}", e))?;
        let d_avg_durations = device
            .htod_sync_copy(&avg_durations)
            .map_err(|e| format!("HtoD avg_durations failed: {}", e))?;
        let d_incoming = device
            .htod_sync_copy(&incoming_counts)
            .map_err(|e| format!("HtoD incoming failed: {}", e))?;
        let d_outgoing = device
            .htod_sync_copy(&outgoing_counts)
            .map_err(|e| format!("HtoD outgoing failed: {}", e))?;

        // Output buffers
        let pattern_types = vec![0u8; n];
        let pattern_confidences = vec![0.0f32; n];

        let d_pattern_types = device
            .htod_sync_copy(&pattern_types)
            .map_err(|e| format!("HtoD pattern_types failed: {}", e))?;
        let d_pattern_conf = device
            .htod_sync_copy(&pattern_confidences)
            .map_err(|e| format!("HtoD pattern_conf failed: {}", e))?;

        // Get kernel function
        let func = device
            .get_func("pattern_detection", "pattern_detection_kernel")
            .ok_or(
                "Pattern kernel not loaded - call compile() with generate_pattern_kernel() first",
            )?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (n as u32 + block_size - 1) / block_size;

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_event_counts,
                    &d_avg_durations,
                    &d_incoming,
                    &d_outgoing,
                    &d_pattern_types,
                    &d_pattern_conf,
                    bottleneck_threshold,
                    duration_threshold,
                    n as i32,
                ),
            )
            .map_err(|e| format!("Pattern kernel launch failed: {}", e))?;
        }

        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // DtoH transfers
        let mut result_types = vec![0u8; n];
        let mut result_confidences = vec![0.0f32; n];

        device
            .dtoh_sync_copy_into(&d_pattern_types, &mut result_types)
            .map_err(|e| format!("DtoH pattern_types failed: {}", e))?;
        device
            .dtoh_sync_copy_into(&d_pattern_conf, &mut result_confidences)
            .map_err(|e| format!("DtoH pattern_conf failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build pattern results
        let mut patterns = Vec::new();
        for (i, &ptype) in result_types.iter().enumerate() {
            if ptype != 0 {
                let pattern_type = match ptype {
                    6 => PatternType::LongRunning,
                    7 => PatternType::Bottleneck,
                    _ => continue,
                };

                let severity = if ptype == 7 {
                    PatternSeverity::Critical
                } else {
                    PatternSeverity::Warning
                };

                let mut pattern = GpuPatternMatch::new(pattern_type, severity);
                pattern.add_activity(nodes[i].activity_id);
                pattern.confidence = result_confidences[i];
                pattern.frequency = nodes[i].event_count;
                pattern.avg_duration_ms = nodes[i].avg_duration_ms;
                patterns.push(pattern);
            }
        }

        // Record stats
        let bytes_in = (n * 12) as u64;
        let bytes_out = (n * 5) as u64;
        let result = ExecutionResult::new(elapsed, n as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Pattern GPU kernel: {} nodes -> {} patterns in {}us",
            n,
            patterns.len(),
            elapsed
        );

        Ok((patterns, result))
    }

    /// Execute partial order derivation kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_partial_order_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
    ) -> Result<(Vec<crate::models::GpuPartialOrderTrace>, ExecutionResult), String> {
        use crate::models::{GpuPartialOrderTrace, HybridTimestamp};
        use std::collections::HashMap;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        // Group events by case
        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        // For each case, we'll run a GPU kernel to compute the precedence matrix
        let mut all_traces = Vec::with_capacity(case_events.len());
        let mut total_kernel_time_us = 0u64;

        for (case_id, case_evts) in case_events {
            if case_evts.len() < 2 {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.into_iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            let n = sorted.len().min(16);

            // Extract timing data
            let start_times: Vec<u64> = sorted.iter().map(|e| e.timestamp.physical_ms).collect();
            let end_times: Vec<u64> = sorted
                .iter()
                .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                .collect();

            // Pad to 16 elements for consistent GPU buffer sizes
            let mut start_times_padded = vec![0u64; 16];
            let mut end_times_padded = vec![0u64; 16];
            for i in 0..n {
                start_times_padded[i] = start_times[i];
                end_times_padded[i] = end_times[i];
            }

            // HtoD transfers
            let d_start_times = device
                .htod_sync_copy(&start_times_padded)
                .map_err(|e| format!("HtoD start_times failed: {}", e))?;
            let d_end_times = device
                .htod_sync_copy(&end_times_padded)
                .map_err(|e| format!("HtoD end_times failed: {}", e))?;

            // Output buffer (16x16 = 256 elements)
            let precedence_flat = vec![0u32; 256];
            let d_precedence = device
                .htod_sync_copy(&precedence_flat)
                .map_err(|e| format!("HtoD precedence failed: {}", e))?;

            // Get kernel function
            let func = device
                .get_func("partial_order", "partial_order_kernel")
                .ok_or("Partial order kernel not loaded")?;

            // Launch configuration (16x16 grid for pairwise comparison)
            let config = CudaLaunchConfig {
                grid_dim: (1, 1, 1),
                block_dim: (16, 16, 1),
                shared_mem_bytes: 0,
            };

            let kernel_start = std::time::Instant::now();

            // Launch kernel
            unsafe {
                func.launch(
                    config,
                    (&d_start_times, &d_end_times, &d_precedence, 16i32, 16i32),
                )
                .map_err(|e| format!("Partial order kernel launch failed: {}", e))?;
            }

            device
                .synchronize()
                .map_err(|e| format!("Device synchronize failed: {}", e))?;

            total_kernel_time_us += kernel_start.elapsed().as_micros() as u64;

            // DtoH transfer
            let mut result_precedence = vec![0u32; 256];
            device
                .dtoh_sync_copy_into(&d_precedence, &mut result_precedence)
                .map_err(|e| format!("DtoH precedence failed: {}", e))?;

            // Convert flat precedence to 16x16 bit matrix
            let mut precedence_matrix = [0u16; 16];
            for i in 0..n {
                for j in 0..n {
                    if result_precedence[i * 16 + j] != 0 {
                        precedence_matrix[i] |= 1u16 << j;
                    }
                }
            }

            // Build activity IDs array
            let mut activity_ids = [u32::MAX; 16];
            for (i, event) in sorted.iter().take(n).enumerate() {
                activity_ids[i] = event.activity_id;
            }

            // Compute timing in seconds
            let trace_start_ms = sorted.first().map(|e| e.timestamp.physical_ms).unwrap_or(0);
            let mut activity_start_secs = [0u16; 16];
            let mut activity_duration_secs = [0u16; 16];
            for (i, event) in sorted.iter().take(n).enumerate() {
                let rel_start = event.timestamp.physical_ms.saturating_sub(trace_start_ms);
                activity_start_secs[i] = (rel_start / 1000).min(u16::MAX as u64) as u16;
                activity_duration_secs[i] =
                    (event.duration_ms / 1000).max(1).min(u16::MAX as u32) as u16;
            }

            // Build trace
            let trace = GpuPartialOrderTrace {
                trace_id: all_traces.len() as u64 + 1,
                case_id,
                event_count: sorted.len() as u32,
                activity_count: n as u32,
                start_time: sorted.first().map(|e| e.timestamp).unwrap_or_default(),
                end_time: HybridTimestamp::new(
                    sorted
                        .last()
                        .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                        .unwrap_or(0),
                    0,
                ),
                max_width: Self::compute_width_from_matrix(&precedence_matrix, n),
                flags: 0x01, // Has transitive closure flag
                precedence_matrix,
                activity_ids,
                activity_start_secs,
                activity_duration_secs,
                _reserved: [0u8; 32],
            };

            all_traces.push(trace);
        }

        let elapsed = start.elapsed().as_micros() as u64;
        let result = ExecutionResult::new(elapsed, events.len() as u64, true);

        // Record stats
        let bytes_in = (events.len() * 16) as u64;
        let bytes_out = (all_traces.len() * 256) as u64;
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Partial order GPU kernel: {} events -> {} traces in {}us (kernel: {}us)",
            events.len(),
            all_traces.len(),
            elapsed,
            total_kernel_time_us
        );

        Ok((all_traces, result))
    }

    #[cfg(feature = "cuda")]
    fn compute_width_from_matrix(precedence: &[u16; 16], n: usize) -> u32 {
        let mut max_width = 1u32;

        for i in 0..n {
            let mut concurrent = 1u32;
            for j in (i + 1)..n {
                let i_precedes_j = (precedence[i] & (1u16 << j)) != 0;
                let j_precedes_i = (precedence[j] & (1u16 << i)) != 0;
                if !i_precedes_j && !j_precedes_i {
                    concurrent += 1;
                }
            }
            max_width = max_width.max(concurrent);
        }

        max_width
    }

    /// Execute conformance checking kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_conformance_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
        model: &crate::models::ProcessModel,
    ) -> Result<(Vec<crate::models::ConformanceResult>, ExecutionResult), String> {
        use crate::models::{ComplianceLevel, ConformanceResult, ConformanceStatus};
        use std::collections::HashMap;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        // Group events by case
        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        // Flatten traces for GPU processing
        let mut all_activities: Vec<u32> = Vec::new();
        let mut trace_starts: Vec<i32> = Vec::new();
        let mut trace_lengths: Vec<i32> = Vec::new();
        let mut trace_case_ids: Vec<u64> = Vec::new();

        for (case_id, case_evts) in &case_events {
            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            trace_starts.push(all_activities.len() as i32);
            trace_lengths.push(sorted.len() as i32);
            trace_case_ids.push(*case_id);

            for event in sorted {
                all_activities.push(event.activity_id);
            }
        }

        let num_traces = trace_starts.len();
        if num_traces == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract model transitions
        let model_sources: Vec<u32> = model.transitions.iter().map(|(s, _)| *s).collect();
        let model_targets: Vec<u32> = model.transitions.iter().map(|(_, t)| *t).collect();
        let num_transitions = model_sources.len() as i32;

        // HtoD transfers
        let d_activities = device
            .htod_sync_copy(&all_activities)
            .map_err(|e| format!("HtoD activities failed: {}", e))?;
        let d_trace_starts = device
            .htod_sync_copy(&trace_starts)
            .map_err(|e| format!("HtoD trace_starts failed: {}", e))?;
        let d_trace_lengths = device
            .htod_sync_copy(&trace_lengths)
            .map_err(|e| format!("HtoD trace_lengths failed: {}", e))?;
        let d_model_sources = device
            .htod_sync_copy(&model_sources)
            .map_err(|e| format!("HtoD model_sources failed: {}", e))?;
        let d_model_targets = device
            .htod_sync_copy(&model_targets)
            .map_err(|e| format!("HtoD model_targets failed: {}", e))?;

        // Output buffer
        let fitness_scores = vec![0.0f32; num_traces];
        let d_fitness = device
            .htod_sync_copy(&fitness_scores)
            .map_err(|e| format!("HtoD fitness failed: {}", e))?;

        // Get kernel function
        let func = device
            .get_func("conformance", "conformance_kernel")
            .ok_or("Conformance kernel not loaded")?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (num_traces as u32 + block_size - 1) / block_size;

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_activities,
                    &d_trace_starts,
                    &d_trace_lengths,
                    &d_model_sources,
                    &d_model_targets,
                    num_transitions,
                    &d_fitness,
                    num_traces as i32,
                ),
            )
            .map_err(|e| format!("Conformance kernel launch failed: {}", e))?;
        }

        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // DtoH transfer
        let mut result_fitness = vec![0.0f32; num_traces];
        device
            .dtoh_sync_copy_into(&d_fitness, &mut result_fitness)
            .map_err(|e| format!("DtoH fitness failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build conformance results
        let mut results = Vec::with_capacity(num_traces);
        for (i, &fitness) in result_fitness.iter().enumerate() {
            let status = if fitness >= 1.0 {
                ConformanceStatus::Conformant
            } else if fitness >= 0.8 {
                ConformanceStatus::Deviation
            } else {
                ConformanceStatus::MissingActivity
            };

            results.push(ConformanceResult {
                trace_id: trace_case_ids[i],
                model_id: model.id,
                status: status as u8,
                compliance_level: ComplianceLevel::from_fitness(fitness) as u8,
                fitness,
                precision: fitness, // Simplified: same as fitness
                generalization: 0.8,
                simplicity: 1.0,
                missing_count: ((1.0 - fitness) * trace_lengths[i] as f32) as u16,
                extra_count: 0,
                alignment_cost: ((1.0 - fitness) * trace_lengths[i] as f32) as u32,
                alignment_length: trace_lengths[i] as u32,
                _padding1: [0; 2],
                _reserved: [0; 16],
            });
        }

        // Record stats
        let bytes_in = (all_activities.len() * 4 + model_sources.len() * 8) as u64;
        let bytes_out = (num_traces * 4) as u64;
        let result = ExecutionResult::new(elapsed, events.len() as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Conformance GPU kernel: {} traces in {}us, avg fitness: {:.2}",
            num_traces,
            elapsed,
            result_fitness.iter().sum::<f32>() / num_traces as f32
        );

        Ok((results, result))
    }
}

/// CPU fallback executor for when CUDA is not available.
pub struct CpuFallbackExecutor;

impl CpuFallbackExecutor {
    /// Execute DFG construction on CPU.
    pub fn execute_dfg_construction(
        events: &[crate::models::GpuObjectEvent],
        edges: &mut [crate::models::GpuDFGEdge],
        max_activities: usize,
    ) -> ExecutionResult {
        let start = std::time::Instant::now();

        let mut case_events: std::collections::HashMap<u64, Vec<&crate::models::GpuObjectEvent>> =
            std::collections::HashMap::new();

        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

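        // Note: this CPU path only accumulates edge frequencies; per-edge
        // durations are not aggregated here, unlike the GPU DFG kernel above,
        // which also sums durations to derive avg_duration_ms.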
        for case_evts in case_events.values() {
            let mut sorted_events: Vec<_> = case_evts.iter().collect();
            sorted_events.sort_by_key(|e| e.timestamp.physical_ms);

            for window in sorted_events.windows(2) {
                let source = window[0].activity_id as usize;
                let target = window[1].activity_id as usize;

                if source < max_activities && target < max_activities {
                    let edge_idx = source * max_activities + target;
                    if edge_idx < edges.len() {
                        edges[edge_idx].frequency += 1;
                        edges[edge_idx].source_activity = source as u32;
                        edges[edge_idx].target_activity = target as u32;
                    }
                }
            }
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, events.len() as u64, false)
    }

    /// Execute pattern detection on CPU.
    pub fn execute_pattern_detection(
        nodes: &[crate::models::GpuDFGNode],
        patterns: &mut Vec<crate::models::GpuPatternMatch>,
        bottleneck_threshold: f32,
        duration_threshold: f32,
    ) -> ExecutionResult {
        use crate::models::{GpuPatternMatch, PatternSeverity, PatternType};

        let start = std::time::Instant::now();

        for node in nodes {
            if node.event_count == 0 {
                continue;
            }

            let incoming_count = node.incoming_count as f32;
            let outgoing_count = node.outgoing_count as f32;
            if incoming_count > bottleneck_threshold && outgoing_count < incoming_count * 0.5 {
                let mut pattern =
                    GpuPatternMatch::new(PatternType::Bottleneck, PatternSeverity::Critical);
                pattern.add_activity(node.activity_id);
                pattern.confidence = incoming_count / bottleneck_threshold;
                pattern.frequency = node.event_count;
                pattern.avg_duration_ms = node.avg_duration_ms;
                patterns.push(pattern);
            }

            if node.avg_duration_ms > duration_threshold {
                let mut pattern =
                    GpuPatternMatch::new(PatternType::LongRunning, PatternSeverity::Warning);
                pattern.add_activity(node.activity_id);
                pattern.confidence = node.avg_duration_ms / duration_threshold;
                pattern.frequency = node.event_count;
                pattern.avg_duration_ms = node.avg_duration_ms;
                patterns.push(pattern);
            }
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, nodes.len() as u64, false)
    }

    /// Execute partial order derivation on CPU.
    pub fn execute_partial_order(
        events: &[crate::models::GpuObjectEvent],
        traces: &mut Vec<crate::models::GpuPartialOrderTrace>,
    ) -> ExecutionResult {
        use crate::models::{GpuPartialOrderTrace, HybridTimestamp};
        use std::collections::HashMap;

        let start = std::time::Instant::now();

        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for (case_id, case_evts) in &case_events {
            if case_evts.len() < 2 {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            let n = sorted.len().min(16);
            let mut precedence = [0u16; 16];
            let mut activity_ids = [0u32; 16];
            let mut activity_start_secs = [0u16; 16];
            let mut activity_duration_secs = [0u16; 16];

            // Get trace start time for relative timing
            let trace_start_ms = sorted.first().map(|e| e.timestamp.physical_ms).unwrap_or(0);

            for i in 0..n {
                activity_ids[i] = sorted[i].activity_id;

                // Store timing info in SECONDS (relative to trace start)
                let rel_start_ms = sorted[i]
                    .timestamp
                    .physical_ms
                    .saturating_sub(trace_start_ms);
                activity_start_secs[i] = (rel_start_ms / 1000).min(u16::MAX as u64) as u16;
                activity_duration_secs[i] =
                    (sorted[i].duration_ms / 1000).max(1).min(u16::MAX as u32) as u16;

                for j in (i + 1)..n {
                    let i_end = sorted[i].timestamp.physical_ms + sorted[i].duration_ms as u64;
                    let j_start = sorted[j].timestamp.physical_ms;
                    if i_end <= j_start {
                        precedence[i] |= 1u16 << j;
                    }
                }
            }

            // Calculate trace end time (last event end)
            let trace_end_ms = sorted
                .iter()
                .take(n)
                .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                .max()
                .unwrap_or(0);

            let trace = GpuPartialOrderTrace {
                trace_id: traces.len() as u64 + 1,
                case_id: *case_id,
                event_count: sorted.len() as u32,
                activity_count: n as u32,
                start_time: sorted.first().map(|e| e.timestamp).unwrap_or_default(),
                end_time: HybridTimestamp::new(trace_end_ms, 0),
                max_width: Self::compute_width(&precedence, n),
                flags: 0,
                precedence_matrix: precedence,
                activity_ids,
                activity_start_secs,
                activity_duration_secs,
                _reserved: [0u8; 32],
            };

            traces.push(trace);
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, events.len() as u64, false)
    }

    fn compute_width(precedence: &[u16; 16], n: usize) -> u32 {
        let mut max_width = 1u32;

        for i in 0..n {
            let mut concurrent = 1u32;
            for j in (i + 1)..n {
                let i_precedes_j = (precedence[i] & (1u16 << j)) != 0;
                let j_precedes_i = (precedence[j] & (1u16 << i)) != 0;
                if !i_precedes_j && !j_precedes_i {
                    concurrent += 1;
                }
            }
            max_width = max_width.max(concurrent);
        }

        max_width
    }

    /// Execute conformance checking on CPU.
    pub fn execute_conformance(
        events: &[crate::models::GpuObjectEvent],
        model: &crate::models::ProcessModel,
    ) -> (Vec<crate::models::ConformanceResult>, ExecutionResult) {
        use crate::models::{ComplianceLevel, ConformanceResult, ConformanceStatus};
        use std::collections::HashMap;

        let start = std::time::Instant::now();
        let mut results = Vec::new();

        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for (case_id, case_evts) in &case_events {
            if case_evts.is_empty() {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            let mut valid_moves = 0u32;
            let total_moves = (sorted.len() - 1) as u32;

            for window in sorted.windows(2) {
                let source = window[0].activity_id;
                let target = window[1].activity_id;

                let is_valid = model
                    .transitions
                    .iter()
                    .any(|(s, t)| *s == source && *t == target);

                if is_valid {
                    valid_moves += 1;
                }
            }

            let fitness = if total_moves > 0 {
                valid_moves as f32 / total_moves as f32
            } else {
                1.0
            };

            let compliance = ComplianceLevel::from_fitness(fitness);

            let status = if fitness >= 0.95 {
                ConformanceStatus::Conformant
            } else if total_moves > valid_moves {
                ConformanceStatus::ExtraActivity
            } else {
                ConformanceStatus::WrongSequence
            };

            let extra = total_moves.saturating_sub(valid_moves) as u16;

            results.push(ConformanceResult {
                trace_id: *case_id,
                model_id: model.id,
                status: status as u8,
                compliance_level: compliance as u8,
                _padding1: [0; 2],
                fitness,
                precision: 1.0 - extra as f32 / (total_moves + 1) as f32,
                generalization: 0.8,
                simplicity: 1.0,
                missing_count: 0,
                extra_count: extra,
                alignment_cost: extra as u32,
                alignment_length: sorted.len() as u32,
                _reserved: [0u8; 16],
            });
        }

        let elapsed = start.elapsed().as_micros() as u64;
        let exec_result = ExecutionResult::new(elapsed, events.len() as u64, false);

        (results, exec_result)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cuda::codegen::generate_dfg_kernel;

    #[test]
    fn test_executor_creation() {
        let executor = KernelExecutor::new();
        assert_eq!(executor.kernel_count(), 0);
    }

    #[test]
    fn test_kernel_compilation() {
        let mut executor = KernelExecutor::new();
        let gpu_status = executor.gpu_status();
        let source = generate_dfg_kernel();
        let result = executor.compile(&source);

        // Convert result to bool to release borrow
        let is_ok = result.is_ok();
        let err_msg = if let Err(ref e) = result {
            Some(e.clone())
        } else {
            None
        };

        // Print error for debugging if it fails
        if let Some(e) = err_msg {
            eprintln!("Kernel compilation error: {}", e);
        }

        // Skip GPU compilation tests when no CUDA available
        if gpu_status == GpuStatus::CpuFallback {
            assert!(is_ok); // CPU fallback should still cache the kernel
            assert_eq!(executor.kernel_count(), 1);
        } else {
            // With CUDA compiled in, compilation should succeed unless device
            // initialization already failed (GpuStatus::CudaError), e.g. when
            // no GPU or NVRTC is available in the test environment.
            assert!(is_ok || gpu_status == GpuStatus::CudaError);
        }
    }

    #[test]
    fn test_execution_result() {
        let result = ExecutionResult::new(1000, 1_000_000, true);
        assert_eq!(result.throughput, 1_000_000_000.0);
        assert!(result.used_gpu);
    }

    #[test]
    fn test_gpu_status() {
        let executor = KernelExecutor::new();
        let status = executor.gpu_status();
        assert!(matches!(
            status,
            GpuStatus::CpuFallback
                | GpuStatus::CudaReady
                | GpuStatus::CudaError
                | GpuStatus::CudaNotCompiled
        ));
    }

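    // Additional checks (sketches added for coverage): they exercise only
    // helpers defined in this file, using the same arithmetic documented above.
    #[test]
    fn test_gpu_stats_derived_metrics() {
        let mut stats = GpuStats::default();
        stats.record(&ExecutionResult::new(1_000, 10_000, true), 0, 0);
        stats.record(&ExecutionResult::new(3_000, 30_000, true), 0, 0);

        // Two launches totalling 4_000 us -> 2_000 us average.
        assert_eq!(stats.avg_kernel_time_us(), 2_000.0);
        // 40_000 elements over 4_000 us -> 10_000_000 elements/s.
        assert_eq!(stats.throughput(), 10_000_000.0);

        // CPU-side results are ignored by record().
        stats.record(&ExecutionResult::new(500, 100, false), 64, 64);
        assert_eq!(stats.kernel_launches, 2);
    }

    #[test]
    fn test_compute_width() {
        // An empty precedence matrix means no ordering constraints, so all
        // activities are mutually concurrent.
        let empty = [0u16; 16];
        assert_eq!(CpuFallbackExecutor::compute_width(&empty, 4), 4);

        // A total order 0 -> 1 -> 2 has no concurrency, so the width stays 1.
        let mut chain = [0u16; 16];
        chain[0] |= (1 << 1) | (1 << 2);
        chain[1] |= 1 << 2;
        assert_eq!(CpuFallbackExecutor::compute_width(&chain, 3), 1);
    }
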
    #[test]
    fn test_gpu_stats() {
        let mut stats = GpuStats::default();
        let result = ExecutionResult::new(1000, 10000, true);
        stats.record(&result, 4000, 2000);

        assert_eq!(stats.kernel_launches, 1);
        assert_eq!(stats.total_gpu_time_us, 1000);
        assert_eq!(stats.bytes_to_gpu, 4000);
        assert_eq!(stats.bytes_from_gpu, 2000);
    }
}