use crate::cuda::{
6 generate_dfg_kernel, CpuFallbackExecutor, ExecutionResult, GpuStats, GpuStatus, KernelExecutor,
7};
8use crate::models::{DFGGraph, GpuDFGEdge, GpuDFGNode, GpuObjectEvent};
9
/// Builds a directly-follows graph (DFG) from object events, executing on the
/// GPU via a compiled CUDA kernel when available and falling back to a CPU
/// implementation otherwise.
pub struct DfgConstructionKernel {
    // Capacity of the node table; the CPU fallback allocates a dense
    // max_activities * max_activities edge matrix.
    max_activities: usize,
    // Backend that compiles and executes the CUDA kernel and tracks GPU stats.
    executor: KernelExecutor,
    // User toggle; set to false by `with_cpu_only()`.
    use_gpu: bool,
    // True once the DFG CUDA kernel has compiled successfully.
    kernel_compiled: bool,
}
21
22impl Default for DfgConstructionKernel {
23 fn default() -> Self {
24 Self::new(64)
25 }
26}
27
impl DfgConstructionKernel {
    /// Creates a kernel sized for `max_activities` distinct activities and
    /// eagerly attempts to compile the CUDA kernel. Compilation failure is
    /// non-fatal: `process` will use the CPU fallback.
    pub fn new(max_activities: usize) -> Self {
        let mut kernel = Self {
            max_activities,
            executor: KernelExecutor::new(),
            use_gpu: true,
            kernel_compiled: false,
        };

        kernel.try_compile_kernel();
        kernel
    }

    /// Compiles the generated DFG CUDA kernel once, if CUDA is available.
    /// On failure the error is logged and `kernel_compiled` stays false,
    /// which routes `process` onto the CPU path.
    fn try_compile_kernel(&mut self) {
        if self.executor.is_cuda_available() && !self.kernel_compiled {
            let source = generate_dfg_kernel();
            match self.executor.compile(&source) {
                Ok(_) => {
                    log::info!("DFG CUDA kernel compiled successfully");
                    self.kernel_compiled = true;
                }
                Err(e) => {
                    log::warn!("DFG CUDA kernel compilation failed: {}", e);
                    self.kernel_compiled = false;
                }
            }
        }
    }

    /// Builder-style switch that disables GPU execution entirely.
    pub fn with_cpu_only(mut self) -> Self {
        self.use_gpu = false;
        self
    }

    /// Reports the executor's current GPU status.
    pub fn gpu_status(&self) -> GpuStatus {
        self.executor.gpu_status()
    }

    /// Borrowed view of the executor's accumulated GPU statistics.
    pub fn gpu_stats(&self) -> &GpuStats {
        &self.executor.stats
    }

    /// True only when GPU use is enabled, the kernel has compiled, and CUDA
    /// is available — i.e. `process` will attempt the GPU path.
    pub fn is_using_gpu(&self) -> bool {
        self.use_gpu && self.kernel_compiled && self.executor.is_cuda_available()
    }

