1#![allow(deprecated)]
2
3use crate::engine::rule::Rule;
4use std::collections::{HashMap, HashSet};
5
/// Records which rules read and which rules write each field, and the
/// rule-to-rule dependencies that follow from those two maps.
#[derive(Clone, Debug)]
pub struct DependencyAnalyzer {
    /// Field name -> names of the rules whose conditions read that field.
    readers: HashMap<String, Vec<String>>,
    /// Field name -> names of the rules whose actions write that field.
    writers: HashMap<String, Vec<String>>,
    /// Rule name -> names of the other rules that write a field it reads.
    dependencies: HashMap<String, HashSet<String>>,
}
16
17impl Default for DependencyAnalyzer {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl DependencyAnalyzer {
24 pub fn new() -> Self {
26 Self {
27 readers: HashMap::new(),
28 writers: HashMap::new(),
29 dependencies: HashMap::new(),
30 }
31 }
32
33 pub fn analyze(&mut self, rules: &[Rule]) -> DependencyAnalysisResult {
35 self.clear();
36
37 for rule in rules {
39 self.analyze_rule_io(rule);
40 }
41
42 self.build_dependency_graph();
44
45 let conflicts = self.find_conflicts(rules);
47
48 let execution_groups = self.create_execution_groups(rules);
50 let conflicts_len = conflicts.len();
51
52 DependencyAnalysisResult {
53 total_rules: rules.len(),
54 conflicts: conflicts_len,
55 conflict_details: conflicts,
56 execution_groups,
57 can_parallelize_safely: conflicts_len == 0,
58 }
59 }
60
61 fn clear(&mut self) {
63 self.readers.clear();
64 self.writers.clear();
65 self.dependencies.clear();
66 }
67
68 fn analyze_rule_io(&mut self, rule: &Rule) {
70 let condition_reads = self.extract_condition_reads(rule);
72 for field in condition_reads {
73 self.readers
74 .entry(field)
75 .or_default()
76 .push(rule.name.clone());
77 }
78
79 let action_writes = self.extract_action_writes(rule);
81 for field in action_writes {
82 self.writers
83 .entry(field)
84 .or_default()
85 .push(rule.name.clone());
86 }
87 }
88
89 fn extract_condition_reads(&self, rule: &Rule) -> Vec<String> {
91 let mut reads = Vec::new();
92
93 Self::extract_fields_from_condition_group(&rule.conditions, &mut reads);
95
96 reads
97 }
98
99 fn extract_fields_from_condition_group(
101 condition_group: &crate::engine::rule::ConditionGroup,
102 reads: &mut Vec<String>,
103 ) {
104 match condition_group {
105 crate::engine::rule::ConditionGroup::Single(condition) => {
106 reads.push(condition.field.clone());
107 }
108 crate::engine::rule::ConditionGroup::Compound { left, right, .. } => {
109 Self::extract_fields_from_condition_group(left, reads);
110 Self::extract_fields_from_condition_group(right, reads);
111 }
112 crate::engine::rule::ConditionGroup::Not(inner) => {
113 Self::extract_fields_from_condition_group(inner, reads);
114 }
115 crate::engine::rule::ConditionGroup::Exists(inner) => {
116 Self::extract_fields_from_condition_group(inner, reads);
118 }
119 crate::engine::rule::ConditionGroup::Forall(inner) => {
120 Self::extract_fields_from_condition_group(inner, reads);
122 }
123 crate::engine::rule::ConditionGroup::Accumulate {
124 source_pattern,
125 extract_field,
126 ..
127 } => {
128 reads.push(format!("{}.{}", source_pattern, extract_field));
130 }
131
132 #[cfg(feature = "streaming")]
133 crate::engine::rule::ConditionGroup::StreamPattern {
134 stream_name,
135 event_type,
136 ..
137 } => {
138 if let Some(event_type) = event_type {
140 reads.push(format!("{}.{}", stream_name, event_type));
141 } else {
142 reads.push(stream_name.clone());
143 }
144 }
145 }
146 }
147
148 fn extract_action_writes(&self, rule: &Rule) -> Vec<String> {
150 let mut writes = Vec::new();
151
152 for action in &rule.actions {
154 match action {
155 crate::types::ActionType::Set { field, .. } => {
156 writes.push(field.clone());
157 }
158 crate::types::ActionType::Append { field, .. } => {
159 writes.push(field.clone());
160 }
161 crate::types::ActionType::Retract { object } => {
162 writes.push(format!("_retracted_{}", object));
164 }
165 crate::types::ActionType::MethodCall { object, method, .. } => {
166 writes.push(object.clone());
168
169 if method.contains("set")
171 || method.contains("update")
172 || method.contains("modify")
173 || method.contains("change")
174 {
175 writes.push(format!("{}.{}", object, method));
176 }
177 }
178 crate::types::ActionType::Custom {
179 action_type,
180 params,
181 } => {
182 if let Some(crate::types::Value::String(field)) = params.get("target_field") {
184 writes.push(field.clone());
185 }
186
187 writes.extend(self.analyze_custom_action_side_effects(action_type, params));
189 }
190 crate::types::ActionType::Log { .. } => {}
192 crate::types::ActionType::ActivateAgendaGroup { .. } => {}
194 crate::types::ActionType::ScheduleRule { .. } => {}
195 crate::types::ActionType::CompleteWorkflow { .. } => {}
196 crate::types::ActionType::SetWorkflowData { .. } => {}
197 }
198 }
199
200 writes
201 }
202
203 #[allow(dead_code)]
205 fn analyze_function_side_effects(&self, function_name: &str) -> Vec<String> {
206 let mut side_effects = Vec::new();
207
208 if function_name.starts_with("set") || function_name.starts_with("update") {
210 if let Some(field) = self.extract_field_from_function_name(function_name) {
212 side_effects.push(field);
213 }
214 } else if function_name.starts_with("calculate") || function_name.starts_with("compute") {
215 if let Some(field) = self.extract_field_from_function_name(function_name) {
217 side_effects.push(field);
218 }
219 } else if function_name.contains("modify") || function_name.contains("change") {
220 if let Some(field) = self.extract_field_from_function_name(function_name) {
222 side_effects.push(field);
223 }
224 }
225
226 side_effects
227 }
228
229 fn analyze_custom_action_side_effects(
231 &self,
232 action_type: &str,
233 params: &std::collections::HashMap<String, crate::types::Value>,
234 ) -> Vec<String> {
235 let mut side_effects = Vec::new();
236
237 for (key, value) in params {
239 if key == "field" || key == "target" || key == "output_field" {
240 if let crate::types::Value::String(field_name) = value {
241 side_effects.push(field_name.clone());
242 }
243 }
244 }
245
246 if action_type.contains("set")
248 || action_type.contains("update")
249 || action_type.contains("modify")
250 || action_type.contains("calculate")
251 {
252 if let Some(field) = self.extract_field_from_function_name(action_type) {
254 side_effects.push(field);
255 }
256 }
257
258 side_effects
259 }
260
261 fn extract_field_from_function_name(&self, name: &str) -> Option<String> {
263 let name = name
269 .trim_start_matches("set")
270 .trim_start_matches("update")
271 .trim_start_matches("calculate")
272 .trim_start_matches("compute")
273 .trim_start_matches("modify")
274 .trim_start_matches("change");
275
276 if name.contains("User") && name.contains("Score") {
278 Some("User.Score".to_string())
279 } else if name.contains("User") && name.contains("VIP") {
280 Some("User.IsVIP".to_string())
281 } else if name.contains("Order") && name.contains("Total") {
282 Some("Order.Total".to_string())
283 } else if name.contains("Order") && name.contains("Amount") {
284 Some("Order.Amount".to_string())
285 } else if name.contains("Discount") {
286 Some("Order.DiscountRate".to_string())
287 } else {
288 self.convert_camel_case_to_field(name)
290 }
291 }
292
293 fn convert_camel_case_to_field(&self, name: &str) -> Option<String> {
295 if name.is_empty() {
296 return None;
297 }
298
299 let mut result = String::new();
300 let chars = name.chars().peekable();
301
302 for c in chars {
303 if c.is_uppercase() && !result.is_empty() {
304 result.push('.');
305 }
306 result.push(c);
307 }
308
309 if result.contains('.') {
310 Some(result)
311 } else {
312 None
313 }
314 }
315
316 fn build_dependency_graph(&mut self) {
318 for (field, readers) in &self.readers {
319 if let Some(writers) = self.writers.get(field) {
320 for reader in readers {
323 for writer in writers {
324 if reader != writer {
325 self.dependencies
326 .entry(reader.clone())
327 .or_default()
328 .insert(writer.clone());
329 }
330 }
331 }
332 }
333 }
334 }
335
336 fn find_conflicts(&self, rules: &[Rule]) -> Vec<DependencyConflict> {
338 let mut conflicts = Vec::new();
339
340 let mut salience_groups: HashMap<i32, Vec<&Rule>> = HashMap::new();
342 for rule in rules {
343 salience_groups.entry(rule.salience).or_default().push(rule);
344 }
345
346 for (salience, group_rules) in salience_groups {
348 if group_rules.len() <= 1 {
349 continue; }
351
352 let mut field_writers: HashMap<String, Vec<String>> = HashMap::new();
354 for rule in &group_rules {
355 let writes = self.extract_action_writes(rule);
356 for field in writes {
357 field_writers
358 .entry(field)
359 .or_default()
360 .push(rule.name.clone());
361 }
362 }
363
364 for (field, writers) in field_writers {
365 if writers.len() > 1 {
366 conflicts.push(DependencyConflict {
367 conflict_type: ConflictType::WriteWrite,
368 field: field.clone(),
369 rules: writers,
370 salience,
371 description: format!("Multiple rules write to {}", field),
372 });
373 }
374 }
375
376 for rule in &group_rules {
378 let reads = self.extract_condition_reads(rule);
379 for field in &reads {
380 if let Some(writers) = self.writers.get(field) {
381 let conflicting_writers: Vec<String> = writers
382 .iter()
383 .filter(|writer| {
384 group_rules
385 .iter()
386 .any(|r| r.name == **writer && r.name != rule.name)
387 })
388 .cloned()
389 .collect();
390
391 if !conflicting_writers.is_empty() {
392 let mut involved_rules = conflicting_writers.clone();
393 involved_rules.push(rule.name.clone());
394
395 conflicts.push(DependencyConflict {
396 conflict_type: ConflictType::ReadWrite,
397 field: field.clone(),
398 rules: involved_rules,
399 salience,
400 description: format!(
401 "Rule {} reads {} while others write to it",
402 rule.name, field
403 ),
404 });
405 }
406 }
407 }
408 }
409 }
410
411 conflicts
412 }
413
414 fn create_execution_groups(&self, rules: &[Rule]) -> Vec<ExecutionGroup> {
416 let mut groups = Vec::new();
417
418 let mut salience_groups: HashMap<i32, Vec<Rule>> = HashMap::new();
420 for rule in rules {
421 salience_groups
422 .entry(rule.salience)
423 .or_default()
424 .push(rule.clone());
425 }
426
427 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
429 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
432 let rules_at_level = &salience_groups[&salience];
433
434 if rules_at_level.len() == 1 {
435 groups.push(ExecutionGroup {
437 rules: rules_at_level.clone(),
438 execution_mode: ExecutionMode::Sequential,
439 salience,
440 can_parallelize: false,
441 conflicts: Vec::new(),
442 });
443 } else {
444 let conflicts = self.find_conflicts(rules_at_level);
446 let can_parallelize = conflicts.is_empty();
447
448 groups.push(ExecutionGroup {
449 rules: rules_at_level.clone(),
450 execution_mode: if can_parallelize {
451 ExecutionMode::Parallel
452 } else {
453 ExecutionMode::Sequential
454 },
455 salience,
456 can_parallelize,
457 conflicts,
458 });
459 }
460 }
461
462 groups
463 }
464}
465
466#[derive(Debug, Clone)]
468pub struct DependencyAnalysisResult {
469 pub total_rules: usize,
471 pub conflicts: usize,
473 pub conflict_details: Vec<DependencyConflict>,
475 pub execution_groups: Vec<ExecutionGroup>,
477 pub can_parallelize_safely: bool,
479}
480
481#[derive(Debug, Clone)]
483pub struct DependencyConflict {
484 pub conflict_type: ConflictType,
486 pub field: String,
488 pub rules: Vec<String>,
490 pub salience: i32,
492 pub description: String,
494}
495
/// The kinds of conflict a [`DependencyConflict`] can describe.
#[derive(Clone, Debug, PartialEq)]
pub enum ConflictType {
    /// Several rules write the same field.
    WriteWrite,
    /// One rule reads a field that another rule writes.
    ReadWrite,
    /// Rendered as "Circular" in reports; nothing in this file produces it.
    Circular,
}
506
507#[derive(Debug, Clone)]
509pub struct ExecutionGroup {
510 pub rules: Vec<Rule>,
512 pub execution_mode: ExecutionMode,
514 pub salience: i32,
516 pub can_parallelize: bool,
518 pub conflicts: Vec<DependencyConflict>,
520}
521
/// How the rules of an [`ExecutionGroup`] are to be run.
#[derive(Clone, Debug, PartialEq)]
pub enum ExecutionMode {
    /// Assigned to groups of several rules with no conflicts between them.
    Parallel,
    /// Assigned to single-rule groups and to groups with conflicts.
    Sequential,
}
530
/// Overall execution strategies.
///
/// NOTE(review): nothing in this file constructs or matches on this enum, so
/// the exact meaning of each variant must be confirmed against its users.
#[derive(Clone, Debug, PartialEq)]
pub enum ExecutionStrategy {
    /// Presumably: run every rule sequentially.
    FullSequential,
    /// Presumably: run every rule in parallel.
    FullParallel,
    /// Presumably: mix parallel and sequential execution.
    Hybrid,
    /// Presumably: sequential execution imposed regardless of the analysis.
    ForcedSequential,
}
543
544impl DependencyAnalysisResult {
545 pub fn get_summary(&self) -> String {
547 format!(
548 "š Dependency Analysis Summary:\n Total rules: {}\n Conflicts found: {}\n Safe for parallel: {}\n Execution groups: {}",
549 self.total_rules,
550 self.conflicts,
551 if self.can_parallelize_safely { "ā
Yes" } else { "ā No" },
552 self.execution_groups.len()
553 )
554 }
555
556 pub fn get_detailed_report(&self) -> String {
558 let mut report = self.get_summary();
559 report.push_str("\n\nš Detailed Analysis:");
560
561 for (i, group) in self.execution_groups.iter().enumerate() {
562 report.push_str(&format!(
563 "\n\nš Group {} (Salience {}):",
564 i + 1,
565 group.salience
566 ));
567 report.push_str(&format!(
568 "\n Mode: {:?} | Can parallelize: {}",
569 group.execution_mode,
570 if group.can_parallelize { "ā
" } else { "ā" }
571 ));
572 report.push_str(&format!(
573 "\n Rules: {}",
574 group
575 .rules
576 .iter()
577 .map(|r| r.name.as_str())
578 .collect::<Vec<_>>()
579 .join(", ")
580 ));
581
582 if !group.conflicts.is_empty() {
583 report.push_str("\n šØ Conflicts:");
584 for conflict in &group.conflicts {
585 report.push_str(&format!(
586 "\n - {}: {} (rules: {})",
587 match conflict.conflict_type {
588 ConflictType::WriteWrite => "Write-Write",
589 ConflictType::ReadWrite => "Read-Write",
590 ConflictType::Circular => "Circular",
591 },
592 conflict.field,
593 conflict.rules.join(", ")
594 ));
595 }
596 }
597 }
598
599 report
600 }
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{ActionType, Operator, Value};

    /// Builds a rule guarded by a single `field <operator> value` condition.
    fn single_condition_rule(
        name: &str,
        field: &str,
        operator: Operator,
        value: Value,
        actions: Vec<ActionType>,
    ) -> Rule {
        Rule::new(
            name.to_string(),
            ConditionGroup::Single(Condition::new(field.to_string(), operator, value)),
            actions,
        )
    }

    /// A fresh analyzer holds no state.
    #[test]
    fn test_dependency_analyzer_creation() {
        let analyzer = DependencyAnalyzer::new();
        assert!(analyzer.readers.is_empty());
        assert!(analyzer.writers.is_empty());
        assert!(analyzer.dependencies.is_empty());
    }

    /// Analyzing no rules yields an empty, conflict-free result.
    #[test]
    fn test_empty_rule_set() {
        let mut analyzer = DependencyAnalyzer::new();

        let result = analyzer.analyze(&[]);
        assert_eq!(result.total_rules, 0);
        assert_eq!(result.conflicts, 0);
        assert!(result.conflict_details.is_empty());
        assert!(result.execution_groups.is_empty());
        assert!(result.can_parallelize_safely);
    }

    /// Rules that only read unrelated fields can run in parallel.
    #[test]
    fn test_safe_rules_analysis() {
        let mut analyzer = DependencyAnalyzer::new();

        let rules = vec![
            single_condition_rule(
                "AgeValidation",
                "User.Age",
                Operator::GreaterThan,
                Value::Integer(18),
                vec![],
            ),
            single_condition_rule(
                "CountryCheck",
                "User.Country",
                Operator::Equal,
                Value::String("US".to_string()),
                vec![],
            ),
        ];

        let result = analyzer.analyze(&rules);
        assert_eq!(result.total_rules, 2);
        assert_eq!(result.conflicts, 0);
        assert!(result.conflict_details.is_empty());
        assert!(result.can_parallelize_safely);

        // Both rules share one salience level and do not conflict.
        assert_eq!(result.execution_groups.len(), 1);
        let group = &result.execution_groups[0];
        assert_eq!(group.rules.len(), 2);
        assert_eq!(group.execution_mode, ExecutionMode::Parallel);
        assert!(group.can_parallelize);
        assert!(group.conflicts.is_empty());
    }

    /// A rule reading a field that a same-salience rule writes is a
    /// read-write conflict and forces sequential execution.
    #[test]
    fn test_conflicting_rules_analysis() {
        let mut analyzer = DependencyAnalyzer::new();

        let rules = vec![
            single_condition_rule(
                "CalculateScore",
                "User.Data",
                Operator::Equal,
                Value::String("valid".to_string()),
                vec![ActionType::Set {
                    field: "User.Score".to_string(),
                    value: Value::Integer(85),
                }],
            ),
            single_condition_rule(
                "CheckVIPStatus",
                "User.Score",
                Operator::GreaterThan,
                Value::Integer(80),
                vec![ActionType::Set {
                    field: "User.IsVIP".to_string(),
                    value: Value::Boolean(true),
                }],
            ),
        ];

        let result = analyzer.analyze(&rules);
        assert_eq!(result.total_rules, 2);
        assert!(!result.can_parallelize_safely);

        // Exactly one conflict: CheckVIPStatus reads what CalculateScore writes.
        assert_eq!(result.conflicts, 1);
        assert_eq!(result.conflict_details.len(), 1);
        let conflict = &result.conflict_details[0];
        assert_eq!(conflict.conflict_type, ConflictType::ReadWrite);
        assert_eq!(conflict.field, "User.Score");
        assert_eq!(
            conflict.rules,
            vec!["CalculateScore".to_string(), "CheckVIPStatus".to_string()]
        );

        assert_eq!(result.execution_groups.len(), 1);
        let group = &result.execution_groups[0];
        assert_eq!(group.execution_mode, ExecutionMode::Sequential);
        assert!(!group.can_parallelize);
        assert_eq!(group.conflicts.len(), 1);
    }
}