rustkernel_procint/
ocpm.rs

1//! Object-Centric Process Mining (OCPM) kernel.
2//!
3//! This module provides OCPM pattern matching capabilities:
4//! - Multi-object event correlation
5//! - Object lifecycle analysis
6//! - Cross-object pattern detection
7//! - Object flow analysis
8
9use crate::types::{OCPMEventLog, OCPMPatternResult};
10use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
11use std::collections::{HashMap, HashSet};
12
13// ============================================================================
14// OCPM Pattern Matching Kernel
15// ============================================================================
16
17/// OCPM pattern matching kernel.
18///
19/// Detects patterns in object-centric event logs where events can relate
20/// to multiple objects of different types.
21#[derive(Debug, Clone)]
22pub struct OCPMPatternMatching {
23    metadata: KernelMetadata,
24}
25
26impl Default for OCPMPatternMatching {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl OCPMPatternMatching {
33    /// Create a new OCPM pattern matching kernel.
34    #[must_use]
35    pub fn new() -> Self {
36        Self {
37            metadata: KernelMetadata::batch("procint/ocpm-patterns", Domain::ProcessIntelligence)
38                .with_description("Object-centric process mining patterns")
39                .with_throughput(20_000)
40                .with_latency_us(200.0),
41        }
42    }
43
44    /// Detect object lifecycle patterns.
45    pub fn detect_lifecycle_patterns(
46        log: &OCPMEventLog,
47        object_type: &str,
48    ) -> Vec<OCPMPatternResult> {
49        let mut patterns = Vec::new();
50
51        // Get objects of the specified type
52        let objects: Vec<_> = log
53            .objects
54            .values()
55            .filter(|o| o.object_type == object_type)
56            .collect();
57
58        for object in objects {
59            let events = log.events_for_object(&object.id);
60
61            if events.is_empty() {
62                continue;
63            }
64
65            // Sort events by timestamp
66            let mut sorted_events: Vec<_> = events.iter().collect();
67            sorted_events.sort_by_key(|e| e.timestamp);
68
69            // Extract activity sequence
70            let sequence: Vec<_> = sorted_events.iter().map(|e| e.activity.as_str()).collect();
71
72            // Detect lifecycle pattern
73            let pattern_name = classify_lifecycle(&sequence);
74            let score = calculate_lifecycle_score(&sequence);
75
76            patterns.push(OCPMPatternResult {
77                pattern_name,
78                matched_objects: vec![object.id.clone()],
79                matched_events: sorted_events.iter().map(|e| e.id).collect(),
80                score,
81                description: format!(
82                    "Object {} follows lifecycle: {}",
83                    object.id,
84                    sequence.join(" -> ")
85                ),
86            });
87        }
88
89        patterns
90    }
91
92    /// Detect cross-object patterns (object interactions).
93    pub fn detect_interaction_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
94        let mut patterns = Vec::new();
95        let mut interaction_counts: HashMap<(String, String), Vec<u64>> = HashMap::new();
96
97        // Find events that involve multiple objects
98        for event in &log.events {
99            if event.objects.len() >= 2 {
100                // Record all pairs of objects
101                for i in 0..event.objects.len() {
102                    for j in (i + 1)..event.objects.len() {
103                        let obj1 = &event.objects[i];
104                        let obj2 = &event.objects[j];
105
106                        let key = if obj1 < obj2 {
107                            (obj1.clone(), obj2.clone())
108                        } else {
109                            (obj2.clone(), obj1.clone())
110                        };
111
112                        interaction_counts.entry(key).or_default().push(event.id);
113                    }
114                }
115            }
116        }
117
118        // Generate patterns for significant interactions
119        for ((obj1, obj2), event_ids) in interaction_counts {
120            if event_ids.len() >= 2 {
121                // Get object types
122                let type1 = log
123                    .objects
124                    .get(&obj1)
125                    .map(|o| o.object_type.as_str())
126                    .unwrap_or("unknown");
127                let type2 = log
128                    .objects
129                    .get(&obj2)
130                    .map(|o| o.object_type.as_str())
131                    .unwrap_or("unknown");
132
133                patterns.push(OCPMPatternResult {
134                    pattern_name: format!("{}_{}_interaction", type1, type2),
135                    matched_objects: vec![obj1.clone(), obj2.clone()],
136                    matched_events: event_ids.clone(),
137                    score: event_ids.len() as f64 / log.events.len().max(1) as f64,
138                    description: format!(
139                        "Objects {} and {} interact in {} events",
140                        obj1,
141                        obj2,
142                        event_ids.len()
143                    ),
144                });
145            }
146        }
147
148        patterns
149    }
150
151    /// Detect convergence patterns (multiple objects leading to one).
152    pub fn detect_convergence_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
153        let mut patterns = Vec::new();
154        let mut convergence_points: HashMap<u64, HashSet<String>> = HashMap::new();
155
156        // Find events where multiple objects converge
157        for event in &log.events {
158            if event.objects.len() >= 2 {
159                let objects_set: HashSet<_> = event.objects.iter().cloned().collect();
160                convergence_points.insert(event.id, objects_set);
161            }
162        }
163
164        // Analyze convergence sequences
165        for (event_id, objects) in &convergence_points {
166            if objects.len() >= 3 {
167                let event = log.events.iter().find(|e| e.id == *event_id).unwrap();
168
169                patterns.push(OCPMPatternResult {
170                    pattern_name: "convergence".to_string(),
171                    matched_objects: objects.iter().cloned().collect(),
172                    matched_events: vec![*event_id],
173                    score: objects.len() as f64 / 10.0, // Normalize by assumed max
174                    description: format!(
175                        "Convergence point at '{}' with {} objects",
176                        event.activity,
177                        objects.len()
178                    ),
179                });
180            }
181        }
182
183        patterns
184    }
185
186    /// Detect divergence patterns (one object leading to multiple).
187    pub fn detect_divergence_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
188        let mut patterns = Vec::new();
189
190        // Track object appearances over time
191        let mut object_first_seen: HashMap<String, (u64, u64)> = HashMap::new(); // object -> (event_id, timestamp)
192
193        for event in &log.events {
194            for obj_id in &event.objects {
195                object_first_seen
196                    .entry(obj_id.clone())
197                    .or_insert((event.id, event.timestamp));
198            }
199        }
200
201        // Find events that spawn multiple new objects
202        let mut spawn_events: HashMap<u64, Vec<String>> = HashMap::new();
203
204        for (obj_id, (event_id, _)) in &object_first_seen {
205            spawn_events
206                .entry(*event_id)
207                .or_default()
208                .push(obj_id.clone());
209        }
210
211        for (event_id, new_objects) in spawn_events {
212            if new_objects.len() >= 2 {
213                let event = log.events.iter().find(|e| e.id == event_id);
214                if let Some(event) = event {
215                    patterns.push(OCPMPatternResult {
216                        pattern_name: "divergence".to_string(),
217                        matched_objects: new_objects.clone(),
218                        matched_events: vec![event_id],
219                        score: new_objects.len() as f64 / 10.0,
220                        description: format!(
221                            "Divergence point at '{}' creating {} objects",
222                            event.activity,
223                            new_objects.len()
224                        ),
225                    });
226                }
227            }
228        }
229
230        patterns
231    }
232
233    /// Detect synchronization patterns.
234    pub fn detect_sync_patterns(log: &OCPMEventLog, time_window_ms: u64) -> Vec<OCPMPatternResult> {
235        let mut patterns = Vec::new();
236
237        // Sort events by timestamp
238        let mut sorted_events: Vec<_> = log.events.iter().collect();
239        sorted_events.sort_by_key(|e| e.timestamp);
240
241        // Find events within time window that share objects
242        for i in 0..sorted_events.len() {
243            let event_i = sorted_events[i];
244            let mut sync_group = vec![event_i.id];
245            let mut sync_objects: HashSet<String> = event_i.objects.iter().cloned().collect();
246
247            for event_j in sorted_events.iter().skip(i + 1) {
248                if event_j.timestamp > event_i.timestamp + time_window_ms {
249                    break;
250                }
251
252                // Check if events share any objects
253                let shared: HashSet<_> = event_j
254                    .objects
255                    .iter()
256                    .filter(|o| sync_objects.contains(*o))
257                    .cloned()
258                    .collect();
259
260                if !shared.is_empty() {
261                    sync_group.push(event_j.id);
262                    sync_objects.extend(event_j.objects.iter().cloned());
263                }
264            }
265
266            if sync_group.len() >= 3 {
267                patterns.push(OCPMPatternResult {
268                    pattern_name: "synchronization".to_string(),
269                    matched_objects: sync_objects.iter().cloned().collect(),
270                    matched_events: sync_group.clone(),
271                    score: sync_group.len() as f64 / 10.0,
272                    description: format!(
273                        "Synchronization of {} events within {}ms window",
274                        sync_group.len(),
275                        time_window_ms
276                    ),
277                });
278            }
279        }
280
281        patterns
282    }
283
284    /// Calculate object flow metrics.
285    pub fn calculate_flow_metrics(log: &OCPMEventLog) -> ObjectFlowMetrics {
286        let mut object_event_counts: HashMap<String, u64> = HashMap::new();
287        let mut activity_object_counts: HashMap<String, HashSet<String>> = HashMap::new();
288        let mut object_type_counts: HashMap<String, u64> = HashMap::new();
289
290        for event in &log.events {
291            for obj_id in &event.objects {
292                *object_event_counts.entry(obj_id.clone()).or_insert(0) += 1;
293
294                activity_object_counts
295                    .entry(event.activity.clone())
296                    .or_default()
297                    .insert(obj_id.clone());
298            }
299        }
300
301        for obj in log.objects.values() {
302            *object_type_counts
303                .entry(obj.object_type.clone())
304                .or_insert(0) += 1;
305        }
306
307        let avg_events_per_object = if !object_event_counts.is_empty() {
308            object_event_counts.values().sum::<u64>() as f64 / object_event_counts.len() as f64
309        } else {
310            0.0
311        };
312
313        let avg_objects_per_activity = if !activity_object_counts.is_empty() {
314            activity_object_counts
315                .values()
316                .map(|s| s.len() as f64)
317                .sum::<f64>()
318                / activity_object_counts.len() as f64
319        } else {
320            0.0
321        };
322
323        let max_objects_per_event = log
324            .events
325            .iter()
326            .map(|e| e.objects.len())
327            .max()
328            .unwrap_or(0);
329
330        ObjectFlowMetrics {
331            object_count: log.objects.len(),
332            event_count: log.events.len(),
333            object_type_count: object_type_counts.len(),
334            avg_events_per_object,
335            avg_objects_per_activity,
336            max_objects_per_event,
337            object_type_distribution: object_type_counts,
338        }
339    }
340
341    /// Detect batching patterns (objects processed together).
342    pub fn detect_batching_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
343        let mut patterns = Vec::new();
344        let mut activity_batches: HashMap<String, Vec<HashSet<String>>> = HashMap::new();
345
346        // Group events by activity and timestamp proximity
347        let mut sorted_events: Vec<_> = log.events.iter().collect();
348        sorted_events.sort_by_key(|e| (e.activity.clone(), e.timestamp));
349
350        let mut current_activity = String::new();
351        let mut current_batch: HashSet<String> = HashSet::new();
352        let mut batch_events: Vec<u64> = Vec::new();
353        let mut last_timestamp = 0u64;
354
355        for event in sorted_events {
356            if event.activity != current_activity || event.timestamp > last_timestamp + 1000 {
357                // New batch
358                if current_batch.len() >= 3 {
359                    activity_batches
360                        .entry(current_activity.clone())
361                        .or_default()
362                        .push(current_batch.clone());
363
364                    patterns.push(OCPMPatternResult {
365                        pattern_name: format!("{}_batch", current_activity),
366                        matched_objects: current_batch.iter().cloned().collect(),
367                        matched_events: batch_events.clone(),
368                        score: current_batch.len() as f64 / 10.0,
369                        description: format!(
370                            "Batch of {} objects in activity '{}'",
371                            current_batch.len(),
372                            current_activity
373                        ),
374                    });
375                }
376
377                current_activity = event.activity.clone();
378                current_batch.clear();
379                batch_events.clear();
380            }
381
382            current_batch.extend(event.objects.iter().cloned());
383            batch_events.push(event.id);
384            last_timestamp = event.timestamp;
385        }
386
387        // Don't forget the last batch
388        if current_batch.len() >= 3 {
389            patterns.push(OCPMPatternResult {
390                pattern_name: format!("{}_batch", current_activity),
391                matched_objects: current_batch.iter().cloned().collect(),
392                matched_events: batch_events,
393                score: current_batch.len() as f64 / 10.0,
394                description: format!(
395                    "Batch of {} objects in activity '{}'",
396                    current_batch.len(),
397                    current_activity
398                ),
399            });
400        }
401
402        patterns
403    }
404}
405
406impl GpuKernel for OCPMPatternMatching {
407    fn metadata(&self) -> &KernelMetadata {
408        &self.metadata
409    }
410}
411
/// Aggregate statistics describing how objects move through an event log.
#[derive(Debug, Clone)]
pub struct ObjectFlowMetrics {
    /// Number of objects registered in the log.
    pub object_count: usize,
    /// Number of events in the log.
    pub event_count: usize,
    /// Number of distinct object types among the registered objects.
    pub object_type_count: usize,
    /// Mean number of event references per object.
    pub avg_events_per_object: f64,
    /// Mean number of distinct objects per activity.
    pub avg_objects_per_activity: f64,
    /// Largest number of objects referenced by any single event.
    pub max_objects_per_event: usize,
    /// Object type name mapped to how many objects have that type.
    pub object_type_distribution: HashMap<String, u64>,
}
430
/// Classify an activity sequence into a coarse lifecycle category.
///
/// Returns, checked in this order:
/// - `"empty"` for an empty sequence;
/// - `"single_event"` for exactly one activity;
/// - `"full_lifecycle"` when the first activity contains "create", "start" or
///   "init" and the last contains "complete", "end" or "close" (substring
///   match, case-insensitive);
/// - `"loop_heavy"` when fewer than half of the activities are distinct;
/// - `"sequential"` otherwise.
fn classify_lifecycle(sequence: &[&str]) -> String {
    const START_MARKERS: [&str; 3] = ["create", "start", "init"];
    const END_MARKERS: [&str; 3] = ["complete", "end", "close"];

    let (first, last) = match sequence {
        [] => return "empty".to_string(),
        [_] => return "single_event".to_string(),
        [first, .., last] => (first.to_lowercase(), last.to_lowercase()),
    };

    let opens = START_MARKERS.iter().any(|marker| first.contains(*marker));
    let closes = END_MARKERS.iter().any(|marker| last.contains(*marker));
    if opens && closes {
        return "full_lifecycle".to_string();
    }

    // Compare `2 * unique < len` rather than `unique < len / 2`: the integer
    // division truncates for odd lengths and would miss e.g. ["A", "A", "A"].
    let unique: HashSet<&str> = sequence.iter().copied().collect();
    if unique.len() * 2 < sequence.len() {
        return "loop_heavy".to_string();
    }

    "sequential".to_string()
}
460
/// Score how complete an activity sequence looks, from `0.0` to `1.0`.
///
/// Half of the score is the ratio of distinct activities to total activities.
/// A quarter is added when any activity contains "create", "start" or "init",
/// and another quarter when any contains "complete", "end" or "close"
/// (substring match, case-insensitive). An empty sequence scores `0.0`.
fn calculate_lifecycle_score(sequence: &[&str]) -> f64 {
    const START_MARKERS: [&str; 3] = ["create", "start", "init"];
    const END_MARKERS: [&str; 3] = ["complete", "end", "close"];

    if sequence.is_empty() {
        return 0.0;
    }

    let mut distinct: HashSet<&str> = HashSet::new();
    let mut saw_start = false;
    let mut saw_end = false;

    for activity in sequence {
        distinct.insert(*activity);

        let lowered = activity.to_lowercase();
        saw_start = saw_start || START_MARKERS.iter().any(|marker| lowered.contains(*marker));
        saw_end = saw_end || END_MARKERS.iter().any(|marker| lowered.contains(*marker));
    }

    let unique_ratio = distinct.len() as f64 / sequence.len() as f64;
    let start_bonus = if saw_start { 0.25 } else { 0.0 };
    let end_bonus = if saw_end { 0.25 } else { 0.0 };

    0.5 * unique_ratio + start_bonus + end_bonus
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{OCPMEvent, OCPMObject};

    /// Register an attribute-less object with the log.
    fn put_object(log: &mut OCPMEventLog, id: &str, object_type: &str) {
        log.add_object(OCPMObject {
            id: id.to_string(),
            object_type: object_type.to_string(),
            attributes: HashMap::new(),
        });
    }

    /// Append an attribute-less event referencing `objects` to the log.
    fn put_event(
        log: &mut OCPMEventLog,
        id: u64,
        activity: &str,
        timestamp: u64,
        objects: &[&str],
    ) {
        log.add_event(OCPMEvent {
            id,
            activity: activity.to_string(),
            timestamp,
            objects: objects.iter().map(|o| o.to_string()).collect(),
            attributes: HashMap::new(),
        });
    }

    /// Float comparison with a small absolute tolerance.
    fn approx(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < 1e-9
    }

    /// Two orders and three items; `order1` runs from creation to completion.
    fn create_test_ocpm_log() -> OCPMEventLog {
        let mut log = OCPMEventLog::new();

        put_object(&mut log, "order1", "Order");
        put_object(&mut log, "order2", "Order");
        put_object(&mut log, "item1", "Item");
        put_object(&mut log, "item2", "Item");
        put_object(&mut log, "item3", "Item");

        put_event(&mut log, 1, "Create Order", 1000, &["order1"]);
        put_event(&mut log, 2, "Add Item", 2000, &["order1", "item1"]);
        put_event(&mut log, 3, "Add Item", 2100, &["order1", "item2"]);
        put_event(&mut log, 4, "Process Payment", 3000, &["order1", "order2"]);
        put_event(&mut log, 5, "Complete Order", 4000, &["order1"]);

        log
    }

    #[test]
    fn test_ocpm_metadata() {
        let kernel = OCPMPatternMatching::new();
        assert_eq!(kernel.metadata().id, "procint/ocpm-patterns");
        assert_eq!(kernel.metadata().domain, Domain::ProcessIntelligence);
    }

    #[test]
    fn test_lifecycle_detection() {
        let log = create_test_ocpm_log();

        let patterns = OCPMPatternMatching::detect_lifecycle_patterns(&log, "Order");
        assert!(!patterns.is_empty());

        // order1 goes from "Create Order" to "Complete Order".
        let order1_pattern = patterns
            .iter()
            .find(|p| p.matched_objects.contains(&"order1".to_string()))
            .expect("order1 should produce a lifecycle pattern");
        assert_eq!(order1_pattern.pattern_name, "full_lifecycle");
    }

    #[test]
    fn test_interaction_detection() {
        let mut log = create_test_ocpm_log();

        // In the base fixture no pair of objects shares more than one event.
        assert!(OCPMPatternMatching::detect_interaction_patterns(&log).is_empty());

        // A second co-occurrence of order1 and item1 crosses the threshold.
        put_event(&mut log, 6, "Remove Item", 3500, &["order1", "item1"]);

        let patterns = OCPMPatternMatching::detect_interaction_patterns(&log);
        assert_eq!(patterns.len(), 1);

        let pattern = &patterns[0];
        assert_eq!(pattern.pattern_name, "Item_Order_interaction");
        assert_eq!(
            pattern.matched_objects,
            vec!["item1".to_string(), "order1".to_string()]
        );
        assert_eq!(pattern.matched_events, vec![2, 6]);
        assert!(approx(pattern.score, 2.0 / 6.0));
    }

    #[test]
    fn test_flow_metrics() {
        let log = create_test_ocpm_log();

        let metrics = OCPMPatternMatching::calculate_flow_metrics(&log);

        assert_eq!(metrics.object_count, 5);
        assert_eq!(metrics.event_count, 5);
        assert_eq!(metrics.object_type_count, 2);
        assert_eq!(metrics.max_objects_per_event, 2);
        // 8 event references spread over the 4 objects that appear in events.
        assert!(approx(metrics.avg_events_per_object, 2.0));
        // Distinct objects per activity: 1 + 3 + 2 + 1 over 4 activities.
        assert!(approx(metrics.avg_objects_per_activity, 1.75));
        assert_eq!(metrics.object_type_distribution.get("Order"), Some(&2));
        assert_eq!(metrics.object_type_distribution.get("Item"), Some(&3));
    }

    #[test]
    fn test_convergence_detection() {
        let mut log = OCPMEventLog::new();

        for i in 1..=4 {
            put_object(&mut log, &format!("obj{}", i), "Item");
        }
        put_event(&mut log, 1, "Merge", 1000, &["obj1", "obj2", "obj3", "obj4"]);

        let patterns = OCPMPatternMatching::detect_convergence_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "convergence");
        assert_eq!(patterns[0].matched_objects.len(), 4);
        assert_eq!(patterns[0].matched_events, vec![1]);
        assert!(approx(patterns[0].score, 0.4));
    }

    #[test]
    fn test_divergence_detection() {
        let mut log = OCPMEventLog::new();

        for i in 1..=3 {
            put_object(&mut log, &format!("new{}", i), "Product");
        }
        put_event(&mut log, 1, "Split", 1000, &["new1", "new2", "new3"]);

        let patterns = OCPMPatternMatching::detect_divergence_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "divergence");
        assert_eq!(patterns[0].matched_objects.len(), 3);
        assert_eq!(patterns[0].matched_events, vec![1]);
    }

    #[test]
    fn test_sync_patterns() {
        let mut log = OCPMEventLog::new();

        put_object(&mut log, "shared", "Resource");

        // Five events, 100 apart, all touching the same object.
        for i in 0..5u64 {
            put_event(
                &mut log,
                i,
                &format!("Process_{}", i),
                1000 + i * 100,
                &["shared"],
            );
        }

        let patterns = OCPMPatternMatching::detect_sync_patterns(&log, 500);

        assert!(!patterns.is_empty());
        assert!(patterns.iter().all(|p| p.pattern_name == "synchronization"));
        assert!(patterns.iter().all(|p| p.matched_events.len() >= 3));
    }

    #[test]
    fn test_batching_detection() {
        let mut log = OCPMEventLog::new();

        for i in 1..=5u64 {
            put_object(&mut log, &format!("batch_item{}", i), "Item");
        }
        for i in 1..=5u64 {
            let item = format!("batch_item{}", i);
            put_event(&mut log, i, "BatchProcess", 1000 + i * 100, &[item.as_str()]);
        }

        let patterns = OCPMPatternMatching::detect_batching_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "BatchProcess_batch");
        assert_eq!(patterns[0].matched_objects.len(), 5);
        assert_eq!(patterns[0].matched_events, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_empty_log() {
        let log = OCPMEventLog::new();

        assert!(OCPMPatternMatching::detect_lifecycle_patterns(&log, "Order").is_empty());
        assert!(OCPMPatternMatching::detect_interaction_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_convergence_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_divergence_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_sync_patterns(&log, 500).is_empty());
        assert!(OCPMPatternMatching::detect_batching_patterns(&log).is_empty());

        let metrics = OCPMPatternMatching::calculate_flow_metrics(&log);
        assert_eq!(metrics.object_count, 0);
        assert_eq!(metrics.event_count, 0);
        assert_eq!(metrics.avg_events_per_object, 0.0);
        assert_eq!(metrics.avg_objects_per_activity, 0.0);
    }

    #[test]
    fn test_lifecycle_classification() {
        let full = classify_lifecycle(&["Create Order", "Process", "Complete Order"]);
        assert_eq!(full, "full_lifecycle");

        let seq = classify_lifecycle(&["A", "B", "C", "D"]);
        assert_eq!(seq, "sequential");

        let loops = classify_lifecycle(&["A", "B", "A", "B", "A", "B"]);
        assert_eq!(loops, "loop_heavy");

        let single = classify_lifecycle(&["A"]);
        assert_eq!(single, "single_event");

        let empty = classify_lifecycle(&[]);
        assert_eq!(empty, "empty");
    }

    #[test]
    fn test_lifecycle_score() {
        // Start and end markers plus all-distinct activities: maximum score.
        let full_score = calculate_lifecycle_score(&["start", "process", "end"]);
        assert!(full_score >= 0.75);

        // No start/end markers: only the uniqueness half contributes.
        let mid_score = calculate_lifecycle_score(&["A", "B", "C"]);
        assert!(mid_score < 0.75);

        assert_eq!(calculate_lifecycle_score(&[]), 0.0);
    }
}