Skip to main content

rust_rule_engine/engine/
dependency.rs

1#![allow(deprecated)]
2
3use crate::engine::rule::Rule;
4use std::collections::{HashMap, HashSet};
5
/// Dependency analysis for safe parallel execution.
///
/// Records which rules read and which rules write each fact field, and from
/// that derives a rule-to-rule dependency graph.
#[derive(Clone, Debug)]
pub struct DependencyAnalyzer {
    /// Field name -> names of the rules whose conditions read that field.
    readers: HashMap<String, Vec<String>>,
    /// Field name -> names of the rules whose actions write that field.
    writers: HashMap<String, Vec<String>>,
    /// Rule name -> names of the rules it depends on (the writers of fields it reads).
    dependencies: HashMap<String, HashSet<String>>,
}
16
17impl Default for DependencyAnalyzer {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl DependencyAnalyzer {
24    /// Create new dependency analyzer
25    pub fn new() -> Self {
26        Self {
27            readers: HashMap::new(),
28            writers: HashMap::new(),
29            dependencies: HashMap::new(),
30        }
31    }
32
33    /// Analyze dependencies in a set of rules
34    pub fn analyze(&mut self, rules: &[Rule]) -> DependencyAnalysisResult {
35        self.clear();
36
37        // First pass: identify all reads and writes
38        for rule in rules {
39            self.analyze_rule_io(rule);
40        }
41
42        // Second pass: build dependency graph
43        self.build_dependency_graph();
44
45        // Third pass: identify conflicts
46        let conflicts = self.find_conflicts(rules);
47
48        // Fourth pass: group rules for safe parallel execution
49        let execution_groups = self.create_execution_groups(rules);
50        let conflicts_len = conflicts.len();
51
52        DependencyAnalysisResult {
53            total_rules: rules.len(),
54            conflicts: conflicts_len,
55            conflict_details: conflicts,
56            execution_groups,
57            can_parallelize_safely: conflicts_len == 0,
58        }
59    }
60
61    /// Clear previous analysis
62    fn clear(&mut self) {
63        self.readers.clear();
64        self.writers.clear();
65        self.dependencies.clear();
66    }
67
68    /// Analyze what fields a rule reads from and writes to
69    fn analyze_rule_io(&mut self, rule: &Rule) {
70        // Analyze condition reads
71        let condition_reads = self.extract_condition_reads(rule);
72        for field in condition_reads {
73            self.readers
74                .entry(field)
75                .or_default()
76                .push(rule.name.clone());
77        }
78
79        // Analyze action writes
80        let action_writes = self.extract_action_writes(rule);
81        for field in action_writes {
82            self.writers
83                .entry(field)
84                .or_default()
85                .push(rule.name.clone());
86        }
87    }
88
89    /// Extract field reads from rule conditions (proper implementation)
90    fn extract_condition_reads(&self, rule: &Rule) -> Vec<String> {
91        let mut reads = Vec::new();
92
93        // Extract from actual condition structure
94        Self::extract_fields_from_condition_group(&rule.conditions, &mut reads);
95
96        reads
97    }
98
99    /// Recursively extract fields from condition groups
100    fn extract_fields_from_condition_group(
101        condition_group: &crate::engine::rule::ConditionGroup,
102        reads: &mut Vec<String>,
103    ) {
104        match condition_group {
105            crate::engine::rule::ConditionGroup::Single(condition) => {
106                reads.push(condition.field.clone());
107            }
108            crate::engine::rule::ConditionGroup::Compound { left, right, .. } => {
109                Self::extract_fields_from_condition_group(left, reads);
110                Self::extract_fields_from_condition_group(right, reads);
111            }
112            crate::engine::rule::ConditionGroup::Not(inner) => {
113                Self::extract_fields_from_condition_group(inner, reads);
114            }
115            crate::engine::rule::ConditionGroup::Exists(inner) => {
116                // For EXISTS, we're reading the fields to check existence
117                Self::extract_fields_from_condition_group(inner, reads);
118            }
119            crate::engine::rule::ConditionGroup::Forall(inner) => {
120                // For FORALL, we're reading the fields to check all match
121                Self::extract_fields_from_condition_group(inner, reads);
122            }
123            crate::engine::rule::ConditionGroup::Accumulate {
124                source_pattern,
125                extract_field,
126                ..
127            } => {
128                // For ACCUMULATE, we're reading the source pattern and extract field
129                reads.push(format!("{}.{}", source_pattern, extract_field));
130            }
131
132            #[cfg(feature = "streaming")]
133            crate::engine::rule::ConditionGroup::StreamPattern {
134                stream_name,
135                event_type,
136                ..
137            } => {
138                // For STREAM patterns, we're reading from the stream
139                if let Some(event_type) = event_type {
140                    reads.push(format!("{}.{}", stream_name, event_type));
141                } else {
142                    reads.push(stream_name.clone());
143                }
144            }
145        }
146    }
147
148    /// Extract field writes from rule actions (proper implementation)
149    fn extract_action_writes(&self, rule: &Rule) -> Vec<String> {
150        let mut writes = Vec::new();
151
152        // Analyze actual actions to find field writes
153        for action in &rule.actions {
154            match action {
155                crate::types::ActionType::Set { field, .. } => {
156                    writes.push(field.clone());
157                }
158                crate::types::ActionType::Append { field, .. } => {
159                    writes.push(field.clone());
160                }
161                crate::types::ActionType::Retract { object } => {
162                    // Retract removes a fact, mark it as a write
163                    writes.push(format!("_retracted_{}", object));
164                }
165                crate::types::ActionType::MethodCall { object, method, .. } => {
166                    // Method calls might modify the object
167                    writes.push(object.clone());
168
169                    // Some methods have predictable side effects
170                    if method.contains("set")
171                        || method.contains("update")
172                        || method.contains("modify")
173                        || method.contains("change")
174                    {
175                        writes.push(format!("{}.{}", object, method));
176                    }
177                }
178                crate::types::ActionType::Custom {
179                    action_type,
180                    params,
181                } => {
182                    // Check if custom action has a target field parameter
183                    if let Some(crate::types::Value::String(field)) = params.get("target_field") {
184                        writes.push(field.clone());
185                    }
186
187                    // Analyze custom action type for side effects
188                    writes.extend(self.analyze_custom_action_side_effects(action_type, params));
189                }
190                // Log doesn't modify fields
191                crate::types::ActionType::Log { .. } => {}
192                // Workflow actions don't modify facts directly
193                crate::types::ActionType::ActivateAgendaGroup { .. } => {}
194                crate::types::ActionType::ScheduleRule { .. } => {}
195                crate::types::ActionType::CompleteWorkflow { .. } => {}
196                crate::types::ActionType::SetWorkflowData { .. } => {}
197            }
198        }
199
200        writes
201    }
202
203    /// Analyze function calls for potential field writes
204    #[allow(dead_code)]
205    fn analyze_function_side_effects(&self, function_name: &str) -> Vec<String> {
206        let mut side_effects = Vec::new();
207
208        // Pattern matching for common function naming conventions
209        if function_name.starts_with("set") || function_name.starts_with("update") {
210            // setUserScore, updateOrderTotal, etc.
211            if let Some(field) = self.extract_field_from_function_name(function_name) {
212                side_effects.push(field);
213            }
214        } else if function_name.starts_with("calculate") || function_name.starts_with("compute") {
215            // calculateScore, computeTotal, etc.
216            if let Some(field) = self.extract_field_from_function_name(function_name) {
217                side_effects.push(field);
218            }
219        } else if function_name.contains("modify") || function_name.contains("change") {
220            // modifyUser, changeStatus, etc.
221            if let Some(field) = self.extract_field_from_function_name(function_name) {
222                side_effects.push(field);
223            }
224        }
225
226        side_effects
227    }
228
229    /// Analyze custom actions for potential field writes
230    fn analyze_custom_action_side_effects(
231        &self,
232        action_type: &str,
233        params: &std::collections::HashMap<String, crate::types::Value>,
234    ) -> Vec<String> {
235        let mut side_effects = Vec::new();
236
237        // Check for common parameter names that indicate field modification
238        for (key, value) in params {
239            if key == "field" || key == "target" || key == "output_field" {
240                if let crate::types::Value::String(field_name) = value {
241                    side_effects.push(field_name.clone());
242                }
243            }
244        }
245
246        // Pattern matching on action type
247        if action_type.contains("set")
248            || action_type.contains("update")
249            || action_type.contains("modify")
250            || action_type.contains("calculate")
251        {
252            // Extract potential field from action type name
253            if let Some(field) = self.extract_field_from_function_name(action_type) {
254                side_effects.push(field);
255            }
256        }
257
258        side_effects
259    }
260
261    /// Extract field name from function/action name using common patterns
262    fn extract_field_from_function_name(&self, name: &str) -> Option<String> {
263        // Convert camelCase/PascalCase to dot notation
264        // setUserScore -> User.Score
265        // calculateOrderTotal -> Order.Total
266        // updateVIPStatus -> VIP.Status
267
268        let name = name
269            .trim_start_matches("set")
270            .trim_start_matches("update")
271            .trim_start_matches("calculate")
272            .trim_start_matches("compute")
273            .trim_start_matches("modify")
274            .trim_start_matches("change");
275
276        // Simple pattern matching for common field patterns
277        if name.contains("User") && name.contains("Score") {
278            Some("User.Score".to_string())
279        } else if name.contains("User") && name.contains("VIP") {
280            Some("User.IsVIP".to_string())
281        } else if name.contains("Order") && name.contains("Total") {
282            Some("Order.Total".to_string())
283        } else if name.contains("Order") && name.contains("Amount") {
284            Some("Order.Amount".to_string())
285        } else if name.contains("Discount") {
286            Some("Order.DiscountRate".to_string())
287        } else {
288            // Generic field extraction from camelCase
289            self.convert_camel_case_to_field(name)
290        }
291    }
292
293    /// Convert camelCase to potential field name
294    fn convert_camel_case_to_field(&self, name: &str) -> Option<String> {
295        if name.is_empty() {
296            return None;
297        }
298
299        let mut result = String::new();
300        let chars = name.chars().peekable();
301
302        for c in chars {
303            if c.is_uppercase() && !result.is_empty() {
304                result.push('.');
305            }
306            result.push(c);
307        }
308
309        if result.contains('.') {
310            Some(result)
311        } else {
312            None
313        }
314    }
315
316    /// Build dependency graph based on read/write analysis
317    fn build_dependency_graph(&mut self) {
318        for (field, readers) in &self.readers {
319            if let Some(writers) = self.writers.get(field) {
320                // If rule A writes to field X and rule B reads from field X,
321                // then rule B depends on rule A
322                for reader in readers {
323                    for writer in writers {
324                        if reader != writer {
325                            self.dependencies
326                                .entry(reader.clone())
327                                .or_default()
328                                .insert(writer.clone());
329                        }
330                    }
331                }
332            }
333        }
334    }
335
336    /// Find rules that have conflicts (read/write or write/write to same field)
337    fn find_conflicts(&self, rules: &[Rule]) -> Vec<DependencyConflict> {
338        let mut conflicts = Vec::new();
339
340        // Group rules by salience
341        let mut salience_groups: HashMap<i32, Vec<&Rule>> = HashMap::new();
342        for rule in rules {
343            salience_groups.entry(rule.salience).or_default().push(rule);
344        }
345
346        // Check for conflicts within each salience group
347        for (salience, group_rules) in salience_groups {
348            if group_rules.len() <= 1 {
349                continue; // No conflicts possible with single rule
350            }
351
352            // Check for write-write conflicts
353            let mut field_writers: HashMap<String, Vec<String>> = HashMap::new();
354            for rule in &group_rules {
355                let writes = self.extract_action_writes(rule);
356                for field in writes {
357                    field_writers
358                        .entry(field)
359                        .or_default()
360                        .push(rule.name.clone());
361                }
362            }
363
364            for (field, writers) in field_writers {
365                if writers.len() > 1 {
366                    conflicts.push(DependencyConflict {
367                        conflict_type: ConflictType::WriteWrite,
368                        field: field.clone(),
369                        rules: writers,
370                        salience,
371                        description: format!("Multiple rules write to {}", field),
372                    });
373                }
374            }
375
376            // Check for read-write conflicts
377            for rule in &group_rules {
378                let reads = self.extract_condition_reads(rule);
379                for field in &reads {
380                    if let Some(writers) = self.writers.get(field) {
381                        let conflicting_writers: Vec<String> = writers
382                            .iter()
383                            .filter(|writer| {
384                                group_rules
385                                    .iter()
386                                    .any(|r| r.name == **writer && r.name != rule.name)
387                            })
388                            .cloned()
389                            .collect();
390
391                        if !conflicting_writers.is_empty() {
392                            let mut involved_rules = conflicting_writers.clone();
393                            involved_rules.push(rule.name.clone());
394
395                            conflicts.push(DependencyConflict {
396                                conflict_type: ConflictType::ReadWrite,
397                                field: field.clone(),
398                                rules: involved_rules,
399                                salience,
400                                description: format!(
401                                    "Rule {} reads {} while others write to it",
402                                    rule.name, field
403                                ),
404                            });
405                        }
406                    }
407                }
408            }
409        }
410
411        conflicts
412    }
413
414    /// Create execution groups for safe parallel execution
415    fn create_execution_groups(&self, rules: &[Rule]) -> Vec<ExecutionGroup> {
416        let mut groups = Vec::new();
417
418        // Group by salience first
419        let mut salience_groups: HashMap<i32, Vec<Rule>> = HashMap::new();
420        for rule in rules {
421            salience_groups
422                .entry(rule.salience)
423                .or_default()
424                .push(rule.clone());
425        }
426
427        // Process each salience level
428        let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
429        salience_levels.sort_by(|a, b| b.cmp(a)); // Descending order
430
431        for salience in salience_levels {
432            let rules_at_level = &salience_groups[&salience];
433
434            if rules_at_level.len() == 1 {
435                // Single rule - always safe
436                groups.push(ExecutionGroup {
437                    rules: rules_at_level.clone(),
438                    execution_mode: ExecutionMode::Sequential,
439                    salience,
440                    can_parallelize: false,
441                    conflicts: Vec::new(),
442                });
443            } else {
444                // Multiple rules - check for conflicts
445                let conflicts = self.find_conflicts(rules_at_level);
446                let can_parallelize = conflicts.is_empty();
447
448                groups.push(ExecutionGroup {
449                    rules: rules_at_level.clone(),
450                    execution_mode: if can_parallelize {
451                        ExecutionMode::Parallel
452                    } else {
453                        ExecutionMode::Sequential
454                    },
455                    salience,
456                    can_parallelize,
457                    conflicts,
458                });
459            }
460        }
461
462        groups
463    }
464}
465
466/// Result of dependency analysis
467#[derive(Debug, Clone)]
468pub struct DependencyAnalysisResult {
469    /// Total number of rules analyzed
470    pub total_rules: usize,
471    /// Number of conflicts found
472    pub conflicts: usize,
473    /// Detailed conflict information
474    pub conflict_details: Vec<DependencyConflict>,
475    /// Recommended execution groups
476    pub execution_groups: Vec<ExecutionGroup>,
477    /// Whether rules can be safely parallelized
478    pub can_parallelize_safely: bool,
479}
480
481/// A conflict between rules
482#[derive(Debug, Clone)]
483pub struct DependencyConflict {
484    /// Type of conflict
485    pub conflict_type: ConflictType,
486    /// Field that causes the conflict
487    pub field: String,
488    /// Rules involved in the conflict
489    pub rules: Vec<String>,
490    /// Salience level where conflict occurs
491    pub salience: i32,
492    /// Human-readable description
493    pub description: String,
494}
495
/// Type of dependency conflict.
///
/// Derives `Eq` and `Hash` so conflict kinds can be used as set/map keys
/// (e.g. to count conflicts per kind).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConflictType {
    /// Multiple rules write to the same field
    WriteWrite,
    /// One rule reads while another writes to the same field
    ReadWrite,
    /// Circular dependency
    ///
    /// NOTE(review): nothing in this module currently produces this variant.
    Circular,
}
506
507/// Execution group with parallelization recommendation
508#[derive(Debug, Clone)]
509pub struct ExecutionGroup {
510    /// Rules in this group
511    pub rules: Vec<Rule>,
512    /// Recommended execution mode
513    pub execution_mode: ExecutionMode,
514    /// Salience level
515    pub salience: i32,
516    /// Whether this group can be safely parallelized
517    pub can_parallelize: bool,
518    /// Conflicts preventing parallelization
519    pub conflicts: Vec<DependencyConflict>,
520}
521
/// Execution mode recommendation.
///
/// Derives `Eq` and `Hash` so modes can be used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExecutionMode {
    /// Safe to run in parallel
    Parallel,
    /// Must run sequentially due to dependencies
    Sequential,
}
530
/// Strategy used for execution.
///
/// Derives `Eq` and `Hash` so strategies can be used as set/map keys.
/// NOTE(review): not referenced elsewhere in this module; presumably reported
/// by the parallel executor — confirm against its callers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExecutionStrategy {
    /// All rules executed sequentially (due to dependencies)
    FullSequential,
    /// All rules executed in parallel (no dependencies)
    FullParallel,
    /// Mixed execution (some parallel, some sequential)
    Hybrid,
    /// Forced sequential due to configuration
    ForcedSequential,
}
543
544impl DependencyAnalysisResult {
545    /// Get a summary report
546    pub fn get_summary(&self) -> String {
547        format!(
548            "šŸ“Š Dependency Analysis Summary:\n   Total rules: {}\n   Conflicts found: {}\n   Safe for parallel: {}\n   Execution groups: {}",
549            self.total_rules,
550            self.conflicts,
551            if self.can_parallelize_safely { "āœ… Yes" } else { "āŒ No" },
552            self.execution_groups.len()
553        )
554    }
555
556    /// Get detailed report
557    pub fn get_detailed_report(&self) -> String {
558        let mut report = self.get_summary();
559        report.push_str("\n\nšŸ” Detailed Analysis:");
560
561        for (i, group) in self.execution_groups.iter().enumerate() {
562            report.push_str(&format!(
563                "\n\nšŸ“‹ Group {} (Salience {}):",
564                i + 1,
565                group.salience
566            ));
567            report.push_str(&format!(
568                "\n   Mode: {:?} | Can parallelize: {}",
569                group.execution_mode,
570                if group.can_parallelize { "āœ…" } else { "āŒ" }
571            ));
572            report.push_str(&format!(
573                "\n   Rules: {}",
574                group
575                    .rules
576                    .iter()
577                    .map(|r| r.name.as_str())
578                    .collect::<Vec<_>>()
579                    .join(", ")
580            ));
581
582            if !group.conflicts.is_empty() {
583                report.push_str("\n   🚨 Conflicts:");
584                for conflict in &group.conflicts {
585                    report.push_str(&format!(
586                        "\n      - {}: {} (rules: {})",
587                        match conflict.conflict_type {
588                            ConflictType::WriteWrite => "Write-Write",
589                            ConflictType::ReadWrite => "Read-Write",
590                            ConflictType::Circular => "Circular",
591                        },
592                        conflict.field,
593                        conflict.rules.join(", ")
594                    ));
595                }
596            }
597        }
598
599        report
600    }
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{ActionType, Operator, Value};

    /// Build a rule whose condition is a single `field <operator> value` check.
    fn single_condition_rule(
        name: &str,
        field: &str,
        operator: Operator,
        value: Value,
        actions: Vec<ActionType>,
    ) -> Rule {
        let condition = Condition::new(field.to_string(), operator, value);
        Rule::new(name.to_string(), ConditionGroup::Single(condition), actions)
    }

    #[test]
    fn test_dependency_analyzer_creation() {
        let fresh = DependencyAnalyzer::new();
        // A new analyzer must not carry any read/write/dependency state.
        assert!(fresh.readers.is_empty());
        assert!(fresh.writers.is_empty());
        assert!(fresh.dependencies.is_empty());
    }

    #[test]
    fn test_safe_rules_analysis() {
        // Two read-only rules on different fields: nothing can conflict.
        let rules = [
            single_condition_rule(
                "AgeValidation",
                "User.Age",
                Operator::GreaterThan,
                Value::Integer(18),
                Vec::new(),
            ),
            single_condition_rule(
                "CountryCheck",
                "User.Country",
                Operator::Equal,
                Value::String("US".to_string()),
                Vec::new(),
            ),
        ];

        let outcome = DependencyAnalyzer::new().analyze(&rules);

        assert_eq!(outcome.total_rules, 2);
        assert_eq!(outcome.conflicts, 0);
        assert!(outcome.can_parallelize_safely);
    }

    #[test]
    fn test_conflicting_rules_analysis() {
        // "CalculateScore" writes User.Score, which "CheckVIPStatus" reads.
        let score_writer = single_condition_rule(
            "CalculateScore",
            "User.Data",
            Operator::Equal,
            Value::String("valid".to_string()),
            vec![ActionType::Set {
                field: "User.Score".to_string(),
                value: Value::Integer(85),
            }],
        );
        let score_reader = single_condition_rule(
            "CheckVIPStatus",
            "User.Score",
            Operator::GreaterThan,
            Value::Integer(80),
            vec![ActionType::Set {
                field: "User.IsVIP".to_string(),
                value: Value::Boolean(true),
            }],
        );
        let rules = [score_writer, score_reader];

        let outcome = DependencyAnalyzer::new().analyze(&rules);

        assert_eq!(outcome.total_rules, 2);
        // The read/write overlap on User.Score must block parallel execution.
        assert!(!outcome.can_parallelize_safely);
    }
}