rustkernel_procint/
dfg.rs

//! Directly-Follows Graph (DFG) construction kernel.
//!
//! This module builds a DFG from an event log. It provides:
//! - Activity frequency calculation
//! - Directly-follows relationship extraction
//! - Start/end activity identification
7
8use crate::types::{DFGEdge, DFGResult, DirectlyFollowsGraph, EventLog, Trace};
9use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
10use std::collections::HashMap;
11
12// ============================================================================
13// DFG Construction Kernel
14// ============================================================================
15
16/// DFG construction kernel.
17///
18/// Constructs a directly-follows graph from an event log.
19#[derive(Debug, Clone)]
20pub struct DFGConstruction {
21    metadata: KernelMetadata,
22}
23
24impl Default for DFGConstruction {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl DFGConstruction {
31    /// Create a new DFG construction kernel.
32    #[must_use]
33    pub fn new() -> Self {
34        Self {
35            metadata: KernelMetadata::batch(
36                "procint/dfg-construction",
37                Domain::ProcessIntelligence,
38            )
39            .with_description("Directly-follows graph construction")
40            .with_throughput(100_000)
41            .with_latency_us(50.0),
42        }
43    }
44
45    /// Construct DFG from an event log.
46    pub fn compute(log: &EventLog) -> DFGResult {
47        let mut dfg = DirectlyFollowsGraph::new();
48        let mut edge_map: HashMap<(String, String), (u64, Vec<u64>)> = HashMap::new();
49
50        let mut event_count = 0u64;
51
52        for trace in log.traces.values() {
53            // Get sorted events
54            let mut events: Vec<_> = trace.events.iter().collect();
55            events.sort_by_key(|e| e.timestamp);
56
57            event_count += events.len() as u64;
58
59            // Track start and end activities
60            if let Some(first) = events.first() {
61                *dfg.start_activities
62                    .entry(first.activity.clone())
63                    .or_insert(0) += 1;
64            }
65            if let Some(last) = events.last() {
66                *dfg.end_activities.entry(last.activity.clone()).or_insert(0) += 1;
67            }
68
69            // Count activity occurrences
70            for event in &events {
71                *dfg.activity_counts
72                    .entry(event.activity.clone())
73                    .or_insert(0) += 1;
74            }
75
76            // Extract directly-follows pairs
77            for window in events.windows(2) {
78                let source = &window[0].activity;
79                let target = &window[1].activity;
80                let duration = window[1].timestamp.saturating_sub(window[0].timestamp);
81
82                let key = (source.clone(), target.clone());
83                let entry = edge_map.entry(key).or_insert((0, Vec::new()));
84                entry.0 += 1;
85                entry.1.push(duration);
86            }
87        }
88
89        // Build activities list
90        dfg.activities = dfg.activity_counts.keys().cloned().collect();
91        dfg.activities.sort();
92
93        // Save edge count before consuming edge_map
94        let unique_pairs = edge_map.len() as u64;
95
96        // Build edges
97        for ((source, target), (count, durations)) in edge_map {
98            let avg_duration = if durations.is_empty() {
99                0.0
100            } else {
101                durations.iter().sum::<u64>() as f64 / durations.len() as f64
102            };
103
104            dfg.edges.push(DFGEdge {
105                source,
106                target,
107                count,
108                avg_duration_ms: avg_duration,
109            });
110        }
111
112        // Sort edges by count descending
113        dfg.edges.sort_by(|a, b| b.count.cmp(&a.count));
114
115        DFGResult {
116            dfg,
117            trace_count: log.trace_count() as u64,
118            event_count,
119            unique_pairs,
120        }
121    }
122
123    /// Construct DFG from a single trace.
124    pub fn compute_trace(trace: &Trace) -> DirectlyFollowsGraph {
125        let mut dfg = DirectlyFollowsGraph::new();
126        let mut edge_map: HashMap<(String, String), (u64, Vec<u64>)> = HashMap::new();
127
128        let mut events: Vec<_> = trace.events.iter().collect();
129        events.sort_by_key(|e| e.timestamp);
130
131        // Track start and end
132        if let Some(first) = events.first() {
133            dfg.start_activities.insert(first.activity.clone(), 1);
134        }
135        if let Some(last) = events.last() {
136            dfg.end_activities.insert(last.activity.clone(), 1);
137        }
138
139        // Count activities
140        for event in &events {
141            *dfg.activity_counts
142                .entry(event.activity.clone())
143                .or_insert(0) += 1;
144        }
145
146        // Extract pairs
147        for window in events.windows(2) {
148            let source = &window[0].activity;
149            let target = &window[1].activity;
150            let duration = window[1].timestamp.saturating_sub(window[0].timestamp);
151
152            let key = (source.clone(), target.clone());
153            let entry = edge_map.entry(key).or_insert((0, Vec::new()));
154            entry.0 += 1;
155            entry.1.push(duration);
156        }
157
158        // Build activities and edges
159        dfg.activities = dfg.activity_counts.keys().cloned().collect();
160        dfg.activities.sort();
161
162        for ((source, target), (count, durations)) in edge_map {
163            let avg_duration = if durations.is_empty() {
164                0.0
165            } else {
166                durations.iter().sum::<u64>() as f64 / durations.len() as f64
167            };
168
169            dfg.edges.push(DFGEdge {
170                source,
171                target,
172                count,
173                avg_duration_ms: avg_duration,
174            });
175        }
176
177        dfg
178    }
179
180    /// Filter DFG by minimum edge frequency.
181    pub fn filter_by_frequency(dfg: &DirectlyFollowsGraph, min_count: u64) -> DirectlyFollowsGraph {
182        let mut filtered = DirectlyFollowsGraph::new();
183
184        // Keep activities that appear in filtered edges
185        let mut active_activities = std::collections::HashSet::new();
186
187        filtered.edges = dfg
188            .edges
189            .iter()
190            .filter(|e| e.count >= min_count)
191            .map(|e| {
192                active_activities.insert(e.source.clone());
193                active_activities.insert(e.target.clone());
194                e.clone()
195            })
196            .collect();
197
198        filtered.activities = active_activities.into_iter().collect();
199        filtered.activities.sort();
200
201        // Filter activity counts
202        filtered.activity_counts = dfg
203            .activity_counts
204            .iter()
205            .filter(|(k, _)| filtered.activities.contains(*k))
206            .map(|(k, v)| (k.clone(), *v))
207            .collect();
208
209        // Filter start/end activities
210        filtered.start_activities = dfg
211            .start_activities
212            .iter()
213            .filter(|(k, _)| filtered.activities.contains(*k))
214            .map(|(k, v)| (k.clone(), *v))
215            .collect();
216
217        filtered.end_activities = dfg
218            .end_activities
219            .iter()
220            .filter(|(k, _)| filtered.activities.contains(*k))
221            .map(|(k, v)| (k.clone(), *v))
222            .collect();
223
224        filtered
225    }
226
227    /// Calculate graph metrics.
228    pub fn calculate_metrics(dfg: &DirectlyFollowsGraph) -> DFGMetrics {
229        let node_count = dfg.activities.len();
230        let edge_count = dfg.edges.len();
231
232        let max_possible_edges = node_count * node_count;
233        let density = if max_possible_edges > 0 {
234            edge_count as f64 / max_possible_edges as f64
235        } else {
236            0.0
237        };
238
239        let total_edge_weight: u64 = dfg.edges.iter().map(|e| e.count).sum();
240        let avg_edge_weight = if edge_count > 0 {
241            total_edge_weight as f64 / edge_count as f64
242        } else {
243            0.0
244        };
245
246        DFGMetrics {
247            node_count,
248            edge_count,
249            density,
250            avg_edge_weight,
251            start_activity_count: dfg.start_activities.len(),
252            end_activity_count: dfg.end_activities.len(),
253        }
254    }
255}
256
257impl GpuKernel for DFGConstruction {
258    fn metadata(&self) -> &KernelMetadata {
259        &self.metadata
260    }
261}
262
/// Summary statistics describing a directly-follows graph.
#[derive(Debug, Clone)]
pub struct DFGMetrics {
    /// How many activities (nodes) the graph contains.
    pub node_count: usize,
    /// How many directly-follows edges the graph contains.
    pub edge_count: usize,
    /// Density of the graph.
    pub density: f64,
    /// Mean weight over all edges.
    pub avg_edge_weight: f64,
    /// How many distinct activities start a trace.
    pub start_activity_count: usize,
    /// How many distinct activities end a trace.
    pub end_activity_count: usize,
}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ProcessEvent;

    /// Builds one event; its timestamp is `(position + 1) * 1000`.
    fn event(id: u64, case_id: &str, activity: &str, position: usize) -> ProcessEvent {
        ProcessEvent {
            id,
            case_id: case_id.to_string(),
            activity: activity.to_string(),
            timestamp: (position as u64 + 1) * 1000,
            resource: None,
            attributes: HashMap::new(),
        }
    }

    /// Three four-event cases: `case1` and `case2` follow A -> B -> C -> D,
    /// `case3` follows A -> B -> E -> D.
    fn create_test_log() -> EventLog {
        // (case id, first event id, activity sequence)
        let cases: [(&str, u64, [&str; 4]); 3] = [
            ("case1", 0, ["A", "B", "C", "D"]),
            ("case2", 10, ["A", "B", "C", "D"]),
            ("case3", 20, ["A", "B", "E", "D"]),
        ];

        let mut log = EventLog::new("test_log".to_string());
        for &(case_id, first_id, activities) in &cases {
            for (position, activity) in activities.iter().enumerate() {
                log.add_event(event(first_id + position as u64, case_id, activity, position));
            }
        }
        log
    }

    #[test]
    fn test_dfg_construction_metadata() {
        let kernel = DFGConstruction::new();
        let metadata = kernel.metadata();

        assert_eq!(metadata.id, "procint/dfg-construction");
        assert_eq!(metadata.domain, Domain::ProcessIntelligence);
    }

    #[test]
    fn test_dfg_construction() {
        let result = DFGConstruction::compute(&create_test_log());

        assert_eq!(result.trace_count, 3);
        assert_eq!(result.event_count, 12);

        // Distinct activities: A, B, C, D, E.
        assert_eq!(result.dfg.activities.len(), 5);
    }

    #[test]
    fn test_dfg_edges() {
        let result = DFGConstruction::compute(&create_test_log());

        // A -> B occurs in all three traces, B -> C in two, B -> E in one.
        let expected: [(&str, &str, u64); 3] = [("A", "B", 3), ("B", "C", 2), ("B", "E", 1)];
        for &(source, target, count) in &expected {
            let edge = result.dfg.edge(source, target);
            assert!(edge.is_some());
            assert_eq!(edge.unwrap().count, count);
        }
    }

    #[test]
    fn test_start_end_activities() {
        let result = DFGConstruction::compute(&create_test_log());
        let graph = &result.dfg;

        // Every trace starts with A ...
        assert_eq!(graph.start_activities.get("A").copied(), Some(3));
        // ... and ends with D.
        assert_eq!(graph.end_activities.get("D").copied(), Some(3));
    }

    #[test]
    fn test_activity_counts() {
        let result = DFGConstruction::compute(&create_test_log());
        let counts = &result.dfg.activity_counts;

        // A is in every trace, C in two of them, E in just one.
        assert_eq!(counts.get("A").copied(), Some(3));
        assert_eq!(counts.get("C").copied(), Some(2));
        assert_eq!(counts.get("E").copied(), Some(1));
    }

    #[test]
    fn test_filter_by_frequency() {
        let result = DFGConstruction::compute(&create_test_log());

        // Only edges seen at least twice survive.
        let filtered = DFGConstruction::filter_by_frequency(&result.dfg, 2);

        // B -> E has count 1 and must be gone.
        assert!(filtered.edge("B", "E").is_none());
        // A -> B has count 3 and must still be there.
        assert!(filtered.edge("A", "B").is_some());
    }

    #[test]
    fn test_dfg_metrics() {
        let result = DFGConstruction::compute(&create_test_log());
        let metrics = DFGConstruction::calculate_metrics(&result.dfg);

        assert_eq!(metrics.node_count, 5);
        assert!(metrics.edge_count > 0);
        assert!(metrics.density > 0.0 && metrics.density <= 1.0);
        assert_eq!(metrics.start_activity_count, 1); // Only A is start
        assert_eq!(metrics.end_activity_count, 1); // Only D is end
    }

    #[test]
    fn test_single_trace() {
        // X at t=1000 followed by Y at t=2000.
        let trace = Trace {
            case_id: "test".to_string(),
            events: vec![event(1, "test", "X", 0), event(2, "test", "Y", 1)],
            attributes: HashMap::new(),
        };

        let dfg = DFGConstruction::compute_trace(&trace);

        assert_eq!(dfg.activities.len(), 2);
        assert!(dfg.edge("X", "Y").is_some());
        assert_eq!(dfg.start_activities.get("X").copied(), Some(1));
        assert_eq!(dfg.end_activities.get("Y").copied(), Some(1));
    }

    #[test]
    fn test_empty_log() {
        let empty = EventLog::new("empty".to_string());
        let result = DFGConstruction::compute(&empty);

        assert_eq!(result.trace_count, 0);
        assert_eq!(result.event_count, 0);
        assert!(result.dfg.activities.is_empty());
    }
}