ringkernel_procint/kernels/
dfg_construction.rs

1//! DFG construction kernel.
2//!
//! Builds a Directly-Follows Graph (DFG) from event streams on the GPU, with a CPU fallback.
4
5use crate::cuda::{
6    generate_dfg_kernel, CpuFallbackExecutor, ExecutionResult, GpuStats, GpuStatus, KernelExecutor,
7};
8use crate::models::{DFGGraph, GpuDFGEdge, GpuDFGNode, GpuObjectEvent};
9
10/// DFG construction kernel.
11pub struct DfgConstructionKernel {
12    /// Maximum number of activities.
13    max_activities: usize,
14    /// Kernel executor.
15    executor: KernelExecutor,
16    /// Use GPU if available.
17    use_gpu: bool,
18    /// Whether kernel is compiled.
19    kernel_compiled: bool,
20}
21
22impl Default for DfgConstructionKernel {
23    fn default() -> Self {
24        Self::new(64)
25    }
26}
27
28impl DfgConstructionKernel {
29    /// Create a new DFG construction kernel.
30    pub fn new(max_activities: usize) -> Self {
31        let mut kernel = Self {
32            max_activities,
33            executor: KernelExecutor::new(),
34            use_gpu: true,
35            kernel_compiled: false,
36        };
37
38        // Try to compile the CUDA kernel at creation time
39        kernel.try_compile_kernel();
40        kernel
41    }
42
43    /// Try to compile the CUDA kernel.
44    fn try_compile_kernel(&mut self) {
45        if self.executor.is_cuda_available() && !self.kernel_compiled {
46            let source = generate_dfg_kernel();
47            match self.executor.compile(&source) {
48                Ok(_) => {
49                    log::info!("DFG CUDA kernel compiled successfully");
50                    self.kernel_compiled = true;
51                }
52                Err(e) => {
53                    log::warn!("DFG CUDA kernel compilation failed: {}", e);
54                    self.kernel_compiled = false;
55                }
56            }
57        }
58    }
59
60    /// Disable GPU (use CPU fallback).
61    pub fn with_cpu_only(mut self) -> Self {
62        self.use_gpu = false;
63        self
64    }
65
66    /// Get GPU status.
67    pub fn gpu_status(&self) -> GpuStatus {
68        self.executor.gpu_status()
69    }
70
71    /// Get GPU stats.
72    pub fn gpu_stats(&self) -> &GpuStats {
73        &self.executor.stats
74    }
75
76    /// Check if GPU is being used.
77    pub fn is_using_gpu(&self) -> bool {
78        self.use_gpu && self.kernel_compiled && self.executor.is_cuda_available()
79    }
80
81    /// Process events and build DFG.
82    pub fn process(&mut self, events: &[GpuObjectEvent]) -> DfgResult {
83        let start = std::time::Instant::now();
84
85        // Try GPU path first if available and compiled
86        #[cfg(feature = "cuda")]
87        let (gpu_edges, exec_result) = if self.is_using_gpu() {
88            match self.executor.execute_dfg_gpu(events) {
89                Ok((edges, result)) => {
90                    log::debug!(
91                        "DFG GPU execution: {} events -> {} edges in {}µs",
92                        events.len(),
93                        edges.len(),
94                        result.execution_time_us
95                    );
96                    (Some(edges), result)
97                }
98                Err(e) => {
99                    log::warn!("DFG GPU execution failed, falling back to CPU: {}", e);
100                    (None, ExecutionResult::default())
101                }
102            }
103        } else {
104            (None, ExecutionResult::default())
105        };
106
107        #[cfg(not(feature = "cuda"))]
108        let gpu_edges: Option<Vec<GpuDFGEdge>> = None;
109        #[cfg(not(feature = "cuda"))]
110        let exec_result = ExecutionResult::default();
111
112        // Use GPU results or fall back to CPU
113        let (edges, exec_result) = if let Some(gpu_edges) = gpu_edges {
114            (gpu_edges, exec_result)
115        } else {
116            // CPU fallback path
117            let edge_count = self.max_activities * self.max_activities;
118            let mut edges = vec![GpuDFGEdge::default(); edge_count];
119            let result = CpuFallbackExecutor::execute_dfg_construction(
120                events,
121                &mut edges,
122                self.max_activities,
123            );
124            let active_edges: Vec<GpuDFGEdge> =
125                edges.into_iter().filter(|e| e.frequency > 0).collect();
126            (active_edges, result)
127        };
128
129        // Build node statistics
130        let mut nodes = vec![GpuDFGNode::default(); self.max_activities];
131        let mut activity_events: std::collections::HashMap<u32, Vec<&GpuObjectEvent>> =
132            std::collections::HashMap::new();
133
134        for event in events {
135            activity_events
136                .entry(event.activity_id)
137                .or_default()
138                .push(event);
139        }
140
141        for (activity_id, evts) in &activity_events {
142            let idx = *activity_id as usize;
143            if idx < nodes.len() {
144                nodes[idx].activity_id = *activity_id;
145                nodes[idx].event_count = evts.len() as u32;
146
147                // Calculate duration stats
148                let durations: Vec<u32> = evts.iter().map(|e| e.duration_ms).collect();
149                if !durations.is_empty() {
150                    nodes[idx].min_duration_ms = *durations.iter().min().unwrap();
151                    nodes[idx].max_duration_ms = *durations.iter().max().unwrap();
152                    nodes[idx].avg_duration_ms =
153                        durations.iter().sum::<u32>() as f32 / durations.len() as f32;
154                }
155            }
156        }
157
158        // Calculate incoming/outgoing counts from edge list
159        for edge in &edges {
160            let src = edge.source_activity as usize;
161            let tgt = edge.target_activity as usize;
162            if src < nodes.len() {
163                nodes[src].outgoing_count = nodes[src]
164                    .outgoing_count
165                    .saturating_add(edge.frequency.min(u16::MAX as u32) as u16);
166            }
167            if tgt < nodes.len() {
168                nodes[tgt].incoming_count = nodes[tgt]
169                    .incoming_count
170                    .saturating_add(edge.frequency.min(u16::MAX as u32) as u16);
171            }
172        }
173
174        // Build DFG
175        let dfg = DFGGraph::from_gpu(nodes, edges);
176
177        let total_time = start.elapsed().as_micros() as u64;
178
179        DfgResult {
180            dfg,
181            execution_result: exec_result,
182            total_time_us: total_time,
183        }
184    }
185}
186
187/// Result of DFG construction.
188#[derive(Debug)]
189pub struct DfgResult {
190    /// Constructed DFG.
191    pub dfg: DFGGraph,
192    /// Kernel execution result.
193    pub execution_result: ExecutionResult,
194    /// Total processing time in microseconds.
195    pub total_time_us: u64,
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::HybridTimestamp;

    /// Build two cases (object IDs 100 and 101) that each follow the trace
    /// 1 -> 2 -> 3, with event IDs 0..=5 in insertion order.
    fn create_test_events() -> Vec<GpuObjectEvent> {
        let trace = [1u32, 2, 3];
        let mut events = Vec::with_capacity(2 * trace.len());

        // Case 1: A -> B -> C
        events.extend(
            trace
                .iter()
                .enumerate()
                .map(|(step, &activity_id)| GpuObjectEvent {
                    event_id: step as u64,
                    object_id: 100, // case ID
                    activity_id,
                    timestamp: HybridTimestamp::new(step as u64 * 1000, 0),
                    duration_ms: 1000,
                    ..Default::default()
                }),
        );

        // Case 2: A -> B -> C
        events.extend(
            trace
                .iter()
                .enumerate()
                .map(|(step, &activity_id)| GpuObjectEvent {
                    event_id: (step + 3) as u64,
                    object_id: 101, // case ID
                    activity_id,
                    timestamp: HybridTimestamp::new(step as u64 * 1000, 0),
                    duration_ms: 1000,
                    ..Default::default()
                }),
        );

        events
    }

    #[test]
    fn test_dfg_construction() {
        let events = create_test_events();
        let mut kernel = DfgConstructionKernel::new(10).with_cpu_only();

        let result = kernel.process(&events);

        // Should have edges A->B and B->C
        assert!(result.dfg.edge_count() > 0);
    }
}