1#![allow(deprecated)]
2
3use crate::engine::rule::Rule;
4use std::collections::{HashMap, HashSet};
5
/// Tracks which rules read and write which fields so that conflicts between
/// rules can be detected.
///
/// All three maps are rebuilt from scratch on every call to `analyze`.
#[derive(Debug, Clone)]
pub struct DependencyAnalyzer {
    /// Field name -> names of the rules whose conditions read that field.
    readers: HashMap<String, Vec<String>>,
    /// Field name -> names of the rules whose actions write that field.
    writers: HashMap<String, Vec<String>>,
    /// Rule name -> names of the other rules that write a field this rule reads.
    dependencies: HashMap<String, HashSet<String>>,
}
16
17impl Default for DependencyAnalyzer {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl DependencyAnalyzer {
24 pub fn new() -> Self {
26 Self {
27 readers: HashMap::new(),
28 writers: HashMap::new(),
29 dependencies: HashMap::new(),
30 }
31 }
32
33 pub fn analyze(&mut self, rules: &[Rule]) -> DependencyAnalysisResult {
35 self.clear();
36
37 for rule in rules {
39 self.analyze_rule_io(rule);
40 }
41
42 self.build_dependency_graph();
44
45 let conflicts = self.find_conflicts(rules);
47
48 let execution_groups = self.create_execution_groups(rules);
50 let conflicts_len = conflicts.len();
51
52 DependencyAnalysisResult {
53 total_rules: rules.len(),
54 conflicts: conflicts_len,
55 conflict_details: conflicts,
56 execution_groups,
57 can_parallelize_safely: conflicts_len == 0,
58 }
59 }
60
61 fn clear(&mut self) {
63 self.readers.clear();
64 self.writers.clear();
65 self.dependencies.clear();
66 }
67
68 fn analyze_rule_io(&mut self, rule: &Rule) {
70 let condition_reads = self.extract_condition_reads(rule);
72 for field in condition_reads {
73 self.readers
74 .entry(field)
75 .or_default()
76 .push(rule.name.clone());
77 }
78
79 let action_writes = self.extract_action_writes(rule);
81 for field in action_writes {
82 self.writers
83 .entry(field)
84 .or_default()
85 .push(rule.name.clone());
86 }
87 }
88
89 fn extract_condition_reads(&self, rule: &Rule) -> Vec<String> {
91 let mut reads = Vec::new();
92
93 Self::extract_fields_from_condition_group(&rule.conditions, &mut reads);
95
96 reads
97 }
98
99 fn extract_fields_from_condition_group(
101 condition_group: &crate::engine::rule::ConditionGroup,
102 reads: &mut Vec<String>,
103 ) {
104 match condition_group {
105 crate::engine::rule::ConditionGroup::Single(condition) => {
106 reads.push(condition.field.clone());
107 }
108 crate::engine::rule::ConditionGroup::Compound { left, right, .. } => {
109 Self::extract_fields_from_condition_group(left, reads);
110 Self::extract_fields_from_condition_group(right, reads);
111 }
112 crate::engine::rule::ConditionGroup::Not(inner) => {
113 Self::extract_fields_from_condition_group(inner, reads);
114 }
115 crate::engine::rule::ConditionGroup::Exists(inner) => {
116 Self::extract_fields_from_condition_group(inner, reads);
118 }
119 crate::engine::rule::ConditionGroup::Forall(inner) => {
120 Self::extract_fields_from_condition_group(inner, reads);
122 }
123 crate::engine::rule::ConditionGroup::Accumulate {
124 source_pattern,
125 extract_field,
126 ..
127 } => {
128 reads.push(format!("{}.{}", source_pattern, extract_field));
130 }
131
132 #[cfg(feature = "streaming")]
133 crate::engine::rule::ConditionGroup::StreamPattern {
134 stream_name,
135 event_type,
136 ..
137 } => {
138 if let Some(event_type) = event_type {
140 reads.push(format!("{}.{}", stream_name, event_type));
141 } else {
142 reads.push(stream_name.clone());
143 }
144 }
145 }
146 }
147
148 fn extract_action_writes(&self, rule: &Rule) -> Vec<String> {
150 let mut writes = Vec::new();
151
152 for action in &rule.actions {
154 match action {
155 crate::types::ActionType::Set { field, .. } => {
156 writes.push(field.clone());
157 }
158 crate::types::ActionType::Retract { object } => {
159 writes.push(format!("_retracted_{}", object));
161 }
162 crate::types::ActionType::MethodCall { object, method, .. } => {
163 writes.push(object.clone());
165
166 if method.contains("set")
168 || method.contains("update")
169 || method.contains("modify")
170 || method.contains("change")
171 {
172 writes.push(format!("{}.{}", object, method));
173 }
174 }
175 crate::types::ActionType::Custom {
176 action_type,
177 params,
178 } => {
179 if let Some(crate::types::Value::String(field)) = params.get("target_field") {
181 writes.push(field.clone());
182 }
183
184 writes.extend(self.analyze_custom_action_side_effects(action_type, params));
186 }
187 crate::types::ActionType::Log { .. } => {}
189 crate::types::ActionType::ActivateAgendaGroup { .. } => {}
191 crate::types::ActionType::ScheduleRule { .. } => {}
192 crate::types::ActionType::CompleteWorkflow { .. } => {}
193 crate::types::ActionType::SetWorkflowData { .. } => {}
194 }
195 }
196
197 writes
198 }
199
200 #[allow(dead_code)]
202 fn analyze_function_side_effects(&self, function_name: &str) -> Vec<String> {
203 let mut side_effects = Vec::new();
204
205 if function_name.starts_with("set") || function_name.starts_with("update") {
207 if let Some(field) = self.extract_field_from_function_name(function_name) {
209 side_effects.push(field);
210 }
211 } else if function_name.starts_with("calculate") || function_name.starts_with("compute") {
212 if let Some(field) = self.extract_field_from_function_name(function_name) {
214 side_effects.push(field);
215 }
216 } else if function_name.contains("modify") || function_name.contains("change") {
217 if let Some(field) = self.extract_field_from_function_name(function_name) {
219 side_effects.push(field);
220 }
221 }
222
223 side_effects
224 }
225
226 fn analyze_custom_action_side_effects(
228 &self,
229 action_type: &str,
230 params: &std::collections::HashMap<String, crate::types::Value>,
231 ) -> Vec<String> {
232 let mut side_effects = Vec::new();
233
234 for (key, value) in params {
236 if key == "field" || key == "target" || key == "output_field" {
237 if let crate::types::Value::String(field_name) = value {
238 side_effects.push(field_name.clone());
239 }
240 }
241 }
242
243 if action_type.contains("set")
245 || action_type.contains("update")
246 || action_type.contains("modify")
247 || action_type.contains("calculate")
248 {
249 if let Some(field) = self.extract_field_from_function_name(action_type) {
251 side_effects.push(field);
252 }
253 }
254
255 side_effects
256 }
257
258 fn extract_field_from_function_name(&self, name: &str) -> Option<String> {
260 let name = name
266 .trim_start_matches("set")
267 .trim_start_matches("update")
268 .trim_start_matches("calculate")
269 .trim_start_matches("compute")
270 .trim_start_matches("modify")
271 .trim_start_matches("change");
272
273 if name.contains("User") && name.contains("Score") {
275 Some("User.Score".to_string())
276 } else if name.contains("User") && name.contains("VIP") {
277 Some("User.IsVIP".to_string())
278 } else if name.contains("Order") && name.contains("Total") {
279 Some("Order.Total".to_string())
280 } else if name.contains("Order") && name.contains("Amount") {
281 Some("Order.Amount".to_string())
282 } else if name.contains("Discount") {
283 Some("Order.DiscountRate".to_string())
284 } else {
285 self.convert_camel_case_to_field(name)
287 }
288 }
289
290 fn convert_camel_case_to_field(&self, name: &str) -> Option<String> {
292 if name.is_empty() {
293 return None;
294 }
295
296 let mut result = String::new();
297 let chars = name.chars().peekable();
298
299 for c in chars {
300 if c.is_uppercase() && !result.is_empty() {
301 result.push('.');
302 }
303 result.push(c);
304 }
305
306 if result.contains('.') {
307 Some(result)
308 } else {
309 None
310 }
311 }
312
313 fn build_dependency_graph(&mut self) {
315 for (field, readers) in &self.readers {
316 if let Some(writers) = self.writers.get(field) {
317 for reader in readers {
320 for writer in writers {
321 if reader != writer {
322 self.dependencies
323 .entry(reader.clone())
324 .or_default()
325 .insert(writer.clone());
326 }
327 }
328 }
329 }
330 }
331 }
332
333 fn find_conflicts(&self, rules: &[Rule]) -> Vec<DependencyConflict> {
335 let mut conflicts = Vec::new();
336
337 let mut salience_groups: HashMap<i32, Vec<&Rule>> = HashMap::new();
339 for rule in rules {
340 salience_groups.entry(rule.salience).or_default().push(rule);
341 }
342
343 for (salience, group_rules) in salience_groups {
345 if group_rules.len() <= 1 {
346 continue; }
348
349 let mut field_writers: HashMap<String, Vec<String>> = HashMap::new();
351 for rule in &group_rules {
352 let writes = self.extract_action_writes(rule);
353 for field in writes {
354 field_writers
355 .entry(field)
356 .or_default()
357 .push(rule.name.clone());
358 }
359 }
360
361 for (field, writers) in field_writers {
362 if writers.len() > 1 {
363 conflicts.push(DependencyConflict {
364 conflict_type: ConflictType::WriteWrite,
365 field: field.clone(),
366 rules: writers,
367 salience,
368 description: format!("Multiple rules write to {}", field),
369 });
370 }
371 }
372
373 for rule in &group_rules {
375 let reads = self.extract_condition_reads(rule);
376 for field in &reads {
377 if let Some(writers) = self.writers.get(field) {
378 let conflicting_writers: Vec<String> = writers
379 .iter()
380 .filter(|writer| {
381 group_rules
382 .iter()
383 .any(|r| r.name == **writer && r.name != rule.name)
384 })
385 .cloned()
386 .collect();
387
388 if !conflicting_writers.is_empty() {
389 let mut involved_rules = conflicting_writers.clone();
390 involved_rules.push(rule.name.clone());
391
392 conflicts.push(DependencyConflict {
393 conflict_type: ConflictType::ReadWrite,
394 field: field.clone(),
395 rules: involved_rules,
396 salience,
397 description: format!(
398 "Rule {} reads {} while others write to it",
399 rule.name, field
400 ),
401 });
402 }
403 }
404 }
405 }
406 }
407
408 conflicts
409 }
410
411 fn create_execution_groups(&self, rules: &[Rule]) -> Vec<ExecutionGroup> {
413 let mut groups = Vec::new();
414
415 let mut salience_groups: HashMap<i32, Vec<Rule>> = HashMap::new();
417 for rule in rules {
418 salience_groups
419 .entry(rule.salience)
420 .or_default()
421 .push(rule.clone());
422 }
423
424 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
426 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
429 let rules_at_level = &salience_groups[&salience];
430
431 if rules_at_level.len() == 1 {
432 groups.push(ExecutionGroup {
434 rules: rules_at_level.clone(),
435 execution_mode: ExecutionMode::Sequential,
436 salience,
437 can_parallelize: false,
438 conflicts: Vec::new(),
439 });
440 } else {
441 let conflicts = self.find_conflicts(rules_at_level);
443 let can_parallelize = conflicts.is_empty();
444
445 groups.push(ExecutionGroup {
446 rules: rules_at_level.clone(),
447 execution_mode: if can_parallelize {
448 ExecutionMode::Parallel
449 } else {
450 ExecutionMode::Sequential
451 },
452 salience,
453 can_parallelize,
454 conflicts,
455 });
456 }
457 }
458
459 groups
460 }
461}
462
/// Outcome of `DependencyAnalyzer::analyze`.
#[derive(Debug, Clone)]
pub struct DependencyAnalysisResult {
    /// Number of rules that were analyzed.
    pub total_rules: usize,
    /// Number of conflicts found; equals `conflict_details.len()`.
    pub conflicts: usize,
    /// One entry per detected conflict.
    pub conflict_details: Vec<DependencyConflict>,
    /// Rules grouped by salience level, highest salience first.
    pub execution_groups: Vec<ExecutionGroup>,
    /// `true` when no conflict was found.
    pub can_parallelize_safely: bool,
}
477
/// A single conflict between rules that share a salience level.
#[derive(Debug, Clone)]
pub struct DependencyConflict {
    /// Kind of conflict.
    pub conflict_type: ConflictType,
    /// Field the conflicting rules contend for.
    pub field: String,
    /// Names of the rules involved in the conflict.
    pub rules: Vec<String>,
    /// Salience level at which the conflict occurs.
    pub salience: i32,
    /// Human-readable explanation of the conflict.
    pub description: String,
}
492
/// Kind of conflict reported in a [`DependencyConflict`].
///
/// Equality on this field-less enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictType {
    /// Several rules write the same field.
    WriteWrite,
    /// A rule reads a field that another rule writes.
    ReadWrite,
    /// Circular dependency between rules. NOTE(review): no code in this file
    /// constructs this variant; confirm where it is produced.
    Circular,
}
503
/// The rules of one salience level together with how they may be executed.
#[derive(Debug, Clone)]
pub struct ExecutionGroup {
    /// Rules that belong to this salience level.
    pub rules: Vec<Rule>,
    /// Execution mode chosen for the group.
    pub execution_mode: ExecutionMode,
    /// Salience shared by every rule in the group.
    pub salience: i32,
    /// `true` when the group holds several rules and none of them conflict.
    pub can_parallelize: bool,
    /// Conflicts detected among the rules of this group.
    pub conflicts: Vec<DependencyConflict>,
}
518
/// How the rules of an [`ExecutionGroup`] are to be run.
///
/// Equality on this field-less enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Chosen for a group of several rules with no detected conflict.
    Parallel,
    /// Chosen for a single-rule group or a group with detected conflicts.
    Sequential,
}
527
/// Overall strategy for executing a rule set.
///
/// NOTE(review): nothing in this file uses this type, so the variant
/// descriptions below are inferred from the names only; confirm against the
/// code that consumes it.
///
/// Equality on this field-less enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionStrategy {
    /// Presumably: run every rule sequentially.
    FullSequential,
    /// Presumably: run every rule in parallel.
    FullParallel,
    /// Presumably: mix parallel and sequential execution.
    Hybrid,
    /// Presumably: sequential execution imposed regardless of the analysis.
    ForcedSequential,
}
540
541impl DependencyAnalysisResult {
542 pub fn get_summary(&self) -> String {
544 format!(
545 "š Dependency Analysis Summary:\n Total rules: {}\n Conflicts found: {}\n Safe for parallel: {}\n Execution groups: {}",
546 self.total_rules,
547 self.conflicts,
548 if self.can_parallelize_safely { "ā
Yes" } else { "ā No" },
549 self.execution_groups.len()
550 )
551 }
552
553 pub fn get_detailed_report(&self) -> String {
555 let mut report = self.get_summary();
556 report.push_str("\n\nš Detailed Analysis:");
557
558 for (i, group) in self.execution_groups.iter().enumerate() {
559 report.push_str(&format!(
560 "\n\nš Group {} (Salience {}):",
561 i + 1,
562 group.salience
563 ));
564 report.push_str(&format!(
565 "\n Mode: {:?} | Can parallelize: {}",
566 group.execution_mode,
567 if group.can_parallelize { "ā
" } else { "ā" }
568 ));
569 report.push_str(&format!(
570 "\n Rules: {}",
571 group
572 .rules
573 .iter()
574 .map(|r| r.name.as_str())
575 .collect::<Vec<_>>()
576 .join(", ")
577 ));
578
579 if !group.conflicts.is_empty() {
580 report.push_str("\n šØ Conflicts:");
581 for conflict in &group.conflicts {
582 report.push_str(&format!(
583 "\n - {}: {} (rules: {})",
584 match conflict.conflict_type {
585 ConflictType::WriteWrite => "Write-Write",
586 ConflictType::ReadWrite => "Read-Write",
587 ConflictType::Circular => "Circular",
588 },
589 conflict.field,
590 conflict.rules.join(", ")
591 ));
592 }
593 }
594 }
595
596 report
597 }
598}
599
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{ActionType, Operator, Value};

    /// Builds a rule guarded by a single `field <operator> value` condition.
    fn single_condition_rule(
        name: &str,
        field: &str,
        operator: Operator,
        value: Value,
        actions: Vec<ActionType>,
    ) -> Rule {
        let condition = Condition::new(field.to_string(), operator, value);
        Rule::new(
            name.to_string(),
            ConditionGroup::Single(condition),
            actions,
        )
    }

    /// A freshly created analyzer holds no state.
    #[test]
    fn test_dependency_analyzer_creation() {
        let analyzer = DependencyAnalyzer::new();

        assert!(analyzer.readers.is_empty());
        assert!(analyzer.writers.is_empty());
        assert!(analyzer.dependencies.is_empty());
    }

    /// Rules that only read distinct fields never conflict.
    #[test]
    fn test_safe_rules_analysis() {
        let rules = vec![
            single_condition_rule(
                "AgeValidation",
                "User.Age",
                Operator::GreaterThan,
                Value::Integer(18),
                vec![],
            ),
            single_condition_rule(
                "CountryCheck",
                "User.Country",
                Operator::Equal,
                Value::String("US".to_string()),
                vec![],
            ),
        ];

        let mut analyzer = DependencyAnalyzer::new();
        let result = analyzer.analyze(&rules);

        assert_eq!(result.total_rules, 2);
        assert_eq!(result.conflicts, 0);
        assert!(result.can_parallelize_safely);
    }

    /// One rule writes `User.Score` while the other reads it, so the pair
    /// must not be reported as safe to parallelize.
    #[test]
    fn test_conflicting_rules_analysis() {
        let rules = vec![
            single_condition_rule(
                "CalculateScore",
                "User.Data",
                Operator::Equal,
                Value::String("valid".to_string()),
                vec![ActionType::Set {
                    field: "User.Score".to_string(),
                    value: Value::Integer(85),
                }],
            ),
            single_condition_rule(
                "CheckVIPStatus",
                "User.Score",
                Operator::GreaterThan,
                Value::Integer(80),
                vec![ActionType::Set {
                    field: "User.IsVIP".to_string(),
                    value: Value::Boolean(true),
                }],
            ),
        ];

        let mut analyzer = DependencyAnalyzer::new();
        let result = analyzer.analyze(&rules);

        assert_eq!(result.total_rules, 2);
        assert!(!result.can_parallelize_safely);
    }
}