1#![allow(clippy::collapsible_match)]
2
3use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
4use crate::errors::{Result, RuleEngineError};
5use crate::types::{ActionType, Value};
6use std::collections::HashMap;
7use std::sync::{Arc, Mutex, RwLock};
8use std::thread;
9use std::time::{Duration, Instant};
10
/// Tuning options for [`ParallelRuleEngine`].
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch: when `false`, `should_parallelize` never selects the threaded path.
    pub enabled: bool,
    /// Divisor used to size the per-thread rule chunks, which makes it the upper bound on
    /// the number of worker threads spawned for one salience level.
    pub max_threads: usize,
    /// Minimum number of rules a salience level must contain before threads are used
    /// (compared against the size of the whole level, not against a per-thread share).
    pub min_rules_per_thread: usize,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably reserved for rule dependency analysis; confirm intended use.
    pub dependency_analysis: bool,
}
23
24impl Default for ParallelConfig {
25 fn default() -> Self {
26 Self {
27 enabled: true,
28 max_threads: num_cpus::get(),
29 min_rules_per_thread: 2,
30 dependency_analysis: true,
31 }
32 }
33}
34
/// Registry of user-supplied functions, keyed by name.
///
/// Each function receives a slice of argument values plus the fact store and returns a
/// [`Value`] or an error. The `Send + Sync` bounds allow the registry to be shared with
/// worker threads.
type CustomFunctionMap =
    HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
