1use crate::types::{OCPMEventLog, OCPMPatternResult};
10use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
11use std::collections::{HashMap, HashSet};
12
13#[derive(Debug, Clone)]
22pub struct OCPMPatternMatching {
23 metadata: KernelMetadata,
24}
25
26impl Default for OCPMPatternMatching {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl OCPMPatternMatching {
33 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 metadata: KernelMetadata::batch("procint/ocpm-patterns", Domain::ProcessIntelligence)
38 .with_description("Object-centric process mining patterns")
39 .with_throughput(20_000)
40 .with_latency_us(200.0),
41 }
42 }
43
44 pub fn detect_lifecycle_patterns(
46 log: &OCPMEventLog,
47 object_type: &str,
48 ) -> Vec<OCPMPatternResult> {
49 let mut patterns = Vec::new();
50
51 let objects: Vec<_> = log
53 .objects
54 .values()
55 .filter(|o| o.object_type == object_type)
56 .collect();
57
58 for object in objects {
59 let events = log.events_for_object(&object.id);
60
61 if events.is_empty() {
62 continue;
63 }
64
65 let mut sorted_events: Vec<_> = events.iter().collect();
67 sorted_events.sort_by_key(|e| e.timestamp);
68
69 let sequence: Vec<_> = sorted_events.iter().map(|e| e.activity.as_str()).collect();
71
72 let pattern_name = classify_lifecycle(&sequence);
74 let score = calculate_lifecycle_score(&sequence);
75
76 patterns.push(OCPMPatternResult {
77 pattern_name,
78 matched_objects: vec![object.id.clone()],
79 matched_events: sorted_events.iter().map(|e| e.id).collect(),
80 score,
81 description: format!(
82 "Object {} follows lifecycle: {}",
83 object.id,
84 sequence.join(" -> ")
85 ),
86 });
87 }
88
89 patterns
90 }
91
92 pub fn detect_interaction_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
94 let mut patterns = Vec::new();
95 let mut interaction_counts: HashMap<(String, String), Vec<u64>> = HashMap::new();
96
97 for event in &log.events {
99 if event.objects.len() >= 2 {
100 for i in 0..event.objects.len() {
102 for j in (i + 1)..event.objects.len() {
103 let obj1 = &event.objects[i];
104 let obj2 = &event.objects[j];
105
106 let key = if obj1 < obj2 {
107 (obj1.clone(), obj2.clone())
108 } else {
109 (obj2.clone(), obj1.clone())
110 };
111
112 interaction_counts.entry(key).or_default().push(event.id);
113 }
114 }
115 }
116 }
117
118 for ((obj1, obj2), event_ids) in interaction_counts {
120 if event_ids.len() >= 2 {
121 let type1 = log
123 .objects
124 .get(&obj1)
125 .map(|o| o.object_type.as_str())
126 .unwrap_or("unknown");
127 let type2 = log
128 .objects
129 .get(&obj2)
130 .map(|o| o.object_type.as_str())
131 .unwrap_or("unknown");
132
133 patterns.push(OCPMPatternResult {
134 pattern_name: format!("{}_{}_interaction", type1, type2),
135 matched_objects: vec![obj1.clone(), obj2.clone()],
136 matched_events: event_ids.clone(),
137 score: event_ids.len() as f64 / log.events.len().max(1) as f64,
138 description: format!(
139 "Objects {} and {} interact in {} events",
140 obj1,
141 obj2,
142 event_ids.len()
143 ),
144 });
145 }
146 }
147
148 patterns
149 }
150
151 pub fn detect_convergence_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
153 let mut patterns = Vec::new();
154 let mut convergence_points: HashMap<u64, HashSet<String>> = HashMap::new();
155
156 for event in &log.events {
158 if event.objects.len() >= 2 {
159 let objects_set: HashSet<_> = event.objects.iter().cloned().collect();
160 convergence_points.insert(event.id, objects_set);
161 }
162 }
163
164 for (event_id, objects) in &convergence_points {
166 if objects.len() >= 3 {
167 let event = log.events.iter().find(|e| e.id == *event_id).unwrap();
168
169 patterns.push(OCPMPatternResult {
170 pattern_name: "convergence".to_string(),
171 matched_objects: objects.iter().cloned().collect(),
172 matched_events: vec![*event_id],
173 score: objects.len() as f64 / 10.0, description: format!(
175 "Convergence point at '{}' with {} objects",
176 event.activity,
177 objects.len()
178 ),
179 });
180 }
181 }
182
183 patterns
184 }
185
186 pub fn detect_divergence_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
188 let mut patterns = Vec::new();
189
190 let mut object_first_seen: HashMap<String, (u64, u64)> = HashMap::new(); for event in &log.events {
194 for obj_id in &event.objects {
195 object_first_seen
196 .entry(obj_id.clone())
197 .or_insert((event.id, event.timestamp));
198 }
199 }
200
201 let mut spawn_events: HashMap<u64, Vec<String>> = HashMap::new();
203
204 for (obj_id, (event_id, _)) in &object_first_seen {
205 spawn_events
206 .entry(*event_id)
207 .or_default()
208 .push(obj_id.clone());
209 }
210
211 for (event_id, new_objects) in spawn_events {
212 if new_objects.len() >= 2 {
213 let event = log.events.iter().find(|e| e.id == event_id);
214 if let Some(event) = event {
215 patterns.push(OCPMPatternResult {
216 pattern_name: "divergence".to_string(),
217 matched_objects: new_objects.clone(),
218 matched_events: vec![event_id],
219 score: new_objects.len() as f64 / 10.0,
220 description: format!(
221 "Divergence point at '{}' creating {} objects",
222 event.activity,
223 new_objects.len()
224 ),
225 });
226 }
227 }
228 }
229
230 patterns
231 }
232
233 pub fn detect_sync_patterns(log: &OCPMEventLog, time_window_ms: u64) -> Vec<OCPMPatternResult> {
235 let mut patterns = Vec::new();
236
237 let mut sorted_events: Vec<_> = log.events.iter().collect();
239 sorted_events.sort_by_key(|e| e.timestamp);
240
241 for i in 0..sorted_events.len() {
243 let event_i = sorted_events[i];
244 let mut sync_group = vec![event_i.id];
245 let mut sync_objects: HashSet<String> = event_i.objects.iter().cloned().collect();
246
247 for event_j in sorted_events.iter().skip(i + 1) {
248 if event_j.timestamp > event_i.timestamp + time_window_ms {
249 break;
250 }
251
252 let shared: HashSet<_> = event_j
254 .objects
255 .iter()
256 .filter(|o| sync_objects.contains(*o))
257 .cloned()
258 .collect();
259
260 if !shared.is_empty() {
261 sync_group.push(event_j.id);
262 sync_objects.extend(event_j.objects.iter().cloned());
263 }
264 }
265
266 if sync_group.len() >= 3 {
267 patterns.push(OCPMPatternResult {
268 pattern_name: "synchronization".to_string(),
269 matched_objects: sync_objects.iter().cloned().collect(),
270 matched_events: sync_group.clone(),
271 score: sync_group.len() as f64 / 10.0,
272 description: format!(
273 "Synchronization of {} events within {}ms window",
274 sync_group.len(),
275 time_window_ms
276 ),
277 });
278 }
279 }
280
281 patterns
282 }
283
284 pub fn calculate_flow_metrics(log: &OCPMEventLog) -> ObjectFlowMetrics {
286 let mut object_event_counts: HashMap<String, u64> = HashMap::new();
287 let mut activity_object_counts: HashMap<String, HashSet<String>> = HashMap::new();
288 let mut object_type_counts: HashMap<String, u64> = HashMap::new();
289
290 for event in &log.events {
291 for obj_id in &event.objects {
292 *object_event_counts.entry(obj_id.clone()).or_insert(0) += 1;
293
294 activity_object_counts
295 .entry(event.activity.clone())
296 .or_default()
297 .insert(obj_id.clone());
298 }
299 }
300
301 for obj in log.objects.values() {
302 *object_type_counts
303 .entry(obj.object_type.clone())
304 .or_insert(0) += 1;
305 }
306
307 let avg_events_per_object = if !object_event_counts.is_empty() {
308 object_event_counts.values().sum::<u64>() as f64 / object_event_counts.len() as f64
309 } else {
310 0.0
311 };
312
313 let avg_objects_per_activity = if !activity_object_counts.is_empty() {
314 activity_object_counts
315 .values()
316 .map(|s| s.len() as f64)
317 .sum::<f64>()
318 / activity_object_counts.len() as f64
319 } else {
320 0.0
321 };
322
323 let max_objects_per_event = log
324 .events
325 .iter()
326 .map(|e| e.objects.len())
327 .max()
328 .unwrap_or(0);
329
330 ObjectFlowMetrics {
331 object_count: log.objects.len(),
332 event_count: log.events.len(),
333 object_type_count: object_type_counts.len(),
334 avg_events_per_object,
335 avg_objects_per_activity,
336 max_objects_per_event,
337 object_type_distribution: object_type_counts,
338 }
339 }
340
341 pub fn detect_batching_patterns(log: &OCPMEventLog) -> Vec<OCPMPatternResult> {
343 let mut patterns = Vec::new();
344 let mut activity_batches: HashMap<String, Vec<HashSet<String>>> = HashMap::new();
345
346 let mut sorted_events: Vec<_> = log.events.iter().collect();
348 sorted_events.sort_by_key(|e| (e.activity.clone(), e.timestamp));
349
350 let mut current_activity = String::new();
351 let mut current_batch: HashSet<String> = HashSet::new();
352 let mut batch_events: Vec<u64> = Vec::new();
353 let mut last_timestamp = 0u64;
354
355 for event in sorted_events {
356 if event.activity != current_activity || event.timestamp > last_timestamp + 1000 {
357 if current_batch.len() >= 3 {
359 activity_batches
360 .entry(current_activity.clone())
361 .or_default()
362 .push(current_batch.clone());
363
364 patterns.push(OCPMPatternResult {
365 pattern_name: format!("{}_batch", current_activity),
366 matched_objects: current_batch.iter().cloned().collect(),
367 matched_events: batch_events.clone(),
368 score: current_batch.len() as f64 / 10.0,
369 description: format!(
370 "Batch of {} objects in activity '{}'",
371 current_batch.len(),
372 current_activity
373 ),
374 });
375 }
376
377 current_activity = event.activity.clone();
378 current_batch.clear();
379 batch_events.clear();
380 }
381
382 current_batch.extend(event.objects.iter().cloned());
383 batch_events.push(event.id);
384 last_timestamp = event.timestamp;
385 }
386
387 if current_batch.len() >= 3 {
389 patterns.push(OCPMPatternResult {
390 pattern_name: format!("{}_batch", current_activity),
391 matched_objects: current_batch.iter().cloned().collect(),
392 matched_events: batch_events,
393 score: current_batch.len() as f64 / 10.0,
394 description: format!(
395 "Batch of {} objects in activity '{}'",
396 current_batch.len(),
397 current_activity
398 ),
399 });
400 }
401
402 patterns
403 }
404}
405
406impl GpuKernel for OCPMPatternMatching {
407 fn metadata(&self) -> &KernelMetadata {
408 &self.metadata
409 }
410}
411
/// Aggregate statistics describing how objects flow through an OCPM event log,
/// as filled in by `OCPMPatternMatching::calculate_flow_metrics`.
///
/// `Default` yields an all-zero value (what an empty log produces), and
/// `PartialEq` lets callers and tests compare metric snapshots directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ObjectFlowMetrics {
    /// Number of objects registered in the log.
    pub object_count: usize,
    /// Number of events in the log.
    pub event_count: usize,
    /// Number of distinct object types among the registered objects.
    pub object_type_count: usize,
    /// Mean number of event references per object, over the objects that are
    /// referenced by at least one event.
    pub avg_events_per_object: f64,
    /// Mean number of distinct objects involved per activity.
    pub avg_objects_per_activity: f64,
    /// Largest number of object references in a single event.
    pub max_objects_per_event: usize,
    /// Number of registered objects per object type.
    pub object_type_distribution: HashMap<String, u64>,
}
430
/// Word prefixes that mark an activity as the start of an object's lifecycle.
const START_KEYWORDS: [&str; 3] = ["create", "start", "init"];

