1#![allow(clippy::collapsible_match)]
2
3use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
4use crate::errors::{Result, RuleEngineError};
5use crate::types::{ActionType, Value};
6use std::collections::HashMap;
7use std::sync::{Arc, Mutex, RwLock};
8use std::thread;
9use std::time::{Duration, Instant};
10
11#[derive(Debug, Clone)]
13pub struct ParallelConfig {
14 pub enabled: bool,
16 pub max_threads: usize,
18 pub min_rules_per_thread: usize,
20 pub dependency_analysis: bool,
22}
23
24impl Default for ParallelConfig {
25 fn default() -> Self {
26 Self {
27 enabled: true,
28 max_threads: num_cpus::get(),
29 min_rules_per_thread: 2,
30 dependency_analysis: true,
31 }
32 }
33}
34
35type CustomFunctionMap =
37 HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
38
39#[derive(Debug, Clone)]
41pub struct RuleExecutionContext {
42 pub rule: Rule,
44 pub fired: bool,
46 pub error: Option<String>,
48 pub execution_time: Duration,
50}
51
52pub struct ParallelRuleEngine {
54 config: ParallelConfig,
55 custom_functions: Arc<RwLock<CustomFunctionMap>>,
56}
57
58impl ParallelRuleEngine {
59 pub fn new(config: ParallelConfig) -> Self {
61 Self {
62 config,
63 custom_functions: Arc::new(RwLock::new(HashMap::new())),
64 }
65 }
66
67 pub fn register_function<F>(&mut self, name: &str, func: F)
69 where
70 F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
71 {
72 let mut functions = self.custom_functions.write().unwrap();
73 functions.insert(name.to_string(), Box::new(func));
74 }
75
76 pub fn execute_parallel(
78 &self,
79 knowledge_base: &KnowledgeBase,
80 facts: &Facts,
81 debug_mode: bool,
82 ) -> Result<ParallelExecutionResult> {
83 let start_time = Instant::now();
84
85 if debug_mode {
86 println!(
87 "๐ Starting parallel rule execution with {} rules",
88 knowledge_base.get_rules().len()
89 );
90 }
91
92 let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
94
95 let mut total_fired = 0;
96 let mut total_evaluated = 0;
97 let mut execution_contexts = Vec::new();
98
99 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
101 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
104 let rules_at_level = &salience_groups[&salience];
105
106 if debug_mode {
107 println!(
108 "โก Processing {} rules at salience level {}",
109 rules_at_level.len(),
110 salience
111 );
112 }
113
114 let should_parallelize = self.should_parallelize(rules_at_level);
116
117 let contexts = if should_parallelize {
118 self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
119 } else {
120 self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
121 };
122
123 for context in &contexts {
125 total_evaluated += 1;
126 if context.fired {
127 total_fired += 1;
128 }
129 }
130
131 execution_contexts.extend(contexts);
132 }
133
134 Ok(ParallelExecutionResult {
135 total_rules_evaluated: total_evaluated,
136 total_rules_fired: total_fired,
137 execution_time: start_time.elapsed(),
138 parallel_speedup: self.calculate_speedup(&execution_contexts),
139 execution_contexts,
140 })
141 }
142
143 fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
145 let mut groups: HashMap<i32, Vec<Rule>> = HashMap::new();
146 for rule in rules {
147 if rule.enabled {
148 groups.entry(rule.salience).or_default().push(rule.clone());
149 }
150 }
151 groups
152 }
153
154 fn should_parallelize(&self, rules: &[Rule]) -> bool {
156 self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
157 }
158
159 fn execute_rules_parallel(
161 &self,
162 rules: &[Rule],
163 facts: &Facts,
164 debug_mode: bool,
165 ) -> Result<Vec<RuleExecutionContext>> {
166 let results = Arc::new(Mutex::new(Vec::new()));
167 let facts_arc = Arc::new(facts.clone());
168 let functions_arc = Arc::clone(&self.custom_functions);
169
170 let chunk_size = rules.len().div_ceil(self.config.max_threads);
172 let chunks: Vec<_> = rules.chunks(chunk_size).collect();
173
174 let handles: Vec<_> = chunks
175 .into_iter()
176 .enumerate()
177 .map(|(thread_id, chunk)| {
178 let chunk = chunk.to_vec();
179 let results_clone = Arc::clone(&results);
180 let facts_clone = Arc::clone(&facts_arc);
181 let functions_clone = Arc::clone(&functions_arc);
182
183 thread::spawn(move || {
184 if debug_mode {
185 println!(" ๐งต Thread {} processing {} rules", thread_id, chunk.len());
186 }
187
188 let mut thread_results = Vec::new();
189 for rule in chunk {
190 let start = Instant::now();
191 let fired =
193 Self::evaluate_rule_conditions(&rule, &facts_clone, &functions_clone);
194
195 if fired {
196 if debug_mode {
197 println!(" ๐ฅ Rule '{}' fired", rule.name);
198 }
199
200 for action in &rule.actions {
202 if let Err(e) = Self::execute_action_parallel(
203 action,
204 &facts_clone,
205 &functions_clone,
206 ) {
207 if debug_mode {
208 println!(" โ Action failed: {}", e);
209 }
210 }
211 }
212 }
213
214 thread_results.push(RuleExecutionContext {
215 rule: rule.clone(),
216 fired,
217 error: None,
218 execution_time: start.elapsed(),
219 });
220 }
221
222 let mut results = results_clone.lock().unwrap();
223 results.extend(thread_results);
224 })
225 })
226 .collect();
227
228 for handle in handles {
230 handle
231 .join()
232 .map_err(|_| RuleEngineError::EvaluationError {
233 message: "Thread panicked during parallel execution".to_string(),
234 })?;
235 }
236
237 let results = results.lock().unwrap();
238 Ok(results.clone())
239 }
240
241 fn execute_rules_sequential(
243 &self,
244 rules: &[Rule],
245 facts: &Facts,
246 debug_mode: bool,
247 ) -> Result<Vec<RuleExecutionContext>> {
248 let mut contexts = Vec::new();
249 let functions_arc = Arc::clone(&self.custom_functions);
250
251 for rule in rules {
252 let start = Instant::now();
253 let fired = Self::evaluate_rule_conditions(rule, facts, &functions_arc);
254
255 if fired && debug_mode {
256 println!(" ๐ฅ Rule '{}' fired", rule.name);
257 }
258
259 if fired {
260 for action in &rule.actions {
262 if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
263 if debug_mode {
264 println!(" โ Action failed: {}", e);
265 }
266 }
267 }
268 }
269
270 contexts.push(RuleExecutionContext {
271 rule: rule.clone(),
272 fired,
273 error: None,
274 execution_time: start.elapsed(),
275 });
276 }
277
278 Ok(contexts)
279 }
280
281 fn evaluate_rule_conditions(
295 rule: &Rule,
296 facts: &Facts,
297 functions: &Arc<RwLock<CustomFunctionMap>>,
298 ) -> bool {
299 use crate::engine::pattern_matcher::PatternMatcher;
300 use crate::engine::rule::ConditionGroup;
301
302 match &rule.conditions {
303 ConditionGroup::Single(condition) => {
304 Self::evaluate_single_condition(condition, facts, functions)
305 }
306 ConditionGroup::Compound {
307 left,
308 operator,
309 right,
310 } => {
311 let left_rule = Rule {
313 name: rule.name.clone(),
314 description: rule.description.clone(),
315 conditions: (**left).clone(),
316 actions: rule.actions.clone(),
317 salience: rule.salience,
318 enabled: rule.enabled,
319 no_loop: rule.no_loop,
320 lock_on_active: rule.lock_on_active,
321 agenda_group: rule.agenda_group.clone(),
322 activation_group: rule.activation_group.clone(),
323 date_effective: rule.date_effective,
324 date_expires: rule.date_expires,
325 };
326 let right_rule = Rule {
327 name: rule.name.clone(),
328 description: rule.description.clone(),
329 conditions: (**right).clone(),
330 actions: rule.actions.clone(),
331 salience: rule.salience,
332 enabled: rule.enabled,
333 no_loop: rule.no_loop,
334 lock_on_active: rule.lock_on_active,
335 agenda_group: rule.agenda_group.clone(),
336 activation_group: rule.activation_group.clone(),
337 date_effective: rule.date_effective,
338 date_expires: rule.date_expires,
339 };
340
341 let left_result = Self::evaluate_rule_conditions(&left_rule, facts, functions);
342 let right_result = Self::evaluate_rule_conditions(&right_rule, facts, functions);
343
344 match operator {
345 crate::types::LogicalOperator::And => left_result && right_result,
346 crate::types::LogicalOperator::Or => left_result || right_result,
347 crate::types::LogicalOperator::Not => false, }
349 }
350 ConditionGroup::Not(condition) => {
351 let temp_rule = Rule {
352 name: rule.name.clone(),
353 description: rule.description.clone(),
354 conditions: (**condition).clone(),
355 actions: rule.actions.clone(),
356 salience: rule.salience,
357 enabled: rule.enabled,
358 no_loop: rule.no_loop,
359 lock_on_active: rule.lock_on_active,
360 agenda_group: rule.agenda_group.clone(),
361 activation_group: rule.activation_group.clone(),
362 date_effective: rule.date_effective,
363 date_expires: rule.date_expires,
364 };
365 !Self::evaluate_rule_conditions(&temp_rule, facts, functions)
366 }
367 ConditionGroup::Exists(condition) => PatternMatcher::evaluate_exists(condition, facts),
369 ConditionGroup::Forall(condition) => PatternMatcher::evaluate_forall(condition, facts),
370 ConditionGroup::Accumulate {
372 result_var,
373 source_pattern,
374 extract_field,
375 source_conditions,
376 function,
377 function_arg,
378 } => {
379 Self::evaluate_accumulate_parallel(
381 result_var,
382 source_pattern,
383 extract_field,
384 source_conditions,
385 function,
386 function_arg,
387 facts,
388 )
389 .is_ok()
390 }
391
392 #[cfg(feature = "streaming")]
393 ConditionGroup::StreamPattern { .. } => {
394 true
397 }
398 }
399 }
400
401 fn evaluate_single_condition(
403 condition: &crate::engine::rule::Condition,
404 facts: &Facts,
405 functions: &Arc<RwLock<CustomFunctionMap>>,
406 ) -> bool {
407 use crate::engine::rule::ConditionExpression;
408
409 match &condition.expression {
410 ConditionExpression::Field(field_name) => {
411 if let Some(value) = facts
413 .get_nested(field_name)
414 .or_else(|| facts.get(field_name))
415 {
416 let rhs = match &condition.value {
418 Value::String(s) => {
419 facts
421 .get_nested(s)
422 .or_else(|| facts.get(s))
423 .unwrap_or(condition.value.clone())
424 }
425 Value::Expression(expr) => {
426 match crate::expression::evaluate_expression(expr, facts) {
428 Ok(evaluated) => evaluated,
429 Err(_) => facts
430 .get_nested(expr)
431 .or_else(|| facts.get(expr))
432 .unwrap_or(condition.value.clone()),
433 }
434 }
435 _ => condition.value.clone(),
436 };
437 condition.operator.evaluate(&value, &rhs)
438 } else {
439 false
440 }
441 }
442 ConditionExpression::FunctionCall { name, args } => {
443 let functions_guard = functions.read().unwrap();
445 if let Some(function) = functions_guard.get(name) {
446 let arg_values: Vec<Value> = args
448 .iter()
449 .map(|arg| {
450 facts
451 .get_nested(arg)
452 .or_else(|| facts.get(arg))
453 .unwrap_or(Value::String(arg.clone()))
454 })
455 .collect();
456
457 match function(&arg_values, facts) {
459 Ok(result_value) => {
460 condition.operator.evaluate(&result_value, &condition.value)
461 }
462 Err(_) => false,
463 }
464 } else {
465 false
466 }
467 }
468 ConditionExpression::Test { name, args } => {
469 let functions_guard = functions.read().unwrap();
471 if let Some(function) = functions_guard.get(name) {
472 let arg_values: Vec<Value> = args
473 .iter()
474 .map(|arg| {
475 facts
476 .get_nested(arg)
477 .or_else(|| facts.get(arg))
478 .unwrap_or(Value::String(arg.clone()))
479 })
480 .collect();
481
482 match function(&arg_values, facts) {
483 Ok(result_value) => {
484 match result_value {
486 Value::Boolean(b) => b,
487 Value::Integer(i) => i != 0,
488 Value::Number(f) => f != 0.0,
489 Value::String(s) => !s.is_empty(),
490 _ => false,
491 }
492 }
493 Err(_) => false,
494 }
495 } else {
496 false
497 }
498 }
499 ConditionExpression::MultiField {
500 field,
501 operation,
502 variable: _,
503 } => {
504 Self::evaluate_multifield(field, operation, condition, facts)
506 }
507 }
508 }
509
510 fn evaluate_multifield(
512 field: &str,
513 operation: &str,
514 condition: &crate::engine::rule::Condition,
515 facts: &Facts,
516 ) -> bool {
517 if let Some(value) = facts.get_nested(field).or_else(|| facts.get(field)) {
518 match value {
519 Value::Array(items) => {
520 match operation {
521 "empty" => items.is_empty(),
522 "not_empty" => !items.is_empty(),
523 "count" => {
524 let count = Value::Integer(items.len() as i64);
525 condition.operator.evaluate(&count, &condition.value)
526 }
527 "first" => {
528 if let Some(first) = items.first() {
529 condition.operator.evaluate(first, &condition.value)
530 } else {
531 false
532 }
533 }
534 "last" => {
535 if let Some(last) = items.last() {
536 condition.operator.evaluate(last, &condition.value)
537 } else {
538 false
539 }
540 }
541 "contains" => items
542 .iter()
543 .any(|item| condition.operator.evaluate(item, &condition.value)),
544 "collect" => {
545 true
547 }
548 _ => false,
549 }
550 }
551 _ => false,
552 }
553 } else {
554 false
555 }
556 }
557
558 fn evaluate_accumulate_parallel(
560 result_var: &str,
561 source_pattern: &str,
562 extract_field: &str,
563 source_conditions: &[String],
564 function: &str,
565 _function_arg: &str,
566 facts: &Facts,
567 ) -> Result<()> {
568 let all_facts = facts.get_all_facts();
570 let mut matching_values = Vec::new();
571
572 let pattern_prefix = format!("{}.", source_pattern);
573
574 let mut instances: HashMap<String, HashMap<String, Value>> = HashMap::new();
576
577 for (key, value) in &all_facts {
578 if key.starts_with(&pattern_prefix) {
579 let parts: Vec<&str> = key
580 .strip_prefix(&pattern_prefix)
581 .unwrap()
582 .split('.')
583 .collect();
584
585 if parts.len() >= 2 {
586 let instance_id = parts[0];
587 let field_name = parts[1..].join(".");
588
589 instances
590 .entry(instance_id.to_string())
591 .or_default()
592 .insert(field_name, value.clone());
593 } else if parts.len() == 1 {
594 instances
595 .entry("default".to_string())
596 .or_default()
597 .insert(parts[0].to_string(), value.clone());
598 }
599 }
600 }
601
602 for (_instance_id, fields) in instances {
604 let matches_conditions = source_conditions.is_empty() || {
605 source_conditions.iter().all(|_cond| {
606 true })
609 };
610
611 if matches_conditions {
612 if let Some(value) = fields.get(extract_field) {
613 matching_values.push(value.clone());
614 }
615 }
616 }
617
618 let result: Value = match function {
620 "sum" => {
621 let sum: f64 = matching_values
622 .iter()
623 .filter_map(|v| match v {
624 Value::Integer(i) => Some(*i as f64),
625 Value::Number(n) => Some(*n),
626 _ => None,
627 })
628 .sum();
629 Value::Number(sum)
630 }
631 "average" | "avg" => {
632 let values: Vec<f64> = matching_values
633 .iter()
634 .filter_map(|v| match v {
635 Value::Integer(i) => Some(*i as f64),
636 Value::Number(n) => Some(*n),
637 _ => None,
638 })
639 .collect();
640 if values.is_empty() {
641 Value::Number(0.0)
642 } else {
643 Value::Number(values.iter().sum::<f64>() / values.len() as f64)
644 }
645 }
646 "min" => {
647 let min = matching_values
648 .iter()
649 .filter_map(|v| match v {
650 Value::Integer(i) => Some(*i as f64),
651 Value::Number(n) => Some(*n),
652 _ => None,
653 })
654 .fold(f64::INFINITY, f64::min);
655 Value::Number(min)
656 }
657 "max" => {
658 let max = matching_values
659 .iter()
660 .filter_map(|v| match v {
661 Value::Integer(i) => Some(*i as f64),
662 Value::Number(n) => Some(*n),
663 _ => None,
664 })
665 .fold(f64::NEG_INFINITY, f64::max);
666 Value::Number(max)
667 }
668 "count" => Value::Integer(matching_values.len() as i64),
669 "collect" => Value::Array(matching_values.clone()),
670 _ => Value::Integer(0),
671 };
672
673 facts.set(result_var, result);
675 Ok(())
676 }
677
678 fn execute_action_parallel(
680 action: &ActionType,
681 facts: &Facts,
682 functions: &Arc<RwLock<CustomFunctionMap>>,
683 ) -> Result<()> {
684 match action {
685 ActionType::Custom { action_type, .. } => {
686 let functions_guard = functions.read().unwrap();
688 if let Some(func) = functions_guard.get(action_type) {
689 let empty_args = Vec::new();
690 let _result = func(&empty_args, facts)?;
691 }
692 Ok(())
693 }
694 ActionType::MethodCall { .. } => {
695 Ok(())
697 }
698 ActionType::Set { .. } => {
699 Ok(())
701 }
702 ActionType::Log { message } => {
703 println!(" ๐ {}", message);
704 Ok(())
705 }
706 ActionType::Retract { .. } => {
707 Ok(())
709 }
710 ActionType::ActivateAgendaGroup { .. } => {
711 Ok(())
713 }
714 ActionType::ScheduleRule { .. } => {
715 Ok(())
717 }
718 ActionType::CompleteWorkflow { .. } => {
719 Ok(())
721 }
722 ActionType::SetWorkflowData { .. } => {
723 Ok(())
725 }
726 }
727 }
728
729 fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
731 if contexts.is_empty() {
732 return 1.0;
733 }
734
735 let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
736 let max_time = contexts
737 .iter()
738 .map(|c| c.execution_time)
739 .max()
740 .unwrap_or(Duration::ZERO);
741
742 if max_time.as_nanos() > 0 {
743 total_time.as_nanos() as f64 / max_time.as_nanos() as f64
744 } else {
745 1.0
746 }
747 }
748}
749
750#[derive(Debug)]
752pub struct ParallelExecutionResult {
753 pub total_rules_evaluated: usize,
755 pub total_rules_fired: usize,
757 pub execution_time: Duration,
759 pub execution_contexts: Vec<RuleExecutionContext>,
761 pub parallel_speedup: f64,
763}
764
765impl ParallelExecutionResult {
766 pub fn get_stats(&self) -> String {
768 format!(
769 "๐ Parallel Execution Stats:\n Rules evaluated: {}\n Rules fired: {}\n Execution time: {:?}\n Parallel speedup: {:.2}x",
770 self.total_rules_evaluated,
771 self.total_rules_fired,
772 self.execution_time,
773 self.parallel_speedup
774 )
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use super::*;
781 use crate::engine::rule::{Condition, ConditionGroup};
782 use crate::types::{Operator, Value};
783
784 #[test]
785 fn test_parallel_config_default() {
786 let config = ParallelConfig::default();
787 assert!(config.enabled);
788 assert!(config.max_threads > 0);
789 assert_eq!(config.min_rules_per_thread, 2);
790 }
791
792 #[test]
793 fn test_parallel_engine_creation() {
794 let config = ParallelConfig::default();
795 let engine = ParallelRuleEngine::new(config);
796 assert!(engine.custom_functions.read().unwrap().is_empty());
797 }
798
799 #[test]
800 fn test_salience_grouping() {
801 let config = ParallelConfig::default();
802 let engine = ParallelRuleEngine::new(config);
803
804 let rules = vec![
805 Rule::new(
806 "Rule1".to_string(),
807 ConditionGroup::Single(Condition::new(
808 "test".to_string(),
809 Operator::Equal,
810 Value::Boolean(true),
811 )),
812 vec![],
813 )
814 .with_priority(10),
815 Rule::new(
816 "Rule2".to_string(),
817 ConditionGroup::Single(Condition::new(
818 "test".to_string(),
819 Operator::Equal,
820 Value::Boolean(true),
821 )),
822 vec![],
823 )
824 .with_priority(10),
825 Rule::new(
826 "Rule3".to_string(),
827 ConditionGroup::Single(Condition::new(
828 "test".to_string(),
829 Operator::Equal,
830 Value::Boolean(true),
831 )),
832 vec![],
833 )
834 .with_priority(5),
835 ];
836
837 let groups = engine.group_rules_by_salience(&rules);
838 assert_eq!(groups.len(), 2);
839 assert_eq!(groups[&10].len(), 2);
840 assert_eq!(groups[&5].len(), 1);
841 }
842}