1use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
2use crate::errors::{Result, RuleEngineError};
3use crate::types::{ActionType, Value};
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::thread;
7use std::time::{Duration, Instant};
8
9#[derive(Debug, Clone)]
11pub struct ParallelConfig {
12 pub enabled: bool,
14 pub max_threads: usize,
16 pub min_rules_per_thread: usize,
18 pub dependency_analysis: bool,
20}
21
22impl Default for ParallelConfig {
23 fn default() -> Self {
24 Self {
25 enabled: true,
26 max_threads: num_cpus::get(),
27 min_rules_per_thread: 2,
28 dependency_analysis: true,
29 }
30 }
31}
32
33type CustomFunctionMap =
35 HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
36
37#[derive(Debug, Clone)]
39pub struct RuleExecutionContext {
40 pub rule: Rule,
42 pub fired: bool,
44 pub error: Option<String>,
46 pub execution_time: Duration,
48}
49
50pub struct ParallelRuleEngine {
52 config: ParallelConfig,
53 custom_functions: Arc<RwLock<CustomFunctionMap>>,
54}
55
56impl ParallelRuleEngine {
57 pub fn new(config: ParallelConfig) -> Self {
59 Self {
60 config,
61 custom_functions: Arc::new(RwLock::new(HashMap::new())),
62 }
63 }
64
65 pub fn register_function<F>(&mut self, name: &str, func: F)
67 where
68 F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
69 {
70 let mut functions = self.custom_functions.write().unwrap();
71 functions.insert(name.to_string(), Box::new(func));
72 }
73
74 pub fn execute_parallel(
76 &self,
77 knowledge_base: &KnowledgeBase,
78 facts: &Facts,
79 debug_mode: bool,
80 ) -> Result<ParallelExecutionResult> {
81 let start_time = Instant::now();
82
83 if debug_mode {
84 println!(
85 "๐ Starting parallel rule execution with {} rules",
86 knowledge_base.get_rules().len()
87 );
88 }
89
90 let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
92
93 let mut total_fired = 0;
94 let mut total_evaluated = 0;
95 let mut execution_contexts = Vec::new();
96
97 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
99 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
102 let rules_at_level = &salience_groups[&salience];
103
104 if debug_mode {
105 println!(
106 "โก Processing {} rules at salience level {}",
107 rules_at_level.len(),
108 salience
109 );
110 }
111
112 let should_parallelize = self.should_parallelize(rules_at_level);
114
115 let contexts = if should_parallelize {
116 self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
117 } else {
118 self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
119 };
120
121 for context in &contexts {
123 total_evaluated += 1;
124 if context.fired {
125 total_fired += 1;
126 }
127 }
128
129 execution_contexts.extend(contexts);
130 }
131
132 Ok(ParallelExecutionResult {
133 total_rules_evaluated: total_evaluated,
134 total_rules_fired: total_fired,
135 execution_time: start_time.elapsed(),
136 parallel_speedup: self.calculate_speedup(&execution_contexts),
137 execution_contexts,
138 })
139 }
140
141 fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
143 let mut groups = HashMap::new();
144 for rule in rules {
145 if rule.enabled {
146 groups
147 .entry(rule.salience)
148 .or_insert_with(Vec::new)
149 .push(rule.clone());
150 }
151 }
152 groups
153 }
154
155 fn should_parallelize(&self, rules: &[Rule]) -> bool {
157 self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
158 }
159
160 fn execute_rules_parallel(
162 &self,
163 rules: &[Rule],
164 facts: &Facts,
165 debug_mode: bool,
166 ) -> Result<Vec<RuleExecutionContext>> {
167 let results = Arc::new(Mutex::new(Vec::new()));
168 let facts_arc = Arc::new(facts.clone());
169 let functions_arc = Arc::clone(&self.custom_functions);
170
171 let chunk_size = rules.len().div_ceil(self.config.max_threads);
173 let chunks: Vec<_> = rules.chunks(chunk_size).collect();
174
175 let handles: Vec<_> = chunks
176 .into_iter()
177 .enumerate()
178 .map(|(thread_id, chunk)| {
179 let chunk = chunk.to_vec();
180 let results_clone = Arc::clone(&results);
181 let facts_clone = Arc::clone(&facts_arc);
182 let functions_clone = Arc::clone(&functions_arc);
183
184 thread::spawn(move || {
185 if debug_mode {
186 println!(" ๐งต Thread {} processing {} rules", thread_id, chunk.len());
187 }
188
189 let mut thread_results = Vec::new();
190 for rule in chunk {
191 let start = Instant::now();
192 let fired = Self::evaluate_rule_conditions(&rule, &facts_clone, &functions_clone);
194
195 if fired {
196 if debug_mode {
197 println!(" ๐ฅ Rule '{}' fired", rule.name);
198 }
199
200 for action in &rule.actions {
202 if let Err(e) = Self::execute_action_parallel(
203 action,
204 &facts_clone,
205 &functions_clone,
206 ) {
207 if debug_mode {
208 println!(" โ Action failed: {}", e);
209 }
210 }
211 }
212 }
213
214 thread_results.push(RuleExecutionContext {
215 rule: rule.clone(),
216 fired,
217 error: None,
218 execution_time: start.elapsed(),
219 });
220 }
221
222 let mut results = results_clone.lock().unwrap();
223 results.extend(thread_results);
224 })
225 })
226 .collect();
227
228 for handle in handles {
230 handle
231 .join()
232 .map_err(|_| RuleEngineError::EvaluationError {
233 message: "Thread panicked during parallel execution".to_string(),
234 })?;
235 }
236
237 let results = results.lock().unwrap();
238 Ok(results.clone())
239 }
240
241 fn execute_rules_sequential(
243 &self,
244 rules: &[Rule],
245 facts: &Facts,
246 debug_mode: bool,
247 ) -> Result<Vec<RuleExecutionContext>> {
248 let mut contexts = Vec::new();
249 let functions_arc = Arc::clone(&self.custom_functions);
250
251 for rule in rules {
252 let start = Instant::now();
253 let fired = Self::evaluate_rule_conditions(rule, facts, &functions_arc);
254
255 if fired && debug_mode {
256 println!(" ๐ฅ Rule '{}' fired", rule.name);
257 }
258
259 if fired {
260 for action in &rule.actions {
262 if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
263 if debug_mode {
264 println!(" โ Action failed: {}", e);
265 }
266 }
267 }
268 }
269
270 contexts.push(RuleExecutionContext {
271 rule: rule.clone(),
272 fired,
273 error: None,
274 execution_time: start.elapsed(),
275 });
276 }
277
278 Ok(contexts)
279 }
280
281 fn evaluate_rule_conditions(
295 rule: &Rule,
296 facts: &Facts,
297 functions: &Arc<RwLock<CustomFunctionMap>>,
298 ) -> bool {
299 use crate::engine::rule::{ConditionExpression, ConditionGroup};
300 use crate::engine::pattern_matcher::PatternMatcher;
301
302 match &rule.conditions {
303 ConditionGroup::Single(condition) => {
304 Self::evaluate_single_condition(condition, facts, functions)
305 }
306 ConditionGroup::Single(condition) => {
307 Self::evaluate_single_condition(condition, facts, functions)
308 }
309 ConditionGroup::Compound { left, operator, right } => {
310 let left_rule = Rule {
312 name: rule.name.clone(),
313 description: rule.description.clone(),
314 conditions: (**left).clone(),
315 actions: rule.actions.clone(),
316 salience: rule.salience,
317 enabled: rule.enabled,
318 no_loop: rule.no_loop,
319 lock_on_active: rule.lock_on_active,
320 agenda_group: rule.agenda_group.clone(),
321 activation_group: rule.activation_group.clone(),
322 date_effective: rule.date_effective,
323 date_expires: rule.date_expires,
324 };
325 let right_rule = Rule {
326 name: rule.name.clone(),
327 description: rule.description.clone(),
328 conditions: (**right).clone(),
329 actions: rule.actions.clone(),
330 salience: rule.salience,
331 enabled: rule.enabled,
332 no_loop: rule.no_loop,
333 lock_on_active: rule.lock_on_active,
334 agenda_group: rule.agenda_group.clone(),
335 activation_group: rule.activation_group.clone(),
336 date_effective: rule.date_effective,
337 date_expires: rule.date_expires,
338 };
339
340 let left_result = Self::evaluate_rule_conditions(&left_rule, facts, functions);
341 let right_result = Self::evaluate_rule_conditions(&right_rule, facts, functions);
342
343 match operator {
344 crate::types::LogicalOperator::And => left_result && right_result,
345 crate::types::LogicalOperator::Or => left_result || right_result,
346 crate::types::LogicalOperator::Not => false, }
348 }
349 ConditionGroup::Not(condition) => {
350 let temp_rule = Rule {
351 name: rule.name.clone(),
352 description: rule.description.clone(),
353 conditions: (**condition).clone(),
354 actions: rule.actions.clone(),
355 salience: rule.salience,
356 enabled: rule.enabled,
357 no_loop: rule.no_loop,
358 lock_on_active: rule.lock_on_active,
359 agenda_group: rule.agenda_group.clone(),
360 activation_group: rule.activation_group.clone(),
361 date_effective: rule.date_effective,
362 date_expires: rule.date_expires,
363 };
364 !Self::evaluate_rule_conditions(&temp_rule, facts, functions)
365 }
366 ConditionGroup::Exists(condition) => {
368 PatternMatcher::evaluate_exists(condition, facts)
369 }
370 ConditionGroup::Forall(condition) => {
371 PatternMatcher::evaluate_forall(condition, facts)
372 }
373 ConditionGroup::Accumulate {
375 result_var,
376 source_pattern,
377 extract_field,
378 source_conditions,
379 function,
380 function_arg,
381 } => {
382 if let Ok(_) = Self::evaluate_accumulate_parallel(
384 result_var,
385 source_pattern,
386 extract_field,
387 source_conditions,
388 function,
389 function_arg,
390 facts,
391 ) {
392 true
393 } else {
394 false
395 }
396 }
397 }
398 }
399
400 fn evaluate_single_condition(
402 condition: &crate::engine::rule::Condition,
403 facts: &Facts,
404 functions: &Arc<RwLock<CustomFunctionMap>>,
405 ) -> bool {
406 use crate::engine::rule::ConditionExpression;
407
408 match &condition.expression {
409 ConditionExpression::Field(field_name) => {
410 if let Some(value) = facts.get_nested(field_name).or_else(|| facts.get(field_name)) {
412 let rhs = match &condition.value {
414 Value::String(s) => {
415 facts.get_nested(s)
417 .or_else(|| facts.get(s))
418 .unwrap_or(condition.value.clone())
419 }
420 Value::Expression(expr) => {
421 match crate::expression::evaluate_expression(expr, facts) {
423 Ok(evaluated) => evaluated,
424 Err(_) => facts.get_nested(expr)
425 .or_else(|| facts.get(expr))
426 .unwrap_or(condition.value.clone()),
427 }
428 }
429 _ => condition.value.clone(),
430 };
431 condition.operator.evaluate(&value, &rhs)
432 } else {
433 false
434 }
435 }
436 ConditionExpression::FunctionCall { name, args } => {
437 let functions_guard = functions.read().unwrap();
439 if let Some(function) = functions_guard.get(name) {
440 let arg_values: Vec<Value> = args
442 .iter()
443 .map(|arg| {
444 facts.get_nested(arg)
445 .or_else(|| facts.get(arg))
446 .unwrap_or(Value::String(arg.clone()))
447 })
448 .collect();
449
450 match function(&arg_values, facts) {
452 Ok(result_value) => {
453 condition.operator.evaluate(&result_value, &condition.value)
454 }
455 Err(_) => false,
456 }
457 } else {
458 false
459 }
460 }
461 ConditionExpression::Test { name, args } => {
462 let functions_guard = functions.read().unwrap();
464 if let Some(function) = functions_guard.get(name) {
465 let arg_values: Vec<Value> = args
466 .iter()
467 .map(|arg| {
468 facts.get_nested(arg)
469 .or_else(|| facts.get(arg))
470 .unwrap_or(Value::String(arg.clone()))
471 })
472 .collect();
473
474 match function(&arg_values, facts) {
475 Ok(result_value) => {
476 match result_value {
478 Value::Boolean(b) => b,
479 Value::Integer(i) => i != 0,
480 Value::Number(f) => f != 0.0,
481 Value::String(s) => !s.is_empty(),
482 _ => false,
483 }
484 }
485 Err(_) => false,
486 }
487 } else {
488 false
489 }
490 }
491 ConditionExpression::MultiField { field, operation, variable: _ } => {
492 Self::evaluate_multifield(field, operation, condition, facts)
494 }
495 }
496 }
497
498 fn evaluate_multifield(
500 field: &str,
501 operation: &str,
502 condition: &crate::engine::rule::Condition,
503 facts: &Facts,
504 ) -> bool {
505 if let Some(value) = facts.get_nested(field).or_else(|| facts.get(field)) {
506 match value {
507 Value::Array(items) => {
508 match operation {
509 "empty" => items.is_empty(),
510 "not_empty" => !items.is_empty(),
511 "count" => {
512 let count = Value::Integer(items.len() as i64);
513 condition.operator.evaluate(&count, &condition.value)
514 }
515 "first" => {
516 if let Some(first) = items.first() {
517 condition.operator.evaluate(first, &condition.value)
518 } else {
519 false
520 }
521 }
522 "last" => {
523 if let Some(last) = items.last() {
524 condition.operator.evaluate(last, &condition.value)
525 } else {
526 false
527 }
528 }
529 "contains" => {
530 items.iter().any(|item| {
531 condition.operator.evaluate(item, &condition.value)
532 })
533 }
534 "collect" => {
535 true
537 }
538 _ => false,
539 }
540 }
541 _ => false,
542 }
543 } else {
544 false
545 }
546 }
547
548 fn evaluate_accumulate_parallel(
550 result_var: &str,
551 source_pattern: &str,
552 extract_field: &str,
553 source_conditions: &[String],
554 function: &str,
555 function_arg: &str,
556 facts: &Facts,
557 ) -> Result<()> {
558 let all_facts = facts.get_all_facts();
560 let mut matching_values = Vec::new();
561
562 let pattern_prefix = format!("{}.", source_pattern);
563
564 let mut instances: HashMap<String, HashMap<String, Value>> = HashMap::new();
566
567 for (key, value) in &all_facts {
568 if key.starts_with(&pattern_prefix) {
569 let parts: Vec<&str> = key.strip_prefix(&pattern_prefix).unwrap().split('.').collect();
570
571 if parts.len() >= 2 {
572 let instance_id = parts[0];
573 let field_name = parts[1..].join(".");
574
575 instances
576 .entry(instance_id.to_string())
577 .or_insert_with(HashMap::new)
578 .insert(field_name, value.clone());
579 } else if parts.len() == 1 {
580 instances
581 .entry("default".to_string())
582 .or_insert_with(HashMap::new)
583 .insert(parts[0].to_string(), value.clone());
584 }
585 }
586 }
587
588 for (_instance_id, fields) in instances {
590 let matches_conditions = source_conditions.is_empty() || {
591 source_conditions.iter().all(|cond| {
592 true })
595 };
596
597 if matches_conditions {
598 if let Some(value) = fields.get(extract_field) {
599 matching_values.push(value.clone());
600 }
601 }
602 }
603
604 let result: Value = match function {
606 "sum" => {
607 let sum: f64 = matching_values.iter().filter_map(|v| match v {
608 Value::Integer(i) => Some(*i as f64),
609 Value::Number(n) => Some(*n),
610 _ => None,
611 }).sum();
612 Value::Number(sum)
613 }
614 "average" | "avg" => {
615 let values: Vec<f64> = matching_values.iter().filter_map(|v| match v {
616 Value::Integer(i) => Some(*i as f64),
617 Value::Number(n) => Some(*n),
618 _ => None,
619 }).collect();
620 if values.is_empty() {
621 Value::Number(0.0)
622 } else {
623 Value::Number(values.iter().sum::<f64>() / values.len() as f64)
624 }
625 }
626 "min" => {
627 let min = matching_values.iter().filter_map(|v| match v {
628 Value::Integer(i) => Some(*i as f64),
629 Value::Number(n) => Some(*n),
630 _ => None,
631 }).fold(f64::INFINITY, f64::min);
632 Value::Number(min)
633 }
634 "max" => {
635 let max = matching_values.iter().filter_map(|v| match v {
636 Value::Integer(i) => Some(*i as f64),
637 Value::Number(n) => Some(*n),
638 _ => None,
639 }).fold(f64::NEG_INFINITY, f64::max);
640 Value::Number(max)
641 }
642 "count" => {
643 Value::Integer(matching_values.len() as i64)
644 }
645 "collect" => {
646 Value::Array(matching_values.clone())
647 }
648 _ => Value::Integer(0),
649 };
650
651 facts.set(result_var, result);
653 Ok(())
654 }
655
656 fn execute_action_parallel(
658 action: &ActionType,
659 facts: &Facts,
660 functions: &Arc<RwLock<CustomFunctionMap>>,
661 ) -> Result<()> {
662 match action {
663 ActionType::Custom { action_type, .. } => {
664 let functions_guard = functions.read().unwrap();
666 if let Some(func) = functions_guard.get(action_type) {
667 let empty_args = Vec::new();
668 let _result = func(&empty_args, facts)?;
669 }
670 Ok(())
671 }
672 ActionType::MethodCall { .. } => {
673 Ok(())
675 }
676 ActionType::Set { .. } => {
677 Ok(())
679 }
680 ActionType::Log { message } => {
681 println!(" ๐ {}", message);
682 Ok(())
683 }
684 ActionType::Retract { .. } => {
685 Ok(())
687 }
688 ActionType::Custom { .. } => {
689 Ok(())
691 }
692 ActionType::ActivateAgendaGroup { .. } => {
693 Ok(())
695 }
696 ActionType::ScheduleRule { .. } => {
697 Ok(())
699 }
700 ActionType::CompleteWorkflow { .. } => {
701 Ok(())
703 }
704 ActionType::SetWorkflowData { .. } => {
705 Ok(())
707 }
708 }
709 }
710
711 fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
713 if contexts.is_empty() {
714 return 1.0;
715 }
716
717 let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
718 let max_time = contexts
719 .iter()
720 .map(|c| c.execution_time)
721 .max()
722 .unwrap_or(Duration::ZERO);
723
724 if max_time.as_nanos() > 0 {
725 total_time.as_nanos() as f64 / max_time.as_nanos() as f64
726 } else {
727 1.0
728 }
729 }
730}
731
732#[derive(Debug)]
734pub struct ParallelExecutionResult {
735 pub total_rules_evaluated: usize,
737 pub total_rules_fired: usize,
739 pub execution_time: Duration,
741 pub execution_contexts: Vec<RuleExecutionContext>,
743 pub parallel_speedup: f64,
745}
746
747impl ParallelExecutionResult {
748 pub fn get_stats(&self) -> String {
750 format!(
751 "๐ Parallel Execution Stats:\n Rules evaluated: {}\n Rules fired: {}\n Execution time: {:?}\n Parallel speedup: {:.2}x",
752 self.total_rules_evaluated,
753 self.total_rules_fired,
754 self.execution_time,
755 self.parallel_speedup
756 )
757 }
758}
759
760#[cfg(test)]
761mod tests {
762 use super::*;
763 use crate::engine::rule::{Condition, ConditionGroup};
764 use crate::types::{Operator, Value};
765
766 #[test]
767 fn test_parallel_config_default() {
768 let config = ParallelConfig::default();
769 assert!(config.enabled);
770 assert!(config.max_threads > 0);
771 assert_eq!(config.min_rules_per_thread, 2);
772 }
773
774 #[test]
775 fn test_parallel_engine_creation() {
776 let config = ParallelConfig::default();
777 let engine = ParallelRuleEngine::new(config);
778 assert!(engine.custom_functions.read().unwrap().is_empty());
779 }
780
781 #[test]
782 fn test_salience_grouping() {
783 let config = ParallelConfig::default();
784 let engine = ParallelRuleEngine::new(config);
785
786 let rules = vec![
787 Rule::new(
788 "Rule1".to_string(),
789 ConditionGroup::Single(Condition::new(
790 "test".to_string(),
791 Operator::Equal,
792 Value::Boolean(true),
793 )),
794 vec![],
795 )
796 .with_priority(10),
797 Rule::new(
798 "Rule2".to_string(),
799 ConditionGroup::Single(Condition::new(
800 "test".to_string(),
801 Operator::Equal,
802 Value::Boolean(true),
803 )),
804 vec![],
805 )
806 .with_priority(10),
807 Rule::new(
808 "Rule3".to_string(),
809 ConditionGroup::Single(Condition::new(
810 "test".to_string(),
811 Operator::Equal,
812 Value::Boolean(true),
813 )),
814 vec![],
815 )
816 .with_priority(5),
817 ];
818
819 let groups = engine.group_rules_by_salience(&rules);
820 assert_eq!(groups.len(), 2);
821 assert_eq!(groups[&10].len(), 2);
822 assert_eq!(groups[&5].len(), 1);
823 }
824}