ringkernel_procint/cuda/executor.rs

//! CUDA kernel execution manager.
//!
//! Manages GPU kernel compilation and execution with CPU fallback.
//! Uses ringkernel-cuda patterns for proper GPU integration.
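//!
//! A minimal usage sketch (marked `ignore` since it assumes a CUDA-capable host
//! and the crate-local `crate::cuda::codegen::generate_dfg_kernel` helper):
//!
//! ```ignore
//! let mut executor = KernelExecutor::new();
//! if executor.is_cuda_available() {
//!     // Compile the generated DFG kernel via NVRTC and cache it by name.
//!     executor.compile(&crate::cuda::codegen::generate_dfg_kernel()).unwrap();
//! }
//! // Without a GPU, CpuFallbackExecutor provides the same analyses on the host.
//! ```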

use super::{KernelSource, KernelType};

#[allow(unused_imports)]
use super::LaunchConfig;

#[cfg(feature = "cuda")]
use cudarc::driver::{CudaDevice, LaunchAsync, LaunchConfig as CudaLaunchConfig};
#[cfg(feature = "cuda")]
use std::sync::Arc;

/// Kernel execution result.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Execution time in microseconds.
    pub execution_time_us: u64,
    /// Number of elements processed.
    pub elements_processed: u64,
    /// Throughput in elements per second.
    pub throughput: f64,
    /// Whether GPU was used.
    pub used_gpu: bool,
}

impl Default for ExecutionResult {
    fn default() -> Self {
        Self {
            execution_time_us: 0,
            elements_processed: 0,
            throughput: 0.0,
            used_gpu: false,
        }
    }
}

impl ExecutionResult {
    /// Create a new execution result.
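    ///
    /// Throughput is computed as `elements_processed * 1_000_000 / execution_time_us`
    /// (elements per second); a zero execution time yields a throughput of 0.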
    pub fn new(execution_time_us: u64, elements_processed: u64, used_gpu: bool) -> Self {
        let throughput = if execution_time_us > 0 {
            elements_processed as f64 * 1_000_000.0 / execution_time_us as f64
        } else {
            0.0
        };
        Self {
            execution_time_us,
            elements_processed,
            throughput,
            used_gpu,
        }
    }
}

/// Compiled kernel handle.
#[derive(Debug)]
pub struct CompiledKernel {
    /// Kernel name.
    pub name: String,
    /// Entry point.
    pub entry_point: String,
    /// Is compiled.
    pub is_compiled: bool,
    /// Kernel type.
    pub kernel_type: KernelType,
    /// CUDA source code.
    pub source: String,
}

/// GPU execution backend status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuStatus {
    /// No GPU available, using CPU fallback.
    CpuFallback,
    /// CUDA GPU available and initialized.
    CudaReady,
    /// GPU available but not initialized.
    CudaPending,
    /// GPU initialization failed.
    CudaError,
    /// CUDA feature not compiled in.
    CudaNotCompiled,
}

impl GpuStatus {
    /// Human-readable status string.
    pub fn as_str(&self) -> &'static str {
        match self {
            GpuStatus::CpuFallback => "CPU",
            GpuStatus::CudaReady => "CUDA",
            GpuStatus::CudaPending => "CUDA (init)",
            GpuStatus::CudaError => "CUDA (err)",
            GpuStatus::CudaNotCompiled => "CPU (no CUDA)",
        }
    }

    /// Check if CUDA feature is compiled.
    pub fn is_cuda_compiled() -> bool {
        cfg!(feature = "cuda")
    }
}

/// GPU usage statistics.
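///
/// Counters only accumulate GPU-side executions; results with `used_gpu == false`
/// are ignored by [`GpuStats::record`].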
#[derive(Debug, Clone, Default)]
pub struct GpuStats {
    /// Total kernel launches.
    pub kernel_launches: u64,
    /// Total GPU execution time in microseconds.
    pub total_gpu_time_us: u64,
    /// Total elements processed on GPU.
    pub total_elements_gpu: u64,
    /// Total bytes transferred to GPU.
    pub bytes_to_gpu: u64,
    /// Total bytes transferred from GPU.
    pub bytes_from_gpu: u64,
}

impl GpuStats {
    /// Record a kernel execution.
    pub fn record(&mut self, result: &ExecutionResult, bytes_in: u64, bytes_out: u64) {
        if result.used_gpu {
            self.kernel_launches += 1;
            self.total_gpu_time_us += result.execution_time_us;
            self.total_elements_gpu += result.elements_processed;
            self.bytes_to_gpu += bytes_in;
            self.bytes_from_gpu += bytes_out;
        }
    }

    /// Get average kernel time in microseconds.
    pub fn avg_kernel_time_us(&self) -> f64 {
        if self.kernel_launches > 0 {
            self.total_gpu_time_us as f64 / self.kernel_launches as f64
        } else {
            0.0
        }
    }

    /// Get throughput in elements per second.
    pub fn throughput(&self) -> f64 {
        if self.total_gpu_time_us > 0 {
            self.total_elements_gpu as f64 * 1_000_000.0 / self.total_gpu_time_us as f64
        } else {
            0.0
        }
    }
}

/// Kernel executor for GPU operations.
pub struct KernelExecutor {
    /// GPU status.
    gpu_status: GpuStatus,
    /// Compiled kernel cache.
    kernel_cache: std::collections::HashMap<String, CompiledKernel>,
    /// GPU usage statistics.
    pub stats: GpuStats,
    /// CUDA device handle.
    #[cfg(feature = "cuda")]
    device: Option<Arc<CudaDevice>>,
}

impl std::fmt::Debug for KernelExecutor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KernelExecutor")
            .field("gpu_status", &self.gpu_status)
            .field("kernel_count", &self.kernel_cache.len())
            .field("stats", &self.stats)
            .finish()
    }
}

impl Default for KernelExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl KernelExecutor {
    /// Create a new kernel executor.
    pub fn new() -> Self {
        #[cfg(feature = "cuda")]
        {
            match CudaDevice::new(0) {
                Ok(device) => {
                    log::info!("CUDA device initialized successfully");
                    Self {
                        gpu_status: GpuStatus::CudaReady,
                        kernel_cache: std::collections::HashMap::new(),
                        stats: GpuStats::default(),
                        device: Some(device),
                    }
                }
                Err(e) => {
                    log::warn!("CUDA device initialization failed: {}", e);
                    Self {
                        gpu_status: GpuStatus::CudaError,
                        kernel_cache: std::collections::HashMap::new(),
                        stats: GpuStats::default(),
                        device: None,
                    }
                }
            }
        }

        #[cfg(not(feature = "cuda"))]
        {
            Self {
                gpu_status: GpuStatus::CudaNotCompiled,
                kernel_cache: std::collections::HashMap::new(),
                stats: GpuStats::default(),
            }
        }
    }

    /// Get GPU status.
    pub fn gpu_status(&self) -> GpuStatus {
        self.gpu_status
    }

    /// Check if CUDA is available.
    pub fn is_cuda_available(&self) -> bool {
        self.gpu_status == GpuStatus::CudaReady
    }

    /// Get CUDA device reference.
    #[cfg(feature = "cuda")]
    pub fn device(&self) -> Option<&Arc<CudaDevice>> {
        self.device.as_ref()
    }

    /// Compile a kernel from source using NVRTC.
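    ///
    /// Compiled kernels are cached by name; compiling an already-cached kernel
    /// returns the cached handle without invoking NVRTC again.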
    pub fn compile(&mut self, source: &KernelSource) -> Result<&CompiledKernel, String> {
        // Clone the name first to avoid lifetime issues
        let kernel_name = source.name.clone();
        let entry_point = source.entry_point.clone();
        let cuda_source = source.source.clone();

        if self.kernel_cache.contains_key(&kernel_name) {
            return Ok(self.kernel_cache.get(&kernel_name).unwrap());
        }

        #[cfg(feature = "cuda")]
        if let Some(device) = &self.device {
            // Compile CUDA C to PTX using NVRTC
            let ptx = cudarc::nvrtc::compile_ptx(&cuda_source)
                .map_err(|e| format!("NVRTC compilation failed for {}: {}", kernel_name, e))?;

            // Load the PTX module into the device
            // load_ptx requires 'static strings, so we use leaked Box<str>
            let module_name: &'static str = Box::leak(kernel_name.clone().into_boxed_str());
            let func_name: &'static str = Box::leak(entry_point.clone().into_boxed_str());

            device
                .load_ptx(ptx, module_name, &[func_name])
                .map_err(|e| {
                    format!(
                        "Failed to load PTX for {} (func: {}): {}",
                        kernel_name, func_name, e
                    )
                })?;

            log::info!(
                "Compiled and loaded CUDA kernel: {} (entry: {})",
                kernel_name,
                entry_point
            );
        }

        let compiled = CompiledKernel {
            name: kernel_name.clone(),
            entry_point,
            is_compiled: true,
            kernel_type: source.kernel_type,
            source: cuda_source,
        };

        self.kernel_cache.insert(kernel_name.clone(), compiled);
        Ok(self.kernel_cache.get(&kernel_name).unwrap())
    }

    /// Get compiled kernel count.
    pub fn kernel_count(&self) -> usize {
        self.kernel_cache.len()
    }

    /// Execute DFG construction kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_dfg_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
    ) -> Result<(Vec<crate::models::GpuDFGEdge>, ExecutionResult), String> {
        use crate::models::GpuDFGEdge;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        let n = events.len();
        if n < 2 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract activity pairs from consecutive events per case
        let mut source_activities: Vec<u32> = Vec::new();
        let mut target_activities: Vec<u32> = Vec::new();
        let mut durations: Vec<u32> = Vec::new();

        // Group by case and extract transitions
        let mut case_events: std::collections::HashMap<u64, Vec<&crate::models::GpuObjectEvent>> =
            std::collections::HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for case_evts in case_events.values() {
            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            for window in sorted.windows(2) {
                source_activities.push(window[0].activity_id);
                target_activities.push(window[1].activity_id);
                durations.push(window[0].duration_ms);
            }
        }

        let pair_count = source_activities.len();
        if pair_count == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Max activities for edge matrix (32x32)
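        // The generated kernel is assumed to accumulate frequencies and total
        // durations into row-major `max_activities x max_activities` matrices,
        // indexed as `source * max_activities + target` (mirrored when reading
        // the results back below).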
        let max_activities = 32usize;
        let edge_count = max_activities * max_activities;

        // Host-to-Device transfers
        let d_sources = device
            .htod_sync_copy(&source_activities)
            .map_err(|e| format!("HtoD sources failed: {}", e))?;
        let d_targets = device
            .htod_sync_copy(&target_activities)
            .map_err(|e| format!("HtoD targets failed: {}", e))?;
        let d_durations = device
            .htod_sync_copy(&durations)
            .map_err(|e| format!("HtoD durations failed: {}", e))?;

        // Allocate output buffers (initialized to zeros)
        let edge_frequencies = vec![0u32; edge_count];
        let edge_durations = vec![0u64; edge_count];

        let d_edge_freq = device
            .htod_sync_copy(&edge_frequencies)
            .map_err(|e| format!("HtoD edge_freq failed: {}", e))?;
        let d_edge_dur = device
            .htod_sync_copy(&edge_durations)
            .map_err(|e| format!("HtoD edge_dur failed: {}", e))?;

        // Get the compiled kernel function
        let func = device
            .get_func("dfg_construction", "dfg_construction_kernel")
            .ok_or("DFG kernel not loaded - call compile() with generate_dfg_kernel() first")?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (pair_count as u32).div_ceil(block_size);

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_sources,
                    &d_targets,
                    &d_durations,
                    &d_edge_freq,
                    &d_edge_dur,
                    max_activities as i32,
                    pair_count as i32,
                ),
            )
            .map_err(|e| format!("Kernel launch failed: {}", e))?;
        }

        // Synchronize - wait for kernel completion
        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // Device-to-Host transfers
        let mut result_frequencies = vec![0u32; edge_count];
        let mut result_durations = vec![0u64; edge_count];

        device
            .dtoh_sync_copy_into(&d_edge_freq, &mut result_frequencies)
            .map_err(|e| format!("DtoH frequencies failed: {}", e))?;
        device
            .dtoh_sync_copy_into(&d_edge_dur, &mut result_durations)
            .map_err(|e| format!("DtoH durations failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build edge results from frequency matrix
        let mut edges = Vec::new();
        for src in 0..max_activities {
            for tgt in 0..max_activities {
                let idx = src * max_activities + tgt;
                let freq = result_frequencies[idx];
                if freq > 0 {
                    let total_dur = result_durations[idx];
                    let avg_dur = total_dur as f32 / freq as f32;

                    edges.push(GpuDFGEdge {
                        source_activity: src as u32,
                        target_activity: tgt as u32,
                        frequency: freq,
                        avg_duration_ms: avg_dur,
                        ..Default::default()
                    });
                }
            }
        }

        // Record stats
        let bytes_in = (pair_count * 3 * 4) as u64;
        let bytes_out = (edge_count * 12) as u64;
        let result = ExecutionResult::new(elapsed, pair_count as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "DFG GPU kernel: {} pairs -> {} edges in {}us",
            pair_count,
            edges.len(),
            elapsed
        );

        Ok((edges, result))
    }

    /// Execute pattern detection kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_pattern_gpu(
        &mut self,
        nodes: &[crate::models::GpuDFGNode],
        bottleneck_threshold: f32,
        duration_threshold: f32,
    ) -> Result<(Vec<crate::models::GpuPatternMatch>, ExecutionResult), String> {
        use crate::models::{GpuPatternMatch, PatternSeverity, PatternType};

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        let n = nodes.len();
        if n == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract node data into Structure-of-Arrays format for GPU
        let event_counts: Vec<u32> = nodes.iter().map(|n| n.event_count).collect();
        let avg_durations: Vec<f32> = nodes.iter().map(|n| n.avg_duration_ms).collect();
        let incoming_counts: Vec<u16> = nodes.iter().map(|n| n.incoming_count).collect();
        let outgoing_counts: Vec<u16> = nodes.iter().map(|n| n.outgoing_count).collect();

        // HtoD transfers
        let d_event_counts = device
            .htod_sync_copy(&event_counts)
            .map_err(|e| format!("HtoD event_counts failed: {}", e))?;
        let d_avg_durations = device
            .htod_sync_copy(&avg_durations)
            .map_err(|e| format!("HtoD avg_durations failed: {}", e))?;
        let d_incoming = device
            .htod_sync_copy(&incoming_counts)
            .map_err(|e| format!("HtoD incoming failed: {}", e))?;
        let d_outgoing = device
            .htod_sync_copy(&outgoing_counts)
            .map_err(|e| format!("HtoD outgoing failed: {}", e))?;

        // Output buffers
        let pattern_types = vec![0u8; n];
        let pattern_confidences = vec![0.0f32; n];

        let d_pattern_types = device
            .htod_sync_copy(&pattern_types)
            .map_err(|e| format!("HtoD pattern_types failed: {}", e))?;
        let d_pattern_conf = device
            .htod_sync_copy(&pattern_confidences)
            .map_err(|e| format!("HtoD pattern_conf failed: {}", e))?;

        // Get kernel function
        let func = device
            .get_func("pattern_detection", "pattern_detection_kernel")
            .ok_or(
                "Pattern kernel not loaded - call compile() with generate_pattern_kernel() first",
            )?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (n as u32).div_ceil(block_size);

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_event_counts,
                    &d_avg_durations,
                    &d_incoming,
                    &d_outgoing,
                    &d_pattern_types,
                    &d_pattern_conf,
                    bottleneck_threshold,
                    duration_threshold,
                    n as i32,
                ),
            )
            .map_err(|e| format!("Pattern kernel launch failed: {}", e))?;
        }

        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // DtoH transfers
        let mut result_types = vec![0u8; n];
        let mut result_confidences = vec![0.0f32; n];

        device
            .dtoh_sync_copy_into(&d_pattern_types, &mut result_types)
            .map_err(|e| format!("DtoH pattern_types failed: {}", e))?;
        device
            .dtoh_sync_copy_into(&d_pattern_conf, &mut result_confidences)
            .map_err(|e| format!("DtoH pattern_conf failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build pattern results
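        // Pattern codes are assumed to follow the generated kernel's convention:
        // 0 = no pattern, 6 = long-running activity, 7 = bottleneck.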
        let mut patterns = Vec::new();
        for (i, &ptype) in result_types.iter().enumerate() {
            if ptype != 0 {
                let pattern_type = match ptype {
                    6 => PatternType::LongRunning,
                    7 => PatternType::Bottleneck,
                    _ => continue,
                };

                let severity = if ptype == 7 {
                    PatternSeverity::Critical
                } else {
                    PatternSeverity::Warning
                };

                let mut pattern = GpuPatternMatch::new(pattern_type, severity);
                pattern.add_activity(nodes[i].activity_id);
                pattern.confidence = result_confidences[i];
                pattern.frequency = nodes[i].event_count;
                pattern.avg_duration_ms = nodes[i].avg_duration_ms;
                patterns.push(pattern);
            }
        }

        // Record stats
        let bytes_in = (n * 12) as u64;
        let bytes_out = (n * 5) as u64;
        let result = ExecutionResult::new(elapsed, n as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Pattern GPU kernel: {} nodes -> {} patterns in {}us",
            n,
            patterns.len(),
            elapsed
        );

        Ok((patterns, result))
    }

    /// Execute partial order derivation kernel on GPU.
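    ///
    /// Events are grouped by case and each case gets its own 16x16 kernel launch
    /// that fills a pairwise precedence matrix for up to 16 events per trace.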
    #[cfg(feature = "cuda")]
    pub fn execute_partial_order_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
    ) -> Result<(Vec<crate::models::GpuPartialOrderTrace>, ExecutionResult), String> {
        use crate::models::{GpuPartialOrderTrace, HybridTimestamp};
        use std::collections::HashMap;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        // Group events by case
        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        // For each case, we'll run a GPU kernel to compute the precedence matrix
        let mut all_traces = Vec::with_capacity(case_events.len());
        let mut total_kernel_time_us = 0u64;

        for (case_id, case_evts) in case_events {
            if case_evts.len() < 2 {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.into_iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

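            // GpuPartialOrderTrace stores fixed 16-slot arrays, so only the
            // first 16 events of a case participate in the precedence matrix.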
            let n = sorted.len().min(16);

            // Extract timing data
            let start_times: Vec<u64> = sorted.iter().map(|e| e.timestamp.physical_ms).collect();
            let end_times: Vec<u64> = sorted
                .iter()
                .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                .collect();

            // Pad to 16 elements for consistent GPU buffer sizes
            let mut start_times_padded = vec![0u64; 16];
            let mut end_times_padded = vec![0u64; 16];
            // Truncate to the first `n` events; `copy_from_slice` panics if the
            // source and destination lengths differ (a case can exceed 16 events).
            start_times_padded[..n].copy_from_slice(&start_times[..n]);
            end_times_padded[..n].copy_from_slice(&end_times[..n]);

            // HtoD transfers
            let d_start_times = device
                .htod_sync_copy(&start_times_padded)
                .map_err(|e| format!("HtoD start_times failed: {}", e))?;
            let d_end_times = device
                .htod_sync_copy(&end_times_padded)
                .map_err(|e| format!("HtoD end_times failed: {}", e))?;

            // Output buffer (16x16 = 256 elements)
            let precedence_flat = vec![0u32; 256];
            let d_precedence = device
                .htod_sync_copy(&precedence_flat)
                .map_err(|e| format!("HtoD precedence failed: {}", e))?;

            // Get kernel function
            let func = device
                .get_func("partial_order", "partial_order_kernel")
                .ok_or("Partial order kernel not loaded")?;

            // Launch configuration (16x16 grid for pairwise comparison)
            let config = CudaLaunchConfig {
                grid_dim: (1, 1, 1),
                block_dim: (16, 16, 1),
                shared_mem_bytes: 0,
            };

            let kernel_start = std::time::Instant::now();

            // Launch kernel
            unsafe {
                func.launch(
                    config,
                    (&d_start_times, &d_end_times, &d_precedence, 16i32, 16i32),
                )
                .map_err(|e| format!("Partial order kernel launch failed: {}", e))?;
            }

            device
                .synchronize()
                .map_err(|e| format!("Device synchronize failed: {}", e))?;

            total_kernel_time_us += kernel_start.elapsed().as_micros() as u64;

            // DtoH transfer
            let mut result_precedence = vec![0u32; 256];
            device
                .dtoh_sync_copy_into(&d_precedence, &mut result_precedence)
                .map_err(|e| format!("DtoH precedence failed: {}", e))?;

            // Convert flat precedence to 16x16 bit matrix
            let mut precedence_matrix = [0u16; 16];
            for i in 0..n {
                for j in 0..n {
                    if result_precedence[i * 16 + j] != 0 {
                        precedence_matrix[i] |= 1u16 << j;
                    }
                }
            }

            // Build activity IDs array
            let mut activity_ids = [u32::MAX; 16];
            for (i, event) in sorted.iter().take(n).enumerate() {
                activity_ids[i] = event.activity_id;
            }

            // Compute timing in seconds
            let trace_start_ms = sorted.first().map(|e| e.timestamp.physical_ms).unwrap_or(0);
            let mut activity_start_secs = [0u16; 16];
            let mut activity_duration_secs = [0u16; 16];
            for (i, event) in sorted.iter().take(n).enumerate() {
                let rel_start = event.timestamp.physical_ms.saturating_sub(trace_start_ms);
                activity_start_secs[i] = (rel_start / 1000).min(u16::MAX as u64) as u16;
                activity_duration_secs[i] =
                    (event.duration_ms / 1000).max(1).min(u16::MAX as u32) as u16;
            }

            // Build trace
            let trace = GpuPartialOrderTrace {
                trace_id: all_traces.len() as u64 + 1,
                case_id,
                event_count: sorted.len() as u32,
                activity_count: n as u32,
                start_time: sorted.first().map(|e| e.timestamp).unwrap_or_default(),
                end_time: HybridTimestamp::new(
                    sorted
                        .last()
                        .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                        .unwrap_or(0),
                    0,
                ),
                max_width: Self::compute_width_from_matrix(&precedence_matrix, n),
                flags: 0x01, // Has transitive closure flag
                precedence_matrix,
                activity_ids,
                activity_start_secs,
                activity_duration_secs,
                _reserved: [0u8; 32],
            };

            all_traces.push(trace);
        }

        let elapsed = start.elapsed().as_micros() as u64;
        let result = ExecutionResult::new(elapsed, events.len() as u64, true);

        // Record stats
        let bytes_in = (events.len() * 16) as u64;
        let bytes_out = (all_traces.len() * 256) as u64;
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Partial order GPU kernel: {} events -> {} traces in {}us (kernel: {}us)",
            events.len(),
            all_traces.len(),
            elapsed,
            total_kernel_time_us
        );

        Ok((all_traces, result))
    }

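    /// Estimate the concurrency width of a trace from its precedence bit matrix:
    /// for each event, count later events that are order-incomparable with it.
    /// This is a greedy approximation, not an exact maximum antichain.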
    #[cfg(feature = "cuda")]
    fn compute_width_from_matrix(precedence: &[u16; 16], n: usize) -> u32 {
        let mut max_width = 1u32;

        for i in 0..n {
            let mut concurrent = 1u32;
            for j in (i + 1)..n {
                let i_precedes_j = (precedence[i] & (1u16 << j)) != 0;
                let j_precedes_i = (precedence[j] & (1u16 << i)) != 0;
                if !i_precedes_j && !j_precedes_i {
                    concurrent += 1;
                }
            }
            max_width = max_width.max(concurrent);
        }

        max_width
    }

    /// Execute conformance checking kernel on GPU.
    #[cfg(feature = "cuda")]
    pub fn execute_conformance_gpu(
        &mut self,
        events: &[crate::models::GpuObjectEvent],
        model: &crate::models::ProcessModel,
    ) -> Result<(Vec<crate::models::ConformanceResult>, ExecutionResult), String> {
        use crate::models::{ComplianceLevel, ConformanceResult, ConformanceStatus};
        use std::collections::HashMap;

        let device = self.device.as_ref().ok_or("No CUDA device available")?;
        let start = std::time::Instant::now();

        // Group events by case
        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        // Flatten traces for GPU processing
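        // CSR-style layout: for trace i, the slice
        // all_activities[trace_starts[i] as usize..][..trace_lengths[i] as usize]
        // holds its activity IDs in timestamp order.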
        let mut all_activities: Vec<u32> = Vec::new();
        let mut trace_starts: Vec<i32> = Vec::new();
        let mut trace_lengths: Vec<i32> = Vec::new();
        let mut trace_case_ids: Vec<u64> = Vec::new();

        for (case_id, case_evts) in &case_events {
            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            trace_starts.push(all_activities.len() as i32);
            trace_lengths.push(sorted.len() as i32);
            trace_case_ids.push(*case_id);

            for event in sorted {
                all_activities.push(event.activity_id);
            }
        }

        let num_traces = trace_starts.len();
        if num_traces == 0 {
            return Ok((Vec::new(), ExecutionResult::new(0, 0, false)));
        }

        // Extract model transitions
        let model_sources: Vec<u32> = model.transitions.iter().map(|(s, _)| *s).collect();
        let model_targets: Vec<u32> = model.transitions.iter().map(|(_, t)| *t).collect();
        let num_transitions = model_sources.len() as i32;

        // HtoD transfers
        let d_activities = device
            .htod_sync_copy(&all_activities)
            .map_err(|e| format!("HtoD activities failed: {}", e))?;
        let d_trace_starts = device
            .htod_sync_copy(&trace_starts)
            .map_err(|e| format!("HtoD trace_starts failed: {}", e))?;
        let d_trace_lengths = device
            .htod_sync_copy(&trace_lengths)
            .map_err(|e| format!("HtoD trace_lengths failed: {}", e))?;
        let d_model_sources = device
            .htod_sync_copy(&model_sources)
            .map_err(|e| format!("HtoD model_sources failed: {}", e))?;
        let d_model_targets = device
            .htod_sync_copy(&model_targets)
            .map_err(|e| format!("HtoD model_targets failed: {}", e))?;

        // Output buffer
        let fitness_scores = vec![0.0f32; num_traces];
        let d_fitness = device
            .htod_sync_copy(&fitness_scores)
            .map_err(|e| format!("HtoD fitness failed: {}", e))?;

        // Get kernel function
        let func = device
            .get_func("conformance", "conformance_kernel")
            .ok_or("Conformance kernel not loaded")?;

        // Launch configuration
        let block_size = 256u32;
        let grid_size = (num_traces as u32).div_ceil(block_size);

        let config = CudaLaunchConfig {
            grid_dim: (grid_size, 1, 1),
            block_dim: (block_size, 1, 1),
            shared_mem_bytes: 0,
        };

        // Launch kernel
        unsafe {
            func.launch(
                config,
                (
                    &d_activities,
                    &d_trace_starts,
                    &d_trace_lengths,
                    &d_model_sources,
                    &d_model_targets,
                    num_transitions,
                    &d_fitness,
                    num_traces as i32,
                ),
            )
            .map_err(|e| format!("Conformance kernel launch failed: {}", e))?;
        }

        device
            .synchronize()
            .map_err(|e| format!("Device synchronize failed: {}", e))?;

        // DtoH transfer
        let mut result_fitness = vec![0.0f32; num_traces];
        device
            .dtoh_sync_copy_into(&d_fitness, &mut result_fitness)
            .map_err(|e| format!("DtoH fitness failed: {}", e))?;

        let elapsed = start.elapsed().as_micros() as u64;

        // Build conformance results
        let mut results = Vec::with_capacity(num_traces);
        for (i, &fitness) in result_fitness.iter().enumerate() {
            let status = if fitness >= 1.0 {
                ConformanceStatus::Conformant
            } else if fitness >= 0.8 {
                ConformanceStatus::Deviation
            } else {
                ConformanceStatus::MissingActivity
            };

            results.push(ConformanceResult {
                trace_id: trace_case_ids[i],
                model_id: model.id,
                status: status as u8,
                compliance_level: ComplianceLevel::from_fitness(fitness) as u8,
                fitness,
                precision: fitness, // Simplified: same as fitness
                generalization: 0.8,
                simplicity: 1.0,
                missing_count: ((1.0 - fitness) * trace_lengths[i] as f32) as u16,
                extra_count: 0,
                alignment_cost: ((1.0 - fitness) * trace_lengths[i] as f32) as u32,
                alignment_length: trace_lengths[i] as u32,
                _padding1: [0; 2],
                _reserved: [0; 16],
            });
        }

        // Record stats
        let bytes_in = (all_activities.len() * 4 + model_sources.len() * 8) as u64;
        let bytes_out = (num_traces * 4) as u64;
        let result = ExecutionResult::new(elapsed, events.len() as u64, true);
        self.stats.record(&result, bytes_in, bytes_out);

        log::debug!(
            "Conformance GPU kernel: {} traces in {}us, avg fitness: {:.2}",
            num_traces,
            elapsed,
            result_fitness.iter().sum::<f32>() / num_traces as f32
        );

        Ok((results, result))
    }
}

/// CPU fallback executor for when CUDA is not available.
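///
/// Intended to mirror the GPU kernels' semantics on the host so results remain
/// comparable when [`KernelExecutor`] runs with a CPU-only backend.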
pub struct CpuFallbackExecutor;

impl CpuFallbackExecutor {
    /// Execute DFG construction on CPU.
    pub fn execute_dfg_construction(
        events: &[crate::models::GpuObjectEvent],
        edges: &mut [crate::models::GpuDFGEdge],
        max_activities: usize,
    ) -> ExecutionResult {
        let start = std::time::Instant::now();

        let mut case_events: std::collections::HashMap<u64, Vec<&crate::models::GpuObjectEvent>> =
            std::collections::HashMap::new();

        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for events in case_events.values() {
            let mut sorted_events: Vec<_> = events.iter().collect();
            sorted_events.sort_by_key(|e| e.timestamp.physical_ms);

            for window in sorted_events.windows(2) {
                let source = window[0].activity_id as usize;
                let target = window[1].activity_id as usize;

                if source < max_activities && target < max_activities {
                    let edge_idx = source * max_activities + target;
                    if edge_idx < edges.len() {
                        edges[edge_idx].frequency += 1;
                        edges[edge_idx].source_activity = source as u32;
                        edges[edge_idx].target_activity = target as u32;
                    }
                }
            }
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, events.len() as u64, false)
    }

    /// Execute pattern detection on CPU.
    pub fn execute_pattern_detection(
        nodes: &[crate::models::GpuDFGNode],
        patterns: &mut Vec<crate::models::GpuPatternMatch>,
        bottleneck_threshold: f32,
        duration_threshold: f32,
    ) -> ExecutionResult {
        use crate::models::{GpuPatternMatch, PatternSeverity, PatternType};

        let start = std::time::Instant::now();

        for node in nodes {
            if node.event_count == 0 {
                continue;
            }

            let incoming_count = node.incoming_count as f32;
            let outgoing_count = node.outgoing_count as f32;
            if incoming_count > bottleneck_threshold && outgoing_count < incoming_count * 0.5 {
                let mut pattern =
                    GpuPatternMatch::new(PatternType::Bottleneck, PatternSeverity::Critical);
                pattern.add_activity(node.activity_id);
                pattern.confidence = incoming_count / bottleneck_threshold;
                pattern.frequency = node.event_count;
                pattern.avg_duration_ms = node.avg_duration_ms;
                patterns.push(pattern);
            }

            if node.avg_duration_ms > duration_threshold {
                let mut pattern =
                    GpuPatternMatch::new(PatternType::LongRunning, PatternSeverity::Warning);
                pattern.add_activity(node.activity_id);
                pattern.confidence = node.avg_duration_ms / duration_threshold;
                pattern.frequency = node.event_count;
                pattern.avg_duration_ms = node.avg_duration_ms;
                patterns.push(pattern);
            }
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, nodes.len() as u64, false)
    }

    /// Execute partial order derivation on CPU.
    pub fn execute_partial_order(
        events: &[crate::models::GpuObjectEvent],
        traces: &mut Vec<crate::models::GpuPartialOrderTrace>,
    ) -> ExecutionResult {
        use crate::models::{GpuPartialOrderTrace, HybridTimestamp};
        use std::collections::HashMap;

        let start = std::time::Instant::now();

        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for (case_id, case_evts) in &case_events {
            if case_evts.len() < 2 {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            let n = sorted.len().min(16);
            let mut precedence = [0u16; 16];
            let mut activity_ids = [0u32; 16];
            let mut activity_start_secs = [0u16; 16];
            let mut activity_duration_secs = [0u16; 16];

            // Get trace start time for relative timing
            let trace_start_ms = sorted.first().map(|e| e.timestamp.physical_ms).unwrap_or(0);

            for i in 0..n {
                activity_ids[i] = sorted[i].activity_id;

                // Store timing info in SECONDS (relative to trace start)
                let rel_start_ms = sorted[i]
                    .timestamp
                    .physical_ms
                    .saturating_sub(trace_start_ms);
                activity_start_secs[i] = (rel_start_ms / 1000).min(u16::MAX as u64) as u16;
                activity_duration_secs[i] =
                    (sorted[i].duration_ms / 1000).max(1).min(u16::MAX as u32) as u16;

                for j in (i + 1)..n {
                    let i_end = sorted[i].timestamp.physical_ms + sorted[i].duration_ms as u64;
                    let j_start = sorted[j].timestamp.physical_ms;
                    if i_end <= j_start {
                        precedence[i] |= 1u16 << j;
                    }
                }
            }

            // Calculate trace end time (last event end)
            let trace_end_ms = sorted
                .iter()
                .take(n)
                .map(|e| e.timestamp.physical_ms + e.duration_ms as u64)
                .max()
                .unwrap_or(0);

            let trace = GpuPartialOrderTrace {
                trace_id: traces.len() as u64 + 1,
                case_id: *case_id,
                event_count: sorted.len() as u32,
                activity_count: n as u32,
                start_time: sorted.first().map(|e| e.timestamp).unwrap_or_default(),
                end_time: HybridTimestamp::new(trace_end_ms, 0),
                max_width: Self::compute_width(&precedence, n),
                flags: 0,
                precedence_matrix: precedence,
                activity_ids,
                activity_start_secs,
                activity_duration_secs,
                _reserved: [0u8; 32],
            };

            traces.push(trace);
        }

        let elapsed = start.elapsed().as_micros() as u64;
        ExecutionResult::new(elapsed, events.len() as u64, false)
    }

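    /// Same greedy width estimate as `KernelExecutor::compute_width_from_matrix`,
    /// used by the CPU fallback path.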
    fn compute_width(precedence: &[u16; 16], n: usize) -> u32 {
        let mut max_width = 1u32;

        for i in 0..n {
            let mut concurrent = 1u32;
            for j in (i + 1)..n {
                let i_precedes_j = (precedence[i] & (1u16 << j)) != 0;
                let j_precedes_i = (precedence[j] & (1u16 << i)) != 0;
                if !i_precedes_j && !j_precedes_i {
                    concurrent += 1;
                }
            }
            max_width = max_width.max(concurrent);
        }

        max_width
    }

    /// Execute conformance checking on CPU.
    pub fn execute_conformance(
        events: &[crate::models::GpuObjectEvent],
        model: &crate::models::ProcessModel,
    ) -> (Vec<crate::models::ConformanceResult>, ExecutionResult) {
        use crate::models::{ComplianceLevel, ConformanceResult, ConformanceStatus};
        use std::collections::HashMap;

        let start = std::time::Instant::now();
        let mut results = Vec::new();

        let mut case_events: HashMap<u64, Vec<&crate::models::GpuObjectEvent>> = HashMap::new();
        for event in events {
            case_events.entry(event.object_id).or_default().push(event);
        }

        for (case_id, case_evts) in &case_events {
            if case_evts.is_empty() {
                continue;
            }

            let mut sorted: Vec<_> = case_evts.iter().collect();
            sorted.sort_by_key(|e| e.timestamp.physical_ms);

            let mut valid_moves = 0u32;
            let total_moves = (sorted.len() - 1) as u32;

            for window in sorted.windows(2) {
                let source = window[0].activity_id;
                let target = window[1].activity_id;

                let is_valid = model
                    .transitions
                    .iter()
                    .any(|(s, t)| *s == source && *t == target);

                if is_valid {
                    valid_moves += 1;
                }
            }

            let fitness = if total_moves > 0 {
                valid_moves as f32 / total_moves as f32
            } else {
                1.0
            };

            let compliance = ComplianceLevel::from_fitness(fitness);

            let status = if fitness >= 0.95 {
                ConformanceStatus::Conformant
            } else if total_moves > valid_moves {
                ConformanceStatus::ExtraActivity
            } else {
                ConformanceStatus::WrongSequence
            };

            let extra = total_moves.saturating_sub(valid_moves) as u16;

            results.push(ConformanceResult {
                trace_id: *case_id,
                model_id: model.id,
                status: status as u8,
                compliance_level: compliance as u8,
                _padding1: [0; 2],
                fitness,
                precision: 1.0 - extra as f32 / (total_moves + 1) as f32,
                generalization: 0.8,
                simplicity: 1.0,
                missing_count: 0,
                extra_count: extra,
                alignment_cost: extra as u32,
                alignment_length: sorted.len() as u32,
                _reserved: [0u8; 16],
            });
        }

        let elapsed = start.elapsed().as_micros() as u64;
        let exec_result = ExecutionResult::new(elapsed, events.len() as u64, false);

        (results, exec_result)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cuda::codegen::generate_dfg_kernel;

    #[test]
    fn test_executor_creation() {
        let executor = KernelExecutor::new();
        assert_eq!(executor.kernel_count(), 0);
    }

    #[test]
    fn test_kernel_compilation() {
        let mut executor = KernelExecutor::new();
        let gpu_status = executor.gpu_status();
        let source = generate_dfg_kernel();
        let result = executor.compile(&source);

        // Convert result to bool to release borrow
        let is_ok = result.is_ok();
        let err_msg = if let Err(ref e) = result {
            Some(e.clone())
        } else {
            None
        };

        // Print error for debugging if it fails
        if let Some(e) = err_msg {
            eprintln!("Kernel compilation error: {}", e);
        }

        // Without a usable CUDA backend, compilation only caches the kernel
        if matches!(
            gpu_status,
            GpuStatus::CpuFallback | GpuStatus::CudaNotCompiled
        ) {
            assert!(is_ok); // CPU fallback should still cache the kernel
            assert_eq!(executor.kernel_count(), 1);
        } else {
            // With CUDA compiled in, NVRTC may still fail in the test environment;
            // accept either a successful compile or a device-initialization error.
            assert!(is_ok || gpu_status == GpuStatus::CudaError);
        }
    }

    #[test]
    fn test_execution_result() {
        let result = ExecutionResult::new(1000, 1_000_000, true);
        assert_eq!(result.throughput, 1_000_000_000.0);
        assert!(result.used_gpu);
    }

    #[test]
    fn test_gpu_status() {
        let executor = KernelExecutor::new();
        let status = executor.gpu_status();
        assert!(matches!(
            status,
            GpuStatus::CpuFallback
                | GpuStatus::CudaReady
                | GpuStatus::CudaError
                | GpuStatus::CudaNotCompiled
        ));
    }

    #[test]
    fn test_gpu_stats() {
        let mut stats = GpuStats::default();
        let result = ExecutionResult::new(1000, 10000, true);
        stats.record(&result, 4000, 2000);

        assert_eq!(stats.kernel_launches, 1);
        assert_eq!(stats.total_gpu_time_us, 1000);
        assert_eq!(stats.bytes_to_gpu, 4000);
        assert_eq!(stats.bytes_from_gpu, 2000);
    }
}