1use crate::types::{DFGEdge, DFGResult, DirectlyFollowsGraph, EventLog, Trace};
9use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
10use std::collections::HashMap;
11
12#[derive(Debug, Clone)]
20pub struct DFGConstruction {
21 metadata: KernelMetadata,
22}
23
24impl Default for DFGConstruction {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl DFGConstruction {
31 #[must_use]
33 pub fn new() -> Self {
34 Self {
35 metadata: KernelMetadata::batch(
36 "procint/dfg-construction",
37 Domain::ProcessIntelligence,
38 )
39 .with_description("Directly-follows graph construction")
40 .with_throughput(100_000)
41 .with_latency_us(50.0),
42 }
43 }
44
45 pub fn compute(log: &EventLog) -> DFGResult {
47 let mut dfg = DirectlyFollowsGraph::new();
48 let mut edge_map: HashMap<(String, String), (u64, Vec<u64>)> = HashMap::new();
49
50 let mut event_count = 0u64;
51
52 for trace in log.traces.values() {
53 let mut events: Vec<_> = trace.events.iter().collect();
55 events.sort_by_key(|e| e.timestamp);
56
57 event_count += events.len() as u64;
58
59 if let Some(first) = events.first() {
61 *dfg.start_activities
62 .entry(first.activity.clone())
63 .or_insert(0) += 1;
64 }
65 if let Some(last) = events.last() {
66 *dfg.end_activities.entry(last.activity.clone()).or_insert(0) += 1;
67 }
68
69 for event in &events {
71 *dfg.activity_counts
72 .entry(event.activity.clone())
73 .or_insert(0) += 1;
74 }
75
76 for window in events.windows(2) {
78 let source = &window[0].activity;
79 let target = &window[1].activity;
80 let duration = window[1].timestamp.saturating_sub(window[0].timestamp);
81
82 let key = (source.clone(), target.clone());
83 let entry = edge_map.entry(key).or_insert((0, Vec::new()));
84 entry.0 += 1;
85 entry.1.push(duration);
86 }
87 }
88
89 dfg.activities = dfg.activity_counts.keys().cloned().collect();
91 dfg.activities.sort();
92
93 let unique_pairs = edge_map.len() as u64;
95
96 for ((source, target), (count, durations)) in edge_map {
98 let avg_duration = if durations.is_empty() {
99 0.0
100 } else {
101 durations.iter().sum::<u64>() as f64 / durations.len() as f64
102 };
103
104 dfg.edges.push(DFGEdge {
105 source,
106 target,
107 count,
108 avg_duration_ms: avg_duration,
109 });
110 }
111
112 dfg.edges.sort_by(|a, b| b.count.cmp(&a.count));
114
115 DFGResult {
116 dfg,
117 trace_count: log.trace_count() as u64,
118 event_count,
119 unique_pairs,
120 }
121 }
122
123 pub fn compute_trace(trace: &Trace) -> DirectlyFollowsGraph {
125 let mut dfg = DirectlyFollowsGraph::new();
126 let mut edge_map: HashMap<(String, String), (u64, Vec<u64>)> = HashMap::new();
127
128 let mut events: Vec<_> = trace.events.iter().collect();
129 events.sort_by_key(|e| e.timestamp);
130
131 if let Some(first) = events.first() {
133 dfg.start_activities.insert(first.activity.clone(), 1);
134 }
135 if let Some(last) = events.last() {
136 dfg.end_activities.insert(last.activity.clone(), 1);
137 }
138
139 for event in &events {
141 *dfg.activity_counts
142 .entry(event.activity.clone())
143 .or_insert(0) += 1;
144 }
145
146 for window in events.windows(2) {
148 let source = &window[0].activity;
149 let target = &window[1].activity;
150 let duration = window[1].timestamp.saturating_sub(window[0].timestamp);
151
152 let key = (source.clone(), target.clone());
153 let entry = edge_map.entry(key).or_insert((0, Vec::new()));
154 entry.0 += 1;
155 entry.1.push(duration);
156 }
157
158 dfg.activities = dfg.activity_counts.keys().cloned().collect();
160 dfg.activities.sort();
161
162 for ((source, target), (count, durations)) in edge_map {
163 let avg_duration = if durations.is_empty() {
164 0.0
165 } else {
166 durations.iter().sum::<u64>() as f64 / durations.len() as f64
167 };
168
169 dfg.edges.push(DFGEdge {
170 source,
171 target,
172 count,
173 avg_duration_ms: avg_duration,
174 });
175 }
176
177 dfg
178 }
179
180 pub fn filter_by_frequency(dfg: &DirectlyFollowsGraph, min_count: u64) -> DirectlyFollowsGraph {
182 let mut filtered = DirectlyFollowsGraph::new();
183
184 let mut active_activities = std::collections::HashSet::new();
186
187 filtered.edges = dfg
188 .edges
189 .iter()
190 .filter(|e| e.count >= min_count)
191 .map(|e| {
192 active_activities.insert(e.source.clone());
193 active_activities.insert(e.target.clone());
194 e.clone()
195 })
196 .collect();
197
198 filtered.activities = active_activities.into_iter().collect();
199 filtered.activities.sort();
200
201 filtered.activity_counts = dfg
203 .activity_counts
204 .iter()
205 .filter(|(k, _)| filtered.activities.contains(*k))
206 .map(|(k, v)| (k.clone(), *v))
207 .collect();
208
209 filtered.start_activities = dfg
211 .start_activities
212 .iter()
213 .filter(|(k, _)| filtered.activities.contains(*k))
214 .map(|(k, v)| (k.clone(), *v))
215 .collect();
216
217 filtered.end_activities = dfg
218 .end_activities
219 .iter()
220 .filter(|(k, _)| filtered.activities.contains(*k))
221 .map(|(k, v)| (k.clone(), *v))
222 .collect();
223
224 filtered
225 }
226
227 pub fn calculate_metrics(dfg: &DirectlyFollowsGraph) -> DFGMetrics {
229 let node_count = dfg.activities.len();
230 let edge_count = dfg.edges.len();
231
232 let max_possible_edges = node_count * node_count;
233 let density = if max_possible_edges > 0 {
234 edge_count as f64 / max_possible_edges as f64
235 } else {
236 0.0
237 };
238
239 let total_edge_weight: u64 = dfg.edges.iter().map(|e| e.count).sum();
240 let avg_edge_weight = if edge_count > 0 {
241 total_edge_weight as f64 / edge_count as f64
242 } else {
243 0.0
244 };
245
246 DFGMetrics {
247 node_count,
248 edge_count,
249 density,
250 avg_edge_weight,
251 start_activity_count: dfg.start_activities.len(),
252 end_activity_count: dfg.end_activities.len(),
253 }
254 }
255}
256
257impl GpuKernel for DFGConstruction {
258 fn metadata(&self) -> &KernelMetadata {
259 &self.metadata
260 }
261}
262
/// Summary statistics describing a directly-follows graph.
#[derive(Debug, Clone)]
pub struct DFGMetrics {
    /// Number of activities (graph nodes).
    pub node_count: usize,
    /// Number of directly-follows edges.
    pub edge_count: usize,
    /// Edge count divided by the squared node count; `0.0` for an empty graph.
    pub density: f64,
    /// Mean occurrence count per edge; `0.0` when there are no edges.
    pub avg_edge_weight: f64,
    /// Number of distinct activities that start at least one trace.
    pub start_activity_count: usize,
    /// Number of distinct activities that end at least one trace.
    pub end_activity_count: usize,
}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ProcessEvent;

    /// Builds one event; the timestamp is `(position + 1) * 1000`.
    fn make_event(id: u64, case_id: &str, activity: &str, position: usize) -> ProcessEvent {
        ProcessEvent {
            id,
            case_id: case_id.to_string(),
            activity: activity.to_string(),
            timestamp: (position as u64 + 1) * 1000,
            resource: None,
            attributes: HashMap::new(),
        }
    }

    /// Three cases: two follow A -> B -> C -> D, one follows A -> B -> E -> D.
    fn create_test_log() -> EventLog {
        let mut log = EventLog::new("test_log".to_string());

        // (case id, event id offset, activity sequence)
        let cases: [(&str, usize, [&str; 4]); 3] = [
            ("case1", 0, ["A", "B", "C", "D"]),
            ("case2", 10, ["A", "B", "C", "D"]),
            ("case3", 20, ["A", "B", "E", "D"]),
        ];

        for &(case_id, id_offset, activities) in cases.iter() {
            for (position, &activity) in activities.iter().enumerate() {
                let id = (position + id_offset) as u64;
                log.add_event(make_event(id, case_id, activity, position));
            }
        }

        log
    }

    #[test]
    fn test_dfg_construction_metadata() {
        let kernel = DFGConstruction::new();
        let metadata = kernel.metadata();
        assert_eq!(metadata.id, "procint/dfg-construction");
        assert_eq!(metadata.domain, Domain::ProcessIntelligence);
    }

    #[test]
    fn test_dfg_construction() {
        let result = DFGConstruction::compute(&create_test_log());

        assert_eq!(result.trace_count, 3);
        assert_eq!(result.event_count, 12);

        // Activities A, B, C, D and E.
        assert_eq!(result.dfg.activities.len(), 5);
    }

    #[test]
    fn test_dfg_edges() {
        let result = DFGConstruction::compute(&create_test_log());

        let expected: [(&str, &str, u64); 3] = [("A", "B", 3), ("B", "C", 2), ("B", "E", 1)];
        for &(source, target, count) in expected.iter() {
            let edge = result.dfg.edge(source, target);
            assert!(edge.is_some());
            assert_eq!(edge.unwrap().count, count);
        }
    }

    #[test]
    fn test_start_end_activities() {
        let result = DFGConstruction::compute(&create_test_log());
        let dfg = &result.dfg;

        // Every case starts with A and ends with D.
        assert_eq!(dfg.start_activities.get("A").copied(), Some(3));
        assert_eq!(dfg.end_activities.get("D").copied(), Some(3));
    }

    #[test]
    fn test_activity_counts() {
        let result = DFGConstruction::compute(&create_test_log());
        let counts = &result.dfg.activity_counts;

        assert_eq!(counts.get("A").copied(), Some(3));
        assert_eq!(counts.get("C").copied(), Some(2));
        assert_eq!(counts.get("E").copied(), Some(1));
    }

    #[test]
    fn test_filter_by_frequency() {
        let result = DFGConstruction::compute(&create_test_log());
        let filtered = DFGConstruction::filter_by_frequency(&result.dfg, 2);

        // B -> E occurs once and is dropped; A -> B occurs three times and stays.
        assert!(filtered.edge("B", "E").is_none());
        assert!(filtered.edge("A", "B").is_some());
    }

    #[test]
    fn test_dfg_metrics() {
        let result = DFGConstruction::compute(&create_test_log());
        let metrics = DFGConstruction::calculate_metrics(&result.dfg);

        assert_eq!(metrics.node_count, 5);
        assert!(metrics.edge_count > 0);
        assert!(metrics.density > 0.0 && metrics.density <= 1.0);
        assert_eq!(metrics.start_activity_count, 1);
        assert_eq!(metrics.end_activity_count, 1);
    }

    #[test]
    fn test_single_trace() {
        let trace = Trace {
            case_id: "test".to_string(),
            events: vec![
                make_event(1, "test", "X", 0),
                make_event(2, "test", "Y", 1),
            ],
            attributes: HashMap::new(),
        };

        let dfg = DFGConstruction::compute_trace(&trace);

        assert_eq!(dfg.activities.len(), 2);
        assert!(dfg.edge("X", "Y").is_some());
        assert_eq!(dfg.start_activities.get("X").copied(), Some(1));
        assert_eq!(dfg.end_activities.get("Y").copied(), Some(1));
    }

    #[test]
    fn test_empty_log() {
        let log = EventLog::new("empty".to_string());
        let result = DFGConstruction::compute(&log);

        assert_eq!(result.trace_count, 0);
        assert_eq!(result.event_count, 0);
        assert!(result.dfg.activities.is_empty());
    }
}