/// Word prefixes that mark an activity as the end of an object's lifecycle.
const END_KEYWORDS: [&str; 3] = ["complete", "end", "close"];

/// Splits an activity name into lowercase words.
///
/// Words are separated by any non-alphanumeric character and by a lower-case
/// (or digit) to upper-case transition. So `"Create Order"`, `"create_order"`
/// and `"CreateOrder"` all yield `["create", "order"]`.
fn activity_words(activity: &str) -> Vec<String> {
    let mut words = Vec::new();
    let mut current = String::new();
    let mut prev_is_lower_or_digit = false;

    for ch in activity.chars() {
        if !ch.is_alphanumeric() {
            if !current.is_empty() {
                words.push(std::mem::take(&mut current));
            }
            prev_is_lower_or_digit = false;
            continue;
        }
        // camelCase boundary: "orderCreated" -> "order", "created".
        if ch.is_uppercase() && prev_is_lower_or_digit {
            words.push(std::mem::take(&mut current));
        }
        current.extend(ch.to_lowercase());
        prev_is_lower_or_digit = ch.is_lowercase() || ch.is_numeric();
    }
    if !current.is_empty() {
        words.push(current);
    }

    words
}

/// Returns `true` if any word of `activity` starts with one of `keywords`.
///
/// Matching at word starts (rather than anywhere in the string) keeps names
/// like "Send", "Pending" or "Definition" from counting as end/start activities.
fn has_keyword(activity: &str, keywords: &[&str]) -> bool {
    activity_words(activity)
        .iter()
        .any(|word| keywords.iter().any(|kw| word.starts_with(*kw)))
}