38
/// Per-rule record produced while executing a batch of rules.
#[derive(Debug, Clone)]
pub struct RuleExecutionContext {
    /// Copy of the rule that was evaluated.
    pub rule: Rule,
    /// `true` when the rule's conditions evaluated to true (its actions were then run).
    pub fired: bool,
    /// Error message associated with executing this rule, if one was recorded.
    pub error: Option<String>,
    /// Wall-clock time spent evaluating the rule's conditions and running its actions.
    pub execution_time: Duration,
}
51
/// Rule engine that evaluates rules level by level (grouped by salience) and can spread the
/// rules of one level across worker threads.
pub struct ParallelRuleEngine {
    /// Settings controlling if and how work is split across threads.
    config: ParallelConfig,
    /// Named custom functions; behind `Arc<RwLock<..>>` so worker threads can read them.
    custom_functions: Arc<RwLock<CustomFunctionMap>>,
}
57
58impl ParallelRuleEngine {
59 pub fn new(config: ParallelConfig) -> Self {
61 Self {
62 config,
63 custom_functions: Arc::new(RwLock::new(HashMap::new())),
64 }
65 }
66
67 pub fn register_function<F>(&mut self, name: &str, func: F)
69 where
70 F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
71 {
72 let mut functions = self.custom_functions.write().unwrap();
73 functions.insert(name.to_string(), Box::new(func));
74 }
75
76 pub fn execute_parallel(
78 &self,
79 knowledge_base: &KnowledgeBase,
80 facts: &Facts,
81 debug_mode: bool,
82 ) -> Result<ParallelExecutionResult> {
83 let start_time = Instant::now();
84
85 if debug_mode {
86 println!(
87 "๐ Starting parallel rule execution with {} rules",
88 knowledge_base.get_rules().len()
89 );
90 }
91
92 let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
94
95 let mut total_fired = 0;
96 let mut total_evaluated = 0;
97 let mut execution_contexts = Vec::new();
98
99 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
101 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
104 let rules_at_level = &salience_groups[&salience];
105
106 if debug_mode {
107 println!(
108 "โก Processing {} rules at salience level {}",
109 rules_at_level.len(),
110 salience
111 );
112 }
113
114 let should_parallelize = self.should_parallelize(rules_at_level);
116
117 let contexts = if should_parallelize {
118 self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
119 } else {
120 self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
121 };
122
123 for context in &contexts {
125 total_evaluated += 1;
126 if context.fired {
127 total_fired += 1;
128 }
129 }
130
131 execution_contexts.extend(contexts);
132 }
133
134 Ok(ParallelExecutionResult {
135 total_rules_evaluated: total_evaluated,
136 total_rules_fired: total_fired,
137 execution_time: start_time.elapsed(),
138 parallel_speedup: self.calculate_speedup(&execution_contexts),
139 execution_contexts,
140 })
141 }
142
143 fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
145 let mut groups: HashMap<i32, Vec<Rule>> = HashMap::new();
146 for rule in rules {
147 if rule.enabled {
148 groups.entry(rule.salience).or_default().push(rule.clone());
149 }
150 }
151 groups
152 }
153
154 fn should_parallelize(&self, rules: &[Rule]) -> bool {
156 self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
157 }
158
159 fn execute_rules_parallel(
161 &self,
162 rules: &[Rule],
163 facts: &Facts,
164 debug_mode: bool,
165 ) -> Result<Vec<RuleExecutionContext>> {
166 let results = Arc::new(Mutex::new(Vec::new()));
167 let facts_arc = Arc::new(facts.clone());
168 let functions_arc = Arc::clone(&self.custom_functions);
169
170 let chunk_size = rules.len().div_ceil(self.config.max_threads);
172 let chunks: Vec<_> = rules.chunks(chunk_size).collect();
173
174 let handles: Vec<_> = chunks
175 .into_iter()
176 .enumerate()
177 .map(|(thread_id, chunk)| {
178 let chunk = chunk.to_vec();
179 let results_clone = Arc::clone(&results);
180 let facts_clone = Arc::clone(&facts_arc);
181 let functions_clone = Arc::clone(&functions_arc);
182
183 thread::spawn(move || {
184 if debug_mode {
185 println!(" ๐งต Thread {} processing {} rules", thread_id, chunk.len());
186 }
187
188 let mut thread_results = Vec::new();
189 for rule in chunk {
190 let start = Instant::now();
191 let fired =
193 Self::evaluate_rule_conditions(&rule, &facts_clone, &functions_clone);
194
195 if fired {
196 if debug_mode {
197 println!(" ๐ฅ Rule '{}' fired", rule.name);
198 }
199
200 for action in &rule.actions {
202 if let Err(e) = Self::execute_action_parallel(
203 action,
204 &facts_clone,
205 &functions_clone,
206 ) {
207 if debug_mode {
208 println!(" โ Action failed: {}", e);
209 }
210 }
211 }
212 }
213
214 thread_results.push(RuleExecutionContext {
215 rule: rule.clone(),
216 fired,
217 error: None,
218 execution_time: start.elapsed(),
219 });
220 }
221
222 let mut results = results_clone.lock().unwrap();
223 results.extend(thread_results);
224 })
225 })
226 .collect();
227
228 for handle in handles {
230 handle
231 .join()
232 .map_err(|_| RuleEngineError::EvaluationError {
233 message: "Thread panicked during parallel execution".to_string(),
234 })?;
235 }
236
237 let results = results.lock().unwrap();
238 Ok(results.clone())
239 }
240
241 fn execute_rules_sequential(
243 &self,
244 rules: &[Rule],
245 facts: &Facts,
246 debug_mode: bool,
247 ) -> Result<Vec<RuleExecutionContext>> {
248 let mut contexts = Vec::new();
249 let functions_arc = Arc::clone(&self.custom_functions);
250
251 for rule in rules {
252 let start = Instant::now();
253 let fired = Self::evaluate_rule_conditions(rule, facts, &functions_arc);
254
255 if fired && debug_mode {
256 println!(" ๐ฅ Rule '{}' fired", rule.name);
257 }
258
259 if fired {
260 for action in &rule.actions {
262 if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
263 if debug_mode {
264 println!(" โ Action failed: {}", e);
265 }
266 }
267 }
268 }
269
270 contexts.push(RuleExecutionContext {
271 rule: rule.clone(),
272 fired,
273 error: None,
274 execution_time: start.elapsed(),
275 });
276 }
277
278 Ok(contexts)
279 }
280
281 fn evaluate_rule_conditions(
295 rule: &Rule,
296 facts: &Facts,
297 functions: &Arc<RwLock<CustomFunctionMap>>,
298 ) -> bool {
299 use crate::engine::pattern_matcher::PatternMatcher;
300 use crate::engine::rule::ConditionGroup;
301
302 match &rule.conditions {
303 ConditionGroup::Single(condition) => {
304 Self::evaluate_single_condition(condition, facts, functions)
305 }
306 ConditionGroup::Compound {
307 left,
308 operator,
309 right,
310 } => {
311 let left_rule = Rule {
313 name: rule.name.clone(),
314 description: rule.description.clone(),
315 conditions: (**left).clone(),
316 actions: rule.actions.clone(),
317 salience: rule.salience,
318 enabled: rule.enabled,
319 no_loop: rule.no_loop,
320 lock_on_active: rule.lock_on_active,
321 agenda_group: rule.agenda_group.clone(),
322 activation_group: rule.activation_group.clone(),
323 date_effective: rule.date_effective,
324 date_expires: rule.date_expires,
325 };
326 let right_rule = Rule {
327 name: rule.name.clone(),
328 description: rule.description.clone(),
329 conditions: (**right).clone(),
330 actions: rule.actions.clone(),
331 salience: rule.salience,
332 enabled: rule.enabled,
333 no_loop: rule.no_loop,
334 lock_on_active: rule.lock_on_active,
335 agenda_group: rule.agenda_group.clone(),
336 activation_group: rule.activation_group.clone(),
337 date_effective: rule.date_effective,
338 date_expires: rule.date_expires,
339 };
340
341 let left_result = Self::evaluate_rule_conditions(&left_rule, facts, functions);
342 let right_result = Self::evaluate_rule_conditions(&right_rule, facts, functions);
343
344 match operator {
345 crate::types::LogicalOperator::And => left_result && right_result,
346 crate::types::LogicalOperator::Or => left_result || right_result,
347 crate::types::LogicalOperator::Not => false, }
349 }
350 ConditionGroup::Not(condition) => {
351 let temp_rule = Rule {
352 name: rule.name.clone(),
353 description: rule.description.clone(),
354 conditions: (**condition).clone(),
355 actions: rule.actions.clone(),
356 salience: rule.salience,
357 enabled: rule.enabled,
358 no_loop: rule.no_loop,
359 lock_on_active: rule.lock_on_active,
360 agenda_group: rule.agenda_group.clone(),
361 activation_group: rule.activation_group.clone(),
362 date_effective: rule.date_effective,
363 date_expires: rule.date_expires,
364 };
365 !Self::evaluate_rule_conditions(&temp_rule, facts, functions)
366 }
367 ConditionGroup::Exists(condition) => PatternMatcher::evaluate_exists(condition, facts),
369 ConditionGroup::Forall(condition) => PatternMatcher::evaluate_forall(condition, facts),
370 ConditionGroup::Accumulate {
372 result_var,
373 source_pattern,
374 extract_field,
375 source_conditions,
376 function,
377 function_arg,
378 } => {
379 Self::evaluate_accumulate_parallel(
381 result_var,
382 source_pattern,
383 extract_field,
384 source_conditions,
385 function,
386 function_arg,
387 facts,
388 )
389 .is_ok()
390 }
391 }
392 }
393
394 fn evaluate_single_condition(
396 condition: &crate::engine::rule::Condition,
397 facts: &Facts,
398 functions: &Arc<RwLock<CustomFunctionMap>>,
399 ) -> bool {
400 use crate::engine::rule::ConditionExpression;
401
402 match &condition.expression {
403 ConditionExpression::Field(field_name) => {
404 if let Some(value) = facts
406 .get_nested(field_name)
407 .or_else(|| facts.get(field_name))
408 {
409 let rhs = match &condition.value {
411 Value::String(s) => {
412 facts
414 .get_nested(s)
415 .or_else(|| facts.get(s))
416 .unwrap_or(condition.value.clone())
417 }
418 Value::Expression(expr) => {
419 match crate::expression::evaluate_expression(expr, facts) {
421 Ok(evaluated) => evaluated,
422 Err(_) => facts
423 .get_nested(expr)
424 .or_else(|| facts.get(expr))
425 .unwrap_or(condition.value.clone()),
426 }
427 }
428 _ => condition.value.clone(),
429 };
430 condition.operator.evaluate(&value, &rhs)
431 } else {
432 false
433 }
434 }
435 ConditionExpression::FunctionCall { name, args } => {
436 let functions_guard = functions.read().unwrap();
438 if let Some(function) = functions_guard.get(name) {
439 let arg_values: Vec<Value> = args
441 .iter()
442 .map(|arg| {
443 facts
444 .get_nested(arg)
445 .or_else(|| facts.get(arg))
446 .unwrap_or(Value::String(arg.clone()))
447 })
448 .collect();
449
450 match function(&arg_values, facts) {
452 Ok(result_value) => {
453 condition.operator.evaluate(&result_value, &condition.value)
454 }
455 Err(_) => false,
456 }
457 } else {
458 false
459 }
460 }
461 ConditionExpression::Test { name, args } => {
462 let functions_guard = functions.read().unwrap();
464 if let Some(function) = functions_guard.get(name) {
465 let arg_values: Vec<Value> = args
466 .iter()
467 .map(|arg| {
468 facts
469 .get_nested(arg)
470 .or_else(|| facts.get(arg))
471 .unwrap_or(Value::String(arg.clone()))
472 })
473 .collect();
474
475 match function(&arg_values, facts) {
476 Ok(result_value) => {
477 match result_value {
479 Value::Boolean(b) => b,
480 Value::Integer(i) => i != 0,
481 Value::Number(f) => f != 0.0,
482 Value::String(s) => !s.is_empty(),
483 _ => false,
484 }
485 }
486 Err(_) => false,
487 }
488 } else {
489 false
490 }
491 }
492 ConditionExpression::MultiField {
493 field,
494 operation,
495 variable: _,
496 } => {
497 Self::evaluate_multifield(field, operation, condition, facts)
499 }
500 }
501 }
502
503 fn evaluate_multifield(
505 field: &str,
506 operation: &str,
507 condition: &crate::engine::rule::Condition,
508 facts: &Facts,
509 ) -> bool {
510 if let Some(value) = facts.get_nested(field).or_else(|| facts.get(field)) {
511 match value {
512 Value::Array(items) => {
513 match operation {
514 "empty" => items.is_empty(),
515 "not_empty" => !items.is_empty(),
516 "count" => {
517 let count = Value::Integer(items.len() as i64);
518 condition.operator.evaluate(&count, &condition.value)
519 }
520 "first" => {
521 if let Some(first) = items.first() {
522 condition.operator.evaluate(first, &condition.value)
523 } else {
524 false
525 }
526 }
527 "last" => {
528 if let Some(last) = items.last() {
529 condition.operator.evaluate(last, &condition.value)
530 } else {
531 false
532 }
533 }
534 "contains" => items
535 .iter()
536 .any(|item| condition.operator.evaluate(item, &condition.value)),
537 "collect" => {
538 true
540 }
541 _ => false,
542 }
543 }
544 _ => false,
545 }
546 } else {
547 false
548 }
549 }
550
551 fn evaluate_accumulate_parallel(
553 result_var: &str,
554 source_pattern: &str,
555 extract_field: &str,
556 source_conditions: &[String],
557 function: &str,
558 _function_arg: &str,
559 facts: &Facts,
560 ) -> Result<()> {
561 let all_facts = facts.get_all_facts();
563 let mut matching_values = Vec::new();
564
565 let pattern_prefix = format!("{}.", source_pattern);
566
567 let mut instances: HashMap<String, HashMap<String, Value>> = HashMap::new();
569
570 for (key, value) in &all_facts {
571 if key.starts_with(&pattern_prefix) {
572 let parts: Vec<&str> = key
573 .strip_prefix(&pattern_prefix)
574 .unwrap()
575 .split('.')
576 .collect();
577
578 if parts.len() >= 2 {
579 let instance_id = parts[0];
580 let field_name = parts[1..].join(".");
581
582 instances
583 .entry(instance_id.to_string())
584 .or_default()
585 .insert(field_name, value.clone());
586 } else if parts.len() == 1 {
587 instances
588 .entry("default".to_string())
589 .or_default()
590 .insert(parts[0].to_string(), value.clone());
591 }
592 }
593 }
594
595 for (_instance_id, fields) in instances {
597 let matches_conditions = source_conditions.is_empty() || {
598 source_conditions.iter().all(|_cond| {
599 true })
602 };
603
604 if matches_conditions {
605 if let Some(value) = fields.get(extract_field) {
606 matching_values.push(value.clone());
607 }
608 }
609 }
610
611 let result: Value = match function {
613 "sum" => {
614 let sum: f64 = matching_values
615 .iter()
616 .filter_map(|v| match v {
617 Value::Integer(i) => Some(*i as f64),
618 Value::Number(n) => Some(*n),
619 _ => None,
620 })
621 .sum();
622 Value::Number(sum)
623 }
624 "average" | "avg" => {
625 let values: Vec<f64> = matching_values
626 .iter()
627 .filter_map(|v| match v {
628 Value::Integer(i) => Some(*i as f64),
629 Value::Number(n) => Some(*n),
630 _ => None,
631 })
632 .collect();
633 if values.is_empty() {
634 Value::Number(0.0)
635 } else {
636 Value::Number(values.iter().sum::<f64>() / values.len() as f64)
637 }
638 }
639 "min" => {
640 let min = matching_values
641 .iter()
642 .filter_map(|v| match v {
643 Value::Integer(i) => Some(*i as f64),
644 Value::Number(n) => Some(*n),
645 _ => None,
646 })
647 .fold(f64::INFINITY, f64::min);
648 Value::Number(min)
649 }
650 "max" => {
651 let max = matching_values
652 .iter()
653 .filter_map(|v| match v {
654 Value::Integer(i) => Some(*i as f64),
655 Value::Number(n) => Some(*n),
656 _ => None,
657 })
658 .fold(f64::NEG_INFINITY, f64::max);
659 Value::Number(max)
660 }
661 "count" => Value::Integer(matching_values.len() as i64),
662 "collect" => Value::Array(matching_values.clone()),
663 _ => Value::Integer(0),
664 };
665
666 facts.set(result_var, result);
668 Ok(())
669 }
670
671 fn execute_action_parallel(
673 action: &ActionType,
674 facts: &Facts,
675 functions: &Arc<RwLock<CustomFunctionMap>>,
676 ) -> Result<()> {
677 match action {
678 ActionType::Custom { action_type, .. } => {
679 let functions_guard = functions.read().unwrap();
681 if let Some(func) = functions_guard.get(action_type) {
682 let empty_args = Vec::new();
683 let _result = func(&empty_args, facts)?;
684 }
685 Ok(())
686 }
687 ActionType::MethodCall { .. } => {
688 Ok(())
690 }
691 ActionType::Set { .. } => {
692 Ok(())
694 }
695 ActionType::Log { message } => {
696 println!(" ๐ {}", message);
697 Ok(())
698 }
699 ActionType::Retract { .. } => {
700 Ok(())
702 }
703 ActionType::ActivateAgendaGroup { .. } => {
704 Ok(())
706 }
707 ActionType::ScheduleRule { .. } => {
708 Ok(())
710 }
711 ActionType::CompleteWorkflow { .. } => {
712 Ok(())
714 }
715 ActionType::SetWorkflowData { .. } => {
716 Ok(())
718 }
719 }
720 }
721
722 fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
724 if contexts.is_empty() {
725 return 1.0;
726 }
727
728 let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
729 let max_time = contexts
730 .iter()
731 .map(|c| c.execution_time)
732 .max()
733 .unwrap_or(Duration::ZERO);
734
735 if max_time.as_nanos() > 0 {
736 total_time.as_nanos() as f64 / max_time.as_nanos() as f64
737 } else {
738 1.0
739 }
740 }
741}
742
/// Summary returned by `ParallelRuleEngine::execute_parallel`.
#[derive(Debug)]
pub struct ParallelExecutionResult {
    /// Number of rules whose conditions were evaluated.
    pub total_rules_evaluated: usize,
    /// Number of evaluated rules whose conditions held.
    pub total_rules_fired: usize,
    /// Wall-clock duration of the whole run.
    pub execution_time: Duration,
    /// One record per evaluated rule.
    pub execution_contexts: Vec<RuleExecutionContext>,
    /// Ratio of the summed per-rule execution times to the longest single per-rule time
    /// (`1.0` when nothing was measured).
    pub parallel_speedup: f64,
}
757
758impl ParallelExecutionResult {
759 pub fn get_stats(&self) -> String {
761 format!(
762 "๐ Parallel Execution Stats:\n Rules evaluated: {}\n Rules fired: {}\n Execution time: {:?}\n Parallel speedup: {:.2}x",
763 self.total_rules_evaluated,
764 self.total_rules_fired,
765 self.execution_time,
766 self.parallel_speedup
767 )
768 }
769}
770
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{Operator, Value};

    /// The default configuration enables parallelism with a usable thread count.
    #[test]
    fn test_parallel_config_default() {
        let ParallelConfig {
            enabled,
            max_threads,
            min_rules_per_thread,
            ..
        } = ParallelConfig::default();

        assert!(enabled);
        assert!(max_threads > 0);
        assert_eq!(min_rules_per_thread, 2);
    }

    /// A freshly created engine has no custom functions registered.
    #[test]
    fn test_parallel_engine_creation() {
        let engine = ParallelRuleEngine::new(ParallelConfig::default());
        let registry = engine.custom_functions.read().unwrap();
        assert!(registry.is_empty());
    }

    /// Rules sharing a salience end up in the same group.
    #[test]
    fn test_salience_grouping() {
        // Builds a rule with a single `test == true` condition and no actions.
        let sample_rule = |name: &str| {
            Rule::new(
                name.to_string(),
                ConditionGroup::Single(Condition::new(
                    "test".to_string(),
                    Operator::Equal,
                    Value::Boolean(true),
                )),
                vec![],
            )
        };

        let rules = vec![
            sample_rule("Rule1").with_priority(10),
            sample_rule("Rule2").with_priority(10),
            sample_rule("Rule3").with_priority(5),
        ];

        let engine = ParallelRuleEngine::new(ParallelConfig::default());
        let groups = engine.group_rules_by_salience(&rules);

        assert_eq!(groups.len(), 2);
        assert_eq!(groups[&10].len(), 2);
        assert_eq!(groups[&5].len(), 1);
    }
}