rust_rule_engine/engine/
parallel.rs

1use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
2use crate::errors::{Result, RuleEngineError};
3use crate::types::{ActionType, Value};
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::thread;
7use std::time::{Duration, Instant};
8
9/// Configuration for parallel rule execution
10#[derive(Debug, Clone)]
11pub struct ParallelConfig {
12    /// Enable parallel execution
13    pub enabled: bool,
14    /// Maximum number of worker threads
15    pub max_threads: usize,
16    /// Minimum rules per thread to justify parallelization
17    pub min_rules_per_thread: usize,
18    /// Enable dependency analysis
19    pub dependency_analysis: bool,
20}
21
22impl Default for ParallelConfig {
23    fn default() -> Self {
24        Self {
25            enabled: true,
26            max_threads: num_cpus::get(),
27            min_rules_per_thread: 2,
28            dependency_analysis: true,
29        }
30    }
31}
32
33/// Type alias for custom function storage
34type CustomFunctionMap =
35    HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
36
37/// Rule execution context for parallel processing
38#[derive(Debug, Clone)]
39pub struct RuleExecutionContext {
40    /// The rule that was executed
41    pub rule: Rule,
42    /// Whether the rule fired successfully
43    pub fired: bool,
44    /// Error message if execution failed
45    pub error: Option<String>,
46    /// Time taken to execute this rule
47    pub execution_time: Duration,
48}
49
50/// Parallel rule execution engine
51pub struct ParallelRuleEngine {
52    config: ParallelConfig,
53    custom_functions: Arc<RwLock<CustomFunctionMap>>,
54}
55
56impl ParallelRuleEngine {
57    /// Create new parallel rule engine
58    pub fn new(config: ParallelConfig) -> Self {
59        Self {
60            config,
61            custom_functions: Arc::new(RwLock::new(HashMap::new())),
62        }
63    }
64
65    /// Register a custom function
66    pub fn register_function<F>(&mut self, name: &str, func: F)
67    where
68        F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
69    {
70        let mut functions = self.custom_functions.write().unwrap();
71        functions.insert(name.to_string(), Box::new(func));
72    }
73
74    /// Execute rules with parallel processing
75    pub fn execute_parallel(
76        &self,
77        knowledge_base: &KnowledgeBase,
78        facts: &Facts,
79        debug_mode: bool,
80    ) -> Result<ParallelExecutionResult> {
81        let start_time = Instant::now();
82
83        if debug_mode {
84            println!(
85                "๐Ÿš€ Starting parallel rule execution with {} rules",
86                knowledge_base.get_rules().len()
87            );
88        }
89
90        // Group rules by salience for ordered execution
91        let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
92
93        let mut total_fired = 0;
94        let mut total_evaluated = 0;
95        let mut execution_contexts = Vec::new();
96
97        // Execute rules by salience level (highest first)
98        let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
99        salience_levels.sort_by(|a, b| b.cmp(a)); // Descending order
100
101        for salience in salience_levels {
102            let rules_at_level = &salience_groups[&salience];
103
104            if debug_mode {
105                println!(
106                    "โšก Processing {} rules at salience level {}",
107                    rules_at_level.len(),
108                    salience
109                );
110            }
111
112            // Decide whether to use parallel execution for this level
113            let should_parallelize = self.should_parallelize(rules_at_level);
114
115            let contexts = if should_parallelize {
116                self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
117            } else {
118                self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
119            };
120
121            // Count results
122            for context in &contexts {
123                total_evaluated += 1;
124                if context.fired {
125                    total_fired += 1;
126                }
127            }
128
129            execution_contexts.extend(contexts);
130        }
131
132        Ok(ParallelExecutionResult {
133            total_rules_evaluated: total_evaluated,
134            total_rules_fired: total_fired,
135            execution_time: start_time.elapsed(),
136            parallel_speedup: self.calculate_speedup(&execution_contexts),
137            execution_contexts,
138        })
139    }
140
141    /// Group rules by their salience level
142    fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
143        let mut groups = HashMap::new();
144        for rule in rules {
145            if rule.enabled {
146                groups
147                    .entry(rule.salience)
148                    .or_insert_with(Vec::new)
149                    .push(rule.clone());
150            }
151        }
152        groups
153    }
154
155    /// Determine if rules should be executed in parallel
156    fn should_parallelize(&self, rules: &[Rule]) -> bool {
157        self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
158    }
159
160    /// Execute rules in parallel within the same salience level
161    fn execute_rules_parallel(
162        &self,
163        rules: &[Rule],
164        facts: &Facts,
165        debug_mode: bool,
166    ) -> Result<Vec<RuleExecutionContext>> {
167        let results = Arc::new(Mutex::new(Vec::new()));
168        let facts_arc = Arc::new(facts.clone());
169        let functions_arc = Arc::clone(&self.custom_functions);
170
171        // Create worker threads
172        let chunk_size = rules.len().div_ceil(self.config.max_threads);
173        let chunks: Vec<_> = rules.chunks(chunk_size).collect();
174
175        let handles: Vec<_> = chunks
176            .into_iter()
177            .enumerate()
178            .map(|(thread_id, chunk)| {
179                let chunk = chunk.to_vec();
180                let results_clone = Arc::clone(&results);
181                let facts_clone = Arc::clone(&facts_arc);
182                let functions_clone = Arc::clone(&functions_arc);
183
184                thread::spawn(move || {
185                    if debug_mode {
186                        println!("  ๐Ÿงต Thread {} processing {} rules", thread_id, chunk.len());
187                    }
188
189                    let mut thread_results = Vec::new();
190                    for rule in chunk {
191                        let start = Instant::now();
192                        let fired = Self::evaluate_rule_conditions(&rule, &facts_clone);
193
194                        if fired {
195                            if debug_mode {
196                                println!("    ๐Ÿ”ฅ Rule '{}' fired", rule.name);
197                            }
198
199                            // Execute actions (simplified for demo)
200                            for action in &rule.actions {
201                                if let Err(e) = Self::execute_action_parallel(
202                                    action,
203                                    &facts_clone,
204                                    &functions_clone,
205                                ) {
206                                    if debug_mode {
207                                        println!("    โŒ Action failed: {}", e);
208                                    }
209                                }
210                            }
211                        }
212
213                        thread_results.push(RuleExecutionContext {
214                            rule: rule.clone(),
215                            fired,
216                            error: None,
217                            execution_time: start.elapsed(),
218                        });
219                    }
220
221                    let mut results = results_clone.lock().unwrap();
222                    results.extend(thread_results);
223                })
224            })
225            .collect();
226
227        // Wait for all threads to complete
228        for handle in handles {
229            handle
230                .join()
231                .map_err(|_| RuleEngineError::EvaluationError {
232                    message: "Thread panicked during parallel execution".to_string(),
233                })?;
234        }
235
236        let results = results.lock().unwrap();
237        Ok(results.clone())
238    }
239
240    /// Execute rules sequentially (fallback)
241    fn execute_rules_sequential(
242        &self,
243        rules: &[Rule],
244        facts: &Facts,
245        debug_mode: bool,
246    ) -> Result<Vec<RuleExecutionContext>> {
247        let mut contexts = Vec::new();
248        let functions_arc = Arc::clone(&self.custom_functions);
249
250        for rule in rules {
251            let start = Instant::now();
252            let fired = Self::evaluate_rule_conditions(rule, facts);
253
254            if fired && debug_mode {
255                println!("    ๐Ÿ”ฅ Rule '{}' fired", rule.name);
256            }
257
258            if fired {
259                // Execute actions
260                for action in &rule.actions {
261                    if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
262                        if debug_mode {
263                            println!("    โŒ Action failed: {}", e);
264                        }
265                    }
266                }
267            }
268
269            contexts.push(RuleExecutionContext {
270                rule: rule.clone(),
271                fired,
272                error: None,
273                execution_time: start.elapsed(),
274            });
275        }
276
277        Ok(contexts)
278    }
279
280    /// Simplified rule condition evaluation
281    /// TODO: This is a simplified version for parallel demo purposes
282    /// Real implementation should use proper condition evaluation like main engine
283    fn evaluate_rule_conditions(rule: &Rule, _facts: &Facts) -> bool {
284        // For demo purposes, just return true if rule has conditions
285        // In real implementation, this would evaluate the actual conditions
286        // using the same logic as engine.rs evaluate_conditions() method
287        !rule.actions.is_empty()
288    }
289
290    /// Execute action with parallel-safe function calls
291    fn execute_action_parallel(
292        action: &ActionType,
293        facts: &Facts,
294        functions: &Arc<RwLock<CustomFunctionMap>>,
295    ) -> Result<()> {
296        match action {
297            ActionType::Call { function, args } => {
298                let functions_guard = functions.read().unwrap();
299                if let Some(func) = functions_guard.get(function) {
300                    let _result = func(args, facts)?;
301                }
302                Ok(())
303            }
304            ActionType::MethodCall { .. } => {
305                // Simplified method call handling
306                Ok(())
307            }
308            ActionType::Set { .. } => {
309                // Simplified assignment handling
310                Ok(())
311            }
312            ActionType::Log { message } => {
313                println!("     ๐Ÿ“‹ {}", message);
314                Ok(())
315            }
316            ActionType::Update { .. } => {
317                // Simplified update handling
318                Ok(())
319            }
320            ActionType::Custom { .. } => {
321                // Simplified custom action handling
322                Ok(())
323            }
324            ActionType::ActivateAgendaGroup { .. } => {
325                // Workflow actions not supported in parallel execution
326                Ok(())
327            }
328            ActionType::ScheduleRule { .. } => {
329                // Workflow actions not supported in parallel execution
330                Ok(())
331            }
332            ActionType::CompleteWorkflow { .. } => {
333                // Workflow actions not supported in parallel execution
334                Ok(())
335            }
336            ActionType::SetWorkflowData { .. } => {
337                // Workflow actions not supported in parallel execution
338                Ok(())
339            }
340        }
341    }
342
343    /// Calculate parallel speedup
344    fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
345        if contexts.is_empty() {
346            return 1.0;
347        }
348
349        let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
350        let max_time = contexts
351            .iter()
352            .map(|c| c.execution_time)
353            .max()
354            .unwrap_or(Duration::ZERO);
355
356        if max_time.as_nanos() > 0 {
357            total_time.as_nanos() as f64 / max_time.as_nanos() as f64
358        } else {
359            1.0
360        }
361    }
362}
363
364/// Result of parallel rule execution
365#[derive(Debug)]
366pub struct ParallelExecutionResult {
367    /// Total number of rules evaluated
368    pub total_rules_evaluated: usize,
369    /// Total number of rules that fired
370    pub total_rules_fired: usize,
371    /// Total execution time
372    pub execution_time: Duration,
373    /// Detailed execution contexts for each rule
374    pub execution_contexts: Vec<RuleExecutionContext>,
375    /// Parallel speedup factor
376    pub parallel_speedup: f64,
377}
378
379impl ParallelExecutionResult {
380    /// Get execution statistics
381    pub fn get_stats(&self) -> String {
382        format!(
383            "๐Ÿ“Š Parallel Execution Stats:\n   Rules evaluated: {}\n   Rules fired: {}\n   Execution time: {:?}\n   Parallel speedup: {:.2}x",
384            self.total_rules_evaluated,
385            self.total_rules_fired,
386            self.execution_time,
387            self.parallel_speedup
388        )
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::engine::rule::{Condition, ConditionGroup};
396    use crate::types::{Operator, Value};
397
398    #[test]
399    fn test_parallel_config_default() {
400        let config = ParallelConfig::default();
401        assert!(config.enabled);
402        assert!(config.max_threads > 0);
403        assert_eq!(config.min_rules_per_thread, 2);
404    }
405
406    #[test]
407    fn test_parallel_engine_creation() {
408        let config = ParallelConfig::default();
409        let engine = ParallelRuleEngine::new(config);
410        assert!(engine.custom_functions.read().unwrap().is_empty());
411    }
412
413    #[test]
414    fn test_salience_grouping() {
415        let config = ParallelConfig::default();
416        let engine = ParallelRuleEngine::new(config);
417
418        let rules = vec![
419            Rule::new(
420                "Rule1".to_string(),
421                ConditionGroup::Single(Condition::new(
422                    "test".to_string(),
423                    Operator::Equal,
424                    Value::Boolean(true),
425                )),
426                vec![],
427            )
428            .with_priority(10),
429            Rule::new(
430                "Rule2".to_string(),
431                ConditionGroup::Single(Condition::new(
432                    "test".to_string(),
433                    Operator::Equal,
434                    Value::Boolean(true),
435                )),
436                vec![],
437            )
438            .with_priority(10),
439            Rule::new(
440                "Rule3".to_string(),
441                ConditionGroup::Single(Condition::new(
442                    "test".to_string(),
443                    Operator::Equal,
444                    Value::Boolean(true),
445                )),
446                vec![],
447            )
448            .with_priority(5),
449        ];
450
451        let groups = engine.group_rules_by_salience(&rules);
452        assert_eq!(groups.len(), 2);
453        assert_eq!(groups[&10].len(), 2);
454        assert_eq!(groups[&5].len(), 1);
455    }
456}