    /// Builds a directly-follows graph from `events`.
    ///
    /// Edge construction runs on the GPU when possible (with automatic CPU
    /// fallback on execution failure); node statistics are always aggregated
    /// on the CPU. Returns the graph plus execution timing metadata.
    pub fn process(&mut self, events: &[GpuObjectEvent]) -> DfgResult {
        let start = std::time::Instant::now();

        // GPU path: only compiled in with the `cuda` feature. On any
        // execution error we log and fall through to the CPU path below.
        #[cfg(feature = "cuda")]
        let (gpu_edges, exec_result) = if self.is_using_gpu() {
            match self.executor.execute_dfg_gpu(events) {
                Ok((edges, result)) => {
                    log::debug!(
                        "DFG GPU execution: {} events -> {} edges in {}µs",
                        events.len(),
                        edges.len(),
                        result.execution_time_us
                    );
                    (Some(edges), result)
                }
                Err(e) => {
                    log::warn!("DFG GPU execution failed, falling back to CPU: {}", e);
                    (None, ExecutionResult::default())
                }
            }
        } else {
            (None, ExecutionResult::default())
        };

        // Without the `cuda` feature there is never a GPU result.
        #[cfg(not(feature = "cuda"))]
        let gpu_edges: Option<Vec<GpuDFGEdge>> = None;
        #[cfg(not(feature = "cuda"))]
        let exec_result = ExecutionResult::default();

        // CPU fallback: fill a dense max_activities x max_activities edge
        // matrix, then keep only edges that were actually observed.
        let (edges, exec_result) = if let Some(gpu_edges) = gpu_edges {
            (gpu_edges, exec_result)
        } else {
            let edge_count = self.max_activities * self.max_activities;
            let mut edges = vec![GpuDFGEdge::default(); edge_count];
            let result = CpuFallbackExecutor::execute_dfg_construction(
                events,
                &mut edges,
                self.max_activities,
            );
            let active_edges: Vec<GpuDFGEdge> =
                edges.into_iter().filter(|e| e.frequency > 0).collect();
            (active_edges, result)
        };

        // Node statistics are always computed on the CPU: group events by
        // activity id, then derive per-activity counts and duration stats.
        let mut nodes = vec![GpuDFGNode::default(); self.max_activities];
        let mut activity_events: std::collections::HashMap<u32, Vec<&GpuObjectEvent>> =
            std::collections::HashMap::new();

        for event in events {
            activity_events
                .entry(event.activity_id)
                .or_default()
                .push(event);
        }

        for (activity_id, evts) in &activity_events {
            let idx = *activity_id as usize;
            // Activity ids at or beyond max_activities are silently skipped.
            if idx < nodes.len() {
                nodes[idx].activity_id = *activity_id;
                nodes[idx].event_count = evts.len() as u32;

                let durations: Vec<u32> = evts.iter().map(|e| e.duration_ms).collect();
                if !durations.is_empty() {
                    nodes[idx].min_duration_ms = *durations.iter().min().unwrap();
                    nodes[idx].max_duration_ms = *durations.iter().max().unwrap();
                    nodes[idx].avg_duration_ms =
                        durations.iter().sum::<u32>() as f32 / durations.len() as f32;
                }
            }
        }

        // Fold edge frequencies into per-node in/out degree counters,
        // clamping each contribution to u16 and saturating on overflow.
        for edge in &edges {
            let src = edge.source_activity as usize;
            let tgt = edge.target_activity as usize;
            if src < nodes.len() {
                nodes[src].outgoing_count = nodes[src]
                    .outgoing_count
                    .saturating_add(edge.frequency.min(u16::MAX as u32) as u16);
            }
            if tgt < nodes.len() {
                nodes[tgt].incoming_count = nodes[tgt]
                    .incoming_count
                    .saturating_add(edge.frequency.min(u16::MAX as u32) as u16);
            }
        }

        let dfg = DFGGraph::from_gpu(nodes, edges);

        let total_time = start.elapsed().as_micros() as u64;

        DfgResult {
            dfg,
            execution_result: exec_result,
            total_time_us: total_time,
        }
    }
}
186
/// Output of [`DfgConstructionKernel::process`].
#[derive(Debug)]
pub struct DfgResult {
    /// The constructed directly-follows graph.
    pub dfg: DFGGraph,
    /// Backend execution details for the edge-building step (default-valued
    /// when the CPU fallback was used without GPU timing).
    pub execution_result: ExecutionResult,
    /// End-to-end wall-clock duration of `process`, in microseconds.
    pub total_time_us: u64,
}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::HybridTimestamp;

    /// Two objects (ids 100 and 101) each traverse activities 1 -> 2 -> 3,
    /// producing six events with sequential ids and 1s-step timestamps.
    fn create_test_events() -> Vec<GpuObjectEvent> {
        let mut events = Vec::with_capacity(6);
        for batch in 0..2 {
            for (i, activity_id) in [1u32, 2, 3].into_iter().enumerate() {
                events.push(GpuObjectEvent {
                    event_id: (batch * 3 + i) as u64,
                    object_id: if batch == 0 { 100 } else { 101 },
                    activity_id,
                    timestamp: HybridTimestamp::new(i as u64 * 1000, 0),
                    duration_ms: 1000,
                    ..Default::default()
                });
            }
        }
        events
    }

    #[test]
    fn test_dfg_construction() {
        let mut kernel = DfgConstructionKernel::new(10).with_cpu_only();
        let events = create_test_events();
        let result = kernel.process(&events);

        // Two identical traces must yield at least the 1->2 and 2->3 edges.
        assert!(result.dfg.edge_count() > 0);
    }
}