/// Returns `true` if `activity` looks like a lifecycle start (create/start/init).
fn is_start_activity(activity: &str) -> bool {
    has_keyword(activity, &START_KEYWORDS)
}

/// Returns `true` if `activity` looks like a lifecycle end (complete/end/close).
fn is_end_activity(activity: &str) -> bool {
    has_keyword(activity, &END_KEYWORDS)
}

/// Classifies a time-ordered activity sequence of one object.
///
/// Returns one of:
/// - `"empty"` — no activities;
/// - `"single_event"` — exactly one activity;
/// - `"full_lifecycle"` — the first activity is a start and the last an end;
/// - `"loop_heavy"` — fewer than half of the activities are distinct;
/// - `"sequential"` — anything else.
fn classify_lifecycle(sequence: &[&str]) -> String {
    let (first, last) = match sequence {
        [] => return "empty".to_string(),
        [_] => return "single_event".to_string(),
        [first, .., last] => (*first, *last),
    };

    if is_start_activity(first) && is_end_activity(last) {
        return "full_lifecycle".to_string();
    }

    let unique: HashSet<&str> = sequence.iter().copied().collect();
    // `unique < len / 2` would floor for odd lengths (e.g. 1 of 3 distinct was
    // not flagged); compare without integer division instead.
    if unique.len() * 2 < sequence.len() {
        return "loop_heavy".to_string();
    }

    "sequential".to_string()
}

