1#![allow(clippy::collapsible_match)]
2
3use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
4use crate::errors::{Result, RuleEngineError};
5use crate::types::{ActionType, Value};
6use std::collections::HashMap;
7use std::sync::{Arc, Mutex, RwLock};
8use std::thread;
9use std::time::{Duration, Instant};
10
/// Tuning options for [`ParallelRuleEngine`].
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch; when `false` no salience level is run in parallel.
    pub enabled: bool,
    /// Upper bound on worker threads: the rules of one salience level are split
    /// into at most this many chunks.
    pub max_threads: usize,
    /// Minimum number of rules a salience level must contain before it is
    /// considered for parallel execution.
    pub min_rules_per_thread: usize,
    /// Dependency-analysis flag. It is stored here but not read by the engine
    /// code in this file.
    pub dependency_analysis: bool,
}

impl Default for ParallelConfig {
    /// Parallelism enabled, one thread per available core (4 when the core
    /// count cannot be determined), at least 2 rules per level, dependency
    /// analysis on.
    fn default() -> Self {
        let max_threads = match std::thread::available_parallelism() {
            Ok(cores) => cores.get(),
            Err(_) => 4,
        };

        Self {
            enabled: true,
            max_threads,
            min_rules_per_thread: 2,
            dependency_analysis: true,
        }
    }
}
36
/// Registry of user-supplied functions, keyed by function name.
///
/// Each function receives a slice of argument values plus the facts and returns
/// a `Value` or an error. The `Send + Sync` bounds allow the registry to be
/// shared with worker threads.
type CustomFunctionMap =
    HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