/// Scores how complete and non-repetitive an activity sequence is, in `[0, 1]`.
///
/// The score is `0.5 * (distinct / total)`, plus 0.25 if any activity is a
/// start, plus 0.25 if any activity is an end. An empty sequence scores 0.0.
fn calculate_lifecycle_score(sequence: &[&str]) -> f64 {
    if sequence.is_empty() {
        return 0.0;
    }

    let distinct = sequence.iter().collect::<HashSet<_>>().len();
    let unique_ratio = distinct as f64 / sequence.len() as f64;

    let start_bonus = if sequence.iter().any(|a| is_start_activity(a)) {
        0.25
    } else {
        0.0
    };
    let end_bonus = if sequence.iter().any(|a| is_end_activity(a)) {
        0.25
    } else {
        0.0
    };

    0.5 * unique_ratio + start_bonus + end_bonus
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{OCPMEvent, OCPMObject};

    /// Builds an object with no attributes.
    fn object(id: &str, object_type: &str) -> OCPMObject {
        OCPMObject {
            id: id.to_string(),
            object_type: object_type.to_string(),
            attributes: HashMap::new(),
        }
    }

    /// Builds an event with no attributes that references `objects`.
    fn event(id: u64, activity: &str, timestamp: u64, objects: &[&str]) -> OCPMEvent {
        OCPMEvent {
            id,
            activity: activity.to_string(),
            timestamp,
            objects: objects.iter().map(|o| o.to_string()).collect(),
            attributes: HashMap::new(),
        }
    }

    /// Two orders and three items (item3 never used). order1 runs from
    /// "Create Order" to "Complete Order". No pair of objects shares more than
    /// one event.
    fn create_test_ocpm_log() -> OCPMEventLog {
        let mut log = OCPMEventLog::new();

        for (id, object_type) in [
            ("order1", "Order"),
            ("order2", "Order"),
            ("item1", "Item"),
            ("item2", "Item"),
            ("item3", "Item"),
        ] {
            log.add_object(object(id, object_type));
        }

        log.add_event(event(1, "Create Order", 1000, &["order1"]));
        log.add_event(event(2, "Add Item", 2000, &["order1", "item1"]));
        log.add_event(event(3, "Add Item", 2100, &["order1", "item2"]));
        log.add_event(event(4, "Process Payment", 3000, &["order1", "order2"]));
        log.add_event(event(5, "Complete Order", 4000, &["order1"]));

        log
    }

    #[test]
    fn test_ocpm_metadata() {
        let kernel = OCPMPatternMatching::new();
        assert_eq!(kernel.metadata().id, "procint/ocpm-patterns");
        assert_eq!(kernel.metadata().domain, Domain::ProcessIntelligence);
    }

    #[test]
    fn test_lifecycle_detection() {
        let log = create_test_ocpm_log();

        let patterns = OCPMPatternMatching::detect_lifecycle_patterns(&log, "Order");

        // Both orders appear in at least one event.
        assert_eq!(patterns.len(), 2);

        let order1 = patterns
            .iter()
            .find(|p| p.matched_objects == vec!["order1".to_string()])
            .expect("order1 lifecycle");
        assert_eq!(order1.pattern_name, "full_lifecycle");
        assert_eq!(order1.matched_events, vec![1, 2, 3, 4, 5]);

        let order2 = patterns
            .iter()
            .find(|p| p.matched_objects == vec!["order2".to_string()])
            .expect("order2 lifecycle");
        assert_eq!(order2.pattern_name, "single_event");
        assert_eq!(order2.matched_events, vec![4]);
    }

    #[test]
    fn test_interaction_requires_repeated_co_occurrence() {
        // Every pair in the fixture shares exactly one event.
        let log = create_test_ocpm_log();
        assert!(OCPMPatternMatching::detect_interaction_patterns(&log).is_empty());
    }

    #[test]
    fn test_interaction_detection() {
        let mut log = create_test_ocpm_log();
        log.add_event(event(6, "Ship Item", 5000, &["item1", "order1"]));

        let patterns = OCPMPatternMatching::detect_interaction_patterns(&log);

        assert_eq!(patterns.len(), 1);
        let pattern = &patterns[0];
        assert_eq!(pattern.pattern_name, "Item_Order_interaction");
        assert_eq!(
            pattern.matched_objects,
            vec!["item1".to_string(), "order1".to_string()]
        );
        assert_eq!(pattern.matched_events, vec![2, 6]);
    }

    #[test]
    fn test_flow_metrics() {
        let log = create_test_ocpm_log();

        let metrics = OCPMPatternMatching::calculate_flow_metrics(&log);

        assert_eq!(metrics.object_count, 5);
        assert_eq!(metrics.event_count, 5);
        assert_eq!(metrics.object_type_count, 2);
        // 8 object references over 4 referenced objects.
        assert_eq!(metrics.avg_events_per_object, 2.0);
        // Distinct objects per activity: 1 + 3 + 2 + 1 over 4 activities.
        assert_eq!(metrics.avg_objects_per_activity, 1.75);
        assert_eq!(metrics.max_objects_per_event, 2);
        assert_eq!(metrics.object_type_distribution.get("Order"), Some(&2));
        assert_eq!(metrics.object_type_distribution.get("Item"), Some(&3));
    }

    #[test]
    fn test_convergence_detection() {
        let mut log = OCPMEventLog::new();
        for i in 1..=4 {
            log.add_object(object(&format!("obj{}", i), "Item"));
        }
        log.add_event(event(1, "Merge", 1000, &["obj1", "obj2", "obj3", "obj4"]));

        let patterns = OCPMPatternMatching::detect_convergence_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "convergence");
        assert_eq!(patterns[0].matched_events, vec![1]);
        assert_eq!(patterns[0].matched_objects.len(), 4);
    }

    #[test]
    fn test_divergence_detection() {
        let mut log = OCPMEventLog::new();
        for i in 1..=3 {
            log.add_object(object(&format!("new{}", i), "Product"));
        }
        log.add_event(event(1, "Split", 1000, &["new1", "new2", "new3"]));

        let patterns = OCPMPatternMatching::detect_divergence_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "divergence");
        assert_eq!(patterns[0].matched_events, vec![1]);
        assert_eq!(patterns[0].matched_objects.len(), 3);
    }

    #[test]
    fn test_sync_patterns() {
        let mut log = OCPMEventLog::new();
        log.add_object(object("shared", "Resource"));
        for i in 0..5 {
            log.add_event(event(i, &format!("Process_{}", i), 1000 + i * 100, &["shared"]));
        }

        let patterns = OCPMPatternMatching::detect_sync_patterns(&log, 500);

        // Anchors at 1000, 1100 and 1200 each see at least three events in the window.
        assert_eq!(patterns.len(), 3);
        assert_eq!(patterns[0].pattern_name, "synchronization");
        assert_eq!(patterns[0].matched_events, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_batching_detection() {
        let mut log = OCPMEventLog::new();
        for i in 1..=5 {
            log.add_object(object(&format!("batch_item{}", i), "Item"));
        }
        for i in 1..=5 {
            let item = format!("batch_item{}", i);
            log.add_event(event(i, "BatchProcess", 1000 + i * 100, &[item.as_str()]));
        }

        let patterns = OCPMPatternMatching::detect_batching_patterns(&log);

        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].pattern_name, "BatchProcess_batch");
        assert_eq!(patterns[0].matched_events, vec![1, 2, 3, 4, 5]);
        assert_eq!(patterns[0].matched_objects.len(), 5);
    }

    #[test]
    fn test_batching_splits_on_time_gap() {
        let mut log = OCPMEventLog::new();
        log.add_event(event(1, "Pick", 1000, &["a1"]));
        log.add_event(event(2, "Pick", 1100, &["a2"]));
        log.add_event(event(3, "Pick", 1200, &["a3"]));
        // More than the default gap after event 3 starts a new batch.
        log.add_event(event(4, "Pick", 5000, &["b1"]));
        log.add_event(event(5, "Pick", 5100, &["b2"]));
        log.add_event(event(6, "Pick", 5200, &["b3"]));

        let patterns = OCPMPatternMatching::detect_batching_patterns(&log);

        assert_eq!(patterns.len(), 2);
        assert_eq!(patterns[0].matched_events, vec![1, 2, 3]);
        assert_eq!(patterns[1].matched_events, vec![4, 5, 6]);
    }

    #[test]
    fn test_empty_log() {
        let log = OCPMEventLog::new();

        assert!(OCPMPatternMatching::detect_lifecycle_patterns(&log, "Order").is_empty());
        assert!(OCPMPatternMatching::detect_interaction_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_convergence_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_divergence_patterns(&log).is_empty());
        assert!(OCPMPatternMatching::detect_sync_patterns(&log, 500).is_empty());
        assert!(OCPMPatternMatching::detect_batching_patterns(&log).is_empty());

        let metrics = OCPMPatternMatching::calculate_flow_metrics(&log);
        assert_eq!(metrics.object_count, 0);
        assert_eq!(metrics.event_count, 0);
        assert_eq!(metrics.avg_events_per_object, 0.0);
        assert_eq!(metrics.avg_objects_per_activity, 0.0);
        assert_eq!(metrics.max_objects_per_event, 0);
    }

    #[test]
    fn test_lifecycle_classification() {
        assert_eq!(
            classify_lifecycle(&["Create Order", "Process", "Complete Order"]),
            "full_lifecycle"
        );
        assert_eq!(classify_lifecycle(&["A", "B", "C", "D"]), "sequential");
        assert_eq!(classify_lifecycle(&["A", "B", "A", "B", "A", "B"]), "loop_heavy");
        assert_eq!(classify_lifecycle(&["Only"]), "single_event");
        assert_eq!(classify_lifecycle(&[]), "empty");
    }

    #[test]
    fn test_lifecycle_score() {
        let full_score = calculate_lifecycle_score(&["start", "process", "end"]);
        assert!(full_score >= 0.75);

        let mid_score = calculate_lifecycle_score(&["A", "B", "C"]);
        assert!(mid_score < 0.75);

        assert_eq!(calculate_lifecycle_score(&[]), 0.0);
    }
}