40
/// Outcome of evaluating one rule during an engine run.
#[derive(Debug, Clone)]
pub struct RuleExecutionContext {
    /// Copy of the rule that was evaluated.
    pub rule: Rule,
    /// Whether the rule's conditions evaluated to `true`.
    pub fired: bool,
    /// Error message associated with this rule's execution, if any.
    pub error: Option<String>,
    /// Wall-clock time spent evaluating the rule and running its actions.
    pub execution_time: Duration,
}
53
/// Rule engine that evaluates rules of equal salience across multiple threads.
pub struct ParallelRuleEngine {
    /// Settings controlling if and how work is split across threads.
    config: ParallelConfig,
    /// Custom functions available to conditions and custom actions; shared
    /// with worker threads through the `Arc`.
    custom_functions: Arc<RwLock<CustomFunctionMap>>,
}
59
60impl ParallelRuleEngine {
61 pub fn new(config: ParallelConfig) -> Self {
63 Self {
64 config,
65 custom_functions: Arc::new(RwLock::new(HashMap::new())),
66 }
67 }
68
69 pub fn register_function<F>(&mut self, name: &str, func: F)
71 where
72 F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
73 {
74 let mut functions = self.custom_functions.write().unwrap();
75 functions.insert(name.to_string(), Box::new(func));
76 }
77
78 pub fn execute_parallel(
80 &self,
81 knowledge_base: &KnowledgeBase,
82 facts: &Facts,
83 debug_mode: bool,
84 ) -> Result<ParallelExecutionResult> {
85 let start_time = Instant::now();
86
87 if debug_mode {
88 println!(
89 "๐ Starting parallel rule execution with {} rules",
90 knowledge_base.get_rules().len()
91 );
92 }
93
94 let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
96
97 let mut total_fired = 0;
98 let mut total_evaluated = 0;
99 let mut execution_contexts = Vec::new();
100
101 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
103 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
106 let rules_at_level = &salience_groups[&salience];
107
108 if debug_mode {
109 println!(
110 "โก Processing {} rules at salience level {}",
111 rules_at_level.len(),
112 salience
113 );
114 }
115
116 let should_parallelize = self.should_parallelize(rules_at_level);
118
119 let contexts = if should_parallelize {
120 self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
121 } else {
122 self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
123 };
124
125 for context in &contexts {
127 total_evaluated += 1;
128 if context.fired {
129 total_fired += 1;
130 }
131 }
132
133 execution_contexts.extend(contexts);
134 }
135
136 Ok(ParallelExecutionResult {
137 total_rules_evaluated: total_evaluated,
138 total_rules_fired: total_fired,
139 execution_time: start_time.elapsed(),
140 parallel_speedup: self.calculate_speedup(&execution_contexts),
141 execution_contexts,
142 })
143 }
144
145 fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
147 let mut groups: HashMap<i32, Vec<Rule>> = HashMap::new();
148 for rule in rules {
149 if rule.enabled {
150 groups.entry(rule.salience).or_default().push(rule.clone());
151 }
152 }
153 groups
154 }
155
156 fn should_parallelize(&self, rules: &[Rule]) -> bool {
158 self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
159 }
160
161 fn execute_rules_parallel(
163 &self,
164 rules: &[Rule],
165 facts: &Facts,
166 debug_mode: bool,
167 ) -> Result<Vec<RuleExecutionContext>> {
168 let results = Arc::new(Mutex::new(Vec::new()));
169 let facts_arc = Arc::new(facts.clone());
170 let functions_arc = Arc::clone(&self.custom_functions);
171
172 let chunk_size = rules.len().div_ceil(self.config.max_threads);
174 let chunks: Vec<_> = rules.chunks(chunk_size).collect();
175
176 let handles: Vec<_> = chunks
177 .into_iter()
178 .enumerate()
179 .map(|(thread_id, chunk)| {
180 let chunk = chunk.to_vec();
181 let results_clone = Arc::clone(&results);
182 let facts_clone = Arc::clone(&facts_arc);
183 let functions_clone = Arc::clone(&functions_arc);
184
185 thread::spawn(move || {
186 if debug_mode {
187 println!(" ๐งต Thread {} processing {} rules", thread_id, chunk.len());
188 }
189
190 let mut thread_results = Vec::new();
191 for rule in chunk {
192 let start = Instant::now();
193 let fired =
195 Self::evaluate_rule_conditions(&rule, &facts_clone, &functions_clone);
196
197 if fired {
198 if debug_mode {
199 println!(" ๐ฅ Rule '{}' fired", rule.name);
200 }
201
202 for action in &rule.actions {
204 if let Err(e) = Self::execute_action_parallel(
205 action,
206 &facts_clone,
207 &functions_clone,
208 ) {
209 if debug_mode {
210 println!(" โ Action failed: {}", e);
211 }
212 }
213 }
214 }
215
216 thread_results.push(RuleExecutionContext {
217 rule: rule.clone(),
218 fired,
219 error: None,
220 execution_time: start.elapsed(),
221 });
222 }
223
224 let mut results = results_clone.lock().unwrap();
225 results.extend(thread_results);
226 })
227 })
228 .collect();
229
230 for handle in handles {
232 handle
233 .join()
234 .map_err(|_| RuleEngineError::EvaluationError {
235 message: "Thread panicked during parallel execution".to_string(),
236 })?;
237 }
238
239 let results = results.lock().unwrap();
240 Ok(results.clone())
241 }
242
243 fn execute_rules_sequential(
245 &self,
246 rules: &[Rule],
247 facts: &Facts,
248 debug_mode: bool,
249 ) -> Result<Vec<RuleExecutionContext>> {
250 let mut contexts = Vec::new();
251 let functions_arc = Arc::clone(&self.custom_functions);
252
253 for rule in rules {
254 let start = Instant::now();
255 let fired = Self::evaluate_rule_conditions(rule, facts, &functions_arc);
256
257 if fired && debug_mode {
258 println!(" ๐ฅ Rule '{}' fired", rule.name);
259 }
260
261 if fired {
262 for action in &rule.actions {
264 if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
265 if debug_mode {
266 println!(" โ Action failed: {}", e);
267 }
268 }
269 }
270 }
271
272 contexts.push(RuleExecutionContext {
273 rule: rule.clone(),
274 fired,
275 error: None,
276 execution_time: start.elapsed(),
277 });
278 }
279
280 Ok(contexts)
281 }
282
283 fn evaluate_rule_conditions(
297 rule: &Rule,
298 facts: &Facts,
299 functions: &Arc<RwLock<CustomFunctionMap>>,
300 ) -> bool {
301 use crate::engine::pattern_matcher::PatternMatcher;
302 use crate::engine::rule::ConditionGroup;
303
304 match &rule.conditions {
305 ConditionGroup::Single(condition) => {
306 Self::evaluate_single_condition(condition, facts, functions)
307 }
308 ConditionGroup::Compound {
309 left,
310 operator,
311 right,
312 } => {
313 let left_rule = Rule {
315 name: rule.name.clone(),
316 description: rule.description.clone(),
317 conditions: (**left).clone(),
318 actions: rule.actions.clone(),
319 salience: rule.salience,
320 enabled: rule.enabled,
321 no_loop: rule.no_loop,
322 lock_on_active: rule.lock_on_active,
323 agenda_group: rule.agenda_group.clone(),
324 activation_group: rule.activation_group.clone(),
325 date_effective: rule.date_effective,
326 date_expires: rule.date_expires,
327 };
328 let right_rule = Rule {
329 name: rule.name.clone(),
330 description: rule.description.clone(),
331 conditions: (**right).clone(),
332 actions: rule.actions.clone(),
333 salience: rule.salience,
334 enabled: rule.enabled,
335 no_loop: rule.no_loop,
336 lock_on_active: rule.lock_on_active,
337 agenda_group: rule.agenda_group.clone(),
338 activation_group: rule.activation_group.clone(),
339 date_effective: rule.date_effective,
340 date_expires: rule.date_expires,
341 };
342
343 let left_result = Self::evaluate_rule_conditions(&left_rule, facts, functions);
344 let right_result = Self::evaluate_rule_conditions(&right_rule, facts, functions);
345
346 match operator {
347 crate::types::LogicalOperator::And => left_result && right_result,
348 crate::types::LogicalOperator::Or => left_result || right_result,
349 crate::types::LogicalOperator::Not => false, }
351 }
352 ConditionGroup::Not(condition) => {
353 let temp_rule = Rule {
354 name: rule.name.clone(),
355 description: rule.description.clone(),
356 conditions: (**condition).clone(),
357 actions: rule.actions.clone(),
358 salience: rule.salience,
359 enabled: rule.enabled,
360 no_loop: rule.no_loop,
361 lock_on_active: rule.lock_on_active,
362 agenda_group: rule.agenda_group.clone(),
363 activation_group: rule.activation_group.clone(),
364 date_effective: rule.date_effective,
365 date_expires: rule.date_expires,
366 };
367 !Self::evaluate_rule_conditions(&temp_rule, facts, functions)
368 }
369 ConditionGroup::Exists(condition) => PatternMatcher::evaluate_exists(condition, facts),
371 ConditionGroup::Forall(condition) => PatternMatcher::evaluate_forall(condition, facts),
372 ConditionGroup::Accumulate {
374 result_var,
375 source_pattern,
376 extract_field,
377 source_conditions,
378 function,
379 function_arg,
380 } => {
381 Self::evaluate_accumulate_parallel(
383 result_var,
384 source_pattern,
385 extract_field,
386 source_conditions,
387 function,
388 function_arg,
389 facts,
390 )
391 .is_ok()
392 }
393
394 #[cfg(feature = "streaming")]
395 ConditionGroup::StreamPattern { .. } => {
396 true
399 }
400 }
401 }
402
403 fn evaluate_single_condition(
405 condition: &crate::engine::rule::Condition,
406 facts: &Facts,
407 functions: &Arc<RwLock<CustomFunctionMap>>,
408 ) -> bool {
409 use crate::engine::rule::ConditionExpression;
410
411 match &condition.expression {
412 ConditionExpression::Field(field_name) => {
413 if let Some(value) = facts
415 .get_nested(field_name)
416 .or_else(|| facts.get(field_name))
417 {
418 let rhs = match &condition.value {
420 Value::String(s) => {
421 facts
423 .get_nested(s)
424 .or_else(|| facts.get(s))
425 .unwrap_or(condition.value.clone())
426 }
427 Value::Expression(expr) => {
428 match crate::expression::evaluate_expression(expr, facts) {
430 Ok(evaluated) => evaluated,
431 Err(_) => facts
432 .get_nested(expr)
433 .or_else(|| facts.get(expr))
434 .unwrap_or(condition.value.clone()),
435 }
436 }
437 _ => condition.value.clone(),
438 };
439 condition.operator.evaluate(&value, &rhs)
440 } else {
441 false
442 }
443 }
444 ConditionExpression::FunctionCall { name, args } => {
445 let functions_guard = functions.read().unwrap();
447 if let Some(function) = functions_guard.get(name) {
448 let arg_values: Vec<Value> = args
450 .iter()
451 .map(|arg| {
452 facts
453 .get_nested(arg)
454 .or_else(|| facts.get(arg))
455 .unwrap_or(Value::String(arg.clone()))
456 })
457 .collect();
458
459 match function(&arg_values, facts) {
461 Ok(result_value) => {
462 condition.operator.evaluate(&result_value, &condition.value)
463 }
464 Err(_) => false,
465 }
466 } else {
467 false
468 }
469 }
470 ConditionExpression::Test { name, args } => {
471 let functions_guard = functions.read().unwrap();
473 if let Some(function) = functions_guard.get(name) {
474 let arg_values: Vec<Value> = args
475 .iter()
476 .map(|arg| {
477 facts
478 .get_nested(arg)
479 .or_else(|| facts.get(arg))
480 .unwrap_or(Value::String(arg.clone()))
481 })
482 .collect();
483
484 match function(&arg_values, facts) {
485 Ok(result_value) => {
486 match result_value {
488 Value::Boolean(b) => b,
489 Value::Integer(i) => i != 0,
490 Value::Number(f) => f != 0.0,
491 Value::String(s) => !s.is_empty(),
492 _ => false,
493 }
494 }
495 Err(_) => false,
496 }
497 } else {
498 false
499 }
500 }
501 ConditionExpression::MultiField {
502 field,
503 operation,
504 variable: _,
505 } => {
506 Self::evaluate_multifield(field, operation, condition, facts)
508 }
509 }
510 }
511
512 fn evaluate_multifield(
514 field: &str,
515 operation: &str,
516 condition: &crate::engine::rule::Condition,
517 facts: &Facts,
518 ) -> bool {
519 if let Some(value) = facts.get_nested(field).or_else(|| facts.get(field)) {
520 match value {
521 Value::Array(items) => {
522 match operation {
523 "empty" => items.is_empty(),
524 "not_empty" => !items.is_empty(),
525 "count" => {
526 let count = Value::Integer(items.len() as i64);
527 condition.operator.evaluate(&count, &condition.value)
528 }
529 "first" => {
530 if let Some(first) = items.first() {
531 condition.operator.evaluate(first, &condition.value)
532 } else {
533 false
534 }
535 }
536 "last" => {
537 if let Some(last) = items.last() {
538 condition.operator.evaluate(last, &condition.value)
539 } else {
540 false
541 }
542 }
543 "contains" => items
544 .iter()
545 .any(|item| condition.operator.evaluate(item, &condition.value)),
546 "collect" => {
547 true
549 }
550 _ => false,
551 }
552 }
553 _ => false,
554 }
555 } else {
556 false
557 }
558 }
559
560 fn evaluate_accumulate_parallel(
562 result_var: &str,
563 source_pattern: &str,
564 extract_field: &str,
565 source_conditions: &[String],
566 function: &str,
567 _function_arg: &str,
568 facts: &Facts,
569 ) -> Result<()> {
570 let all_facts = facts.get_all_facts();
572 let mut matching_values = Vec::new();
573
574 let pattern_prefix = format!("{}.", source_pattern);
575
576 let mut instances: HashMap<String, HashMap<String, Value>> = HashMap::new();
578
579 for (key, value) in &all_facts {
580 if key.starts_with(&pattern_prefix) {
581 let parts: Vec<&str> = key
582 .strip_prefix(&pattern_prefix)
583 .unwrap()
584 .split('.')
585 .collect();
586
587 if parts.len() >= 2 {
588 let instance_id = parts[0];
589 let field_name = parts[1..].join(".");
590
591 instances
592 .entry(instance_id.to_string())
593 .or_default()
594 .insert(field_name, value.clone());
595 } else if parts.len() == 1 {
596 instances
597 .entry("default".to_string())
598 .or_default()
599 .insert(parts[0].to_string(), value.clone());
600 }
601 }
602 }
603
604 for (_instance_id, fields) in instances {
606 let matches_conditions = source_conditions.is_empty() || {
607 source_conditions.iter().all(|_cond| {
608 true })
611 };
612
613 if matches_conditions {
614 if let Some(value) = fields.get(extract_field) {
615 matching_values.push(value.clone());
616 }
617 }
618 }
619
620 let result: Value = match function {
622 "sum" => {
623 let sum: f64 = matching_values
624 .iter()
625 .filter_map(|v| match v {
626 Value::Integer(i) => Some(*i as f64),
627 Value::Number(n) => Some(*n),
628 _ => None,
629 })
630 .sum();
631 Value::Number(sum)
632 }
633 "average" | "avg" => {
634 let values: Vec<f64> = matching_values
635 .iter()
636 .filter_map(|v| match v {
637 Value::Integer(i) => Some(*i as f64),
638 Value::Number(n) => Some(*n),
639 _ => None,
640 })
641 .collect();
642 if values.is_empty() {
643 Value::Number(0.0)
644 } else {
645 Value::Number(values.iter().sum::<f64>() / values.len() as f64)
646 }
647 }
648 "min" => {
649 let min = matching_values
650 .iter()
651 .filter_map(|v| match v {
652 Value::Integer(i) => Some(*i as f64),
653 Value::Number(n) => Some(*n),
654 _ => None,
655 })
656 .fold(f64::INFINITY, f64::min);
657 Value::Number(min)
658 }
659 "max" => {
660 let max = matching_values
661 .iter()
662 .filter_map(|v| match v {
663 Value::Integer(i) => Some(*i as f64),
664 Value::Number(n) => Some(*n),
665 _ => None,
666 })
667 .fold(f64::NEG_INFINITY, f64::max);
668 Value::Number(max)
669 }
670 "count" => Value::Integer(matching_values.len() as i64),
671 "collect" => Value::Array(matching_values.clone()),
672 _ => Value::Integer(0),
673 };
674
675 facts.set(result_var, result);
677 Ok(())
678 }
679
680 fn execute_action_parallel(
682 action: &ActionType,
683 facts: &Facts,
684 functions: &Arc<RwLock<CustomFunctionMap>>,
685 ) -> Result<()> {
686 match action {
687 ActionType::Custom { action_type, .. } => {
688 let functions_guard = functions.read().unwrap();
690 if let Some(func) = functions_guard.get(action_type) {
691 let empty_args = Vec::new();
692 let _result = func(&empty_args, facts)?;
693 }
694 Ok(())
695 }
696 ActionType::MethodCall { .. } => {
697 Ok(())
699 }
700 ActionType::Set { .. } => {
701 Ok(())
703 }
704 ActionType::Log { message } => {
705 println!(" ๐ {}", message);
706 Ok(())
707 }
708 ActionType::Retract { .. } => {
709 Ok(())
711 }
712 ActionType::ActivateAgendaGroup { .. } => {
713 Ok(())
715 }
716 ActionType::ScheduleRule { .. } => {
717 Ok(())
719 }
720 ActionType::CompleteWorkflow { .. } => {
721 Ok(())
723 }
724 ActionType::SetWorkflowData { .. } => {
725 Ok(())
727 }
728 ActionType::Append { .. } => {
729 Ok(())
731 }
732 }
733 }
734
735 fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
737 if contexts.is_empty() {
738 return 1.0;
739 }
740
741 let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
742 let max_time = contexts
743 .iter()
744 .map(|c| c.execution_time)
745 .max()
746 .unwrap_or(Duration::ZERO);
747
748 if max_time.as_nanos() > 0 {
749 total_time.as_nanos() as f64 / max_time.as_nanos() as f64
750 } else {
751 1.0
752 }
753 }
754}
755
/// Aggregate outcome of one `ParallelRuleEngine::execute_parallel` run.
#[derive(Debug)]
pub struct ParallelExecutionResult {
    /// Number of rules that were evaluated.
    pub total_rules_evaluated: usize,
    /// Number of evaluated rules whose conditions held.
    pub total_rules_fired: usize,
    /// Wall-clock duration of the run.
    pub execution_time: Duration,
    /// Per-rule details, grouped by salience level from highest to lowest.
    pub execution_contexts: Vec<RuleExecutionContext>,
    /// Estimated speedup: summed per-rule time divided by the longest
    /// single-rule time.
    pub parallel_speedup: f64,
}
770
771impl ParallelExecutionResult {
772 pub fn get_stats(&self) -> String {
774 format!(
775 "๐ Parallel Execution Stats:\n Rules evaluated: {}\n Rules fired: {}\n Execution time: {:?}\n Parallel speedup: {:.2}x",
776 self.total_rules_evaluated,
777 self.total_rules_fired,
778 self.execution_time,
779 self.parallel_speedup
780 )
781 }
782}
783
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{Operator, Value};

    /// The default configuration enables parallelism with sane limits.
    #[test]
    fn test_parallel_config_default() {
        let defaults = ParallelConfig::default();
        assert!(defaults.enabled);
        assert!(defaults.max_threads > 0);
        assert_eq!(defaults.min_rules_per_thread, 2);
    }

    /// A freshly created engine has no custom functions registered.
    #[test]
    fn test_parallel_engine_creation() {
        let engine = ParallelRuleEngine::new(ParallelConfig::default());
        assert!(engine.custom_functions.read().unwrap().is_empty());
    }

    /// Rules sharing a salience value end up in the same group.
    #[test]
    fn test_salience_grouping() {
        let engine = ParallelRuleEngine::new(ParallelConfig::default());

        // Every rule uses the same trivial condition; only name and salience vary.
        let make_rule = |name: &str| {
            Rule::new(
                name.to_string(),
                ConditionGroup::Single(Condition::new(
                    "test".to_string(),
                    Operator::Equal,
                    Value::Boolean(true),
                )),
                vec![],
            )
        };

        let rules = vec![
            make_rule("Rule1").with_priority(10),
            make_rule("Rule2").with_priority(10),
            make_rule("Rule3").with_priority(5),
        ];

        let groups = engine.group_rules_by_salience(&rules);
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[&10].len(), 2);
        assert_eq!(groups[&5].len(), 1);
    }
}