1use crate::engine::{facts::Facts, knowledge_base::KnowledgeBase, rule::Rule};
2use crate::errors::{Result, RuleEngineError};
3use crate::types::{ActionType, Value};
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::thread;
7use std::time::{Duration, Instant};
8
/// Tuning knobs for [`ParallelRuleEngine`].
#[derive(Clone, Debug)]
pub struct ParallelConfig {
    /// Master switch: when `false`, every salience group is executed sequentially.
    pub enabled: bool,
    /// Divisor used to split a salience group into per-thread chunks, i.e. the
    /// upper bound on worker threads spawned for one group.
    pub max_threads: usize,
    /// Minimum number of rules a salience group must contain before the engine
    /// considers running it in parallel.
    pub min_rules_per_thread: usize,
    /// Flag for dependency analysis between rules.
    /// NOTE(review): this value is stored but not read by the engine code in this file.
    pub dependency_analysis: bool,
}
21
22impl Default for ParallelConfig {
23 fn default() -> Self {
24 Self {
25 enabled: true,
26 max_threads: num_cpus::get(),
27 min_rules_per_thread: 2,
28 dependency_analysis: true,
29 }
30 }
31}
32
33type CustomFunctionMap =
35 HashMap<String, Box<dyn Fn(&[Value], &Facts) -> Result<Value> + Send + Sync>>;
36
37#[derive(Debug, Clone)]
39pub struct RuleExecutionContext {
40 pub rule: Rule,
42 pub fired: bool,
44 pub error: Option<String>,
46 pub execution_time: Duration,
48}
49
50pub struct ParallelRuleEngine {
52 config: ParallelConfig,
53 custom_functions: Arc<RwLock<CustomFunctionMap>>,
54}
55
56impl ParallelRuleEngine {
57 pub fn new(config: ParallelConfig) -> Self {
59 Self {
60 config,
61 custom_functions: Arc::new(RwLock::new(HashMap::new())),
62 }
63 }
64
65 pub fn register_function<F>(&mut self, name: &str, func: F)
67 where
68 F: Fn(&[Value], &Facts) -> Result<Value> + Send + Sync + 'static,
69 {
70 let mut functions = self.custom_functions.write().unwrap();
71 functions.insert(name.to_string(), Box::new(func));
72 }
73
74 pub fn execute_parallel(
76 &self,
77 knowledge_base: &KnowledgeBase,
78 facts: &Facts,
79 debug_mode: bool,
80 ) -> Result<ParallelExecutionResult> {
81 let start_time = Instant::now();
82
83 if debug_mode {
84 println!(
85 "๐ Starting parallel rule execution with {} rules",
86 knowledge_base.get_rules().len()
87 );
88 }
89
90 let salience_groups = self.group_rules_by_salience(&knowledge_base.get_rules());
92
93 let mut total_fired = 0;
94 let mut total_evaluated = 0;
95 let mut execution_contexts = Vec::new();
96
97 let mut salience_levels: Vec<_> = salience_groups.keys().copied().collect();
99 salience_levels.sort_by(|a, b| b.cmp(a)); for salience in salience_levels {
102 let rules_at_level = &salience_groups[&salience];
103
104 if debug_mode {
105 println!(
106 "โก Processing {} rules at salience level {}",
107 rules_at_level.len(),
108 salience
109 );
110 }
111
112 let should_parallelize = self.should_parallelize(rules_at_level);
114
115 let contexts = if should_parallelize {
116 self.execute_rules_parallel(rules_at_level, facts, debug_mode)?
117 } else {
118 self.execute_rules_sequential(rules_at_level, facts, debug_mode)?
119 };
120
121 for context in &contexts {
123 total_evaluated += 1;
124 if context.fired {
125 total_fired += 1;
126 }
127 }
128
129 execution_contexts.extend(contexts);
130 }
131
132 Ok(ParallelExecutionResult {
133 total_rules_evaluated: total_evaluated,
134 total_rules_fired: total_fired,
135 execution_time: start_time.elapsed(),
136 parallel_speedup: self.calculate_speedup(&execution_contexts),
137 execution_contexts,
138 })
139 }
140
141 fn group_rules_by_salience(&self, rules: &[Rule]) -> HashMap<i32, Vec<Rule>> {
143 let mut groups = HashMap::new();
144 for rule in rules {
145 if rule.enabled {
146 groups
147 .entry(rule.salience)
148 .or_insert_with(Vec::new)
149 .push(rule.clone());
150 }
151 }
152 groups
153 }
154
155 fn should_parallelize(&self, rules: &[Rule]) -> bool {
157 self.config.enabled && rules.len() >= self.config.min_rules_per_thread && rules.len() >= 2
158 }
159
160 fn execute_rules_parallel(
162 &self,
163 rules: &[Rule],
164 facts: &Facts,
165 debug_mode: bool,
166 ) -> Result<Vec<RuleExecutionContext>> {
167 let results = Arc::new(Mutex::new(Vec::new()));
168 let facts_arc = Arc::new(facts.clone());
169 let functions_arc = Arc::clone(&self.custom_functions);
170
171 let chunk_size = rules.len().div_ceil(self.config.max_threads);
173 let chunks: Vec<_> = rules.chunks(chunk_size).collect();
174
175 let handles: Vec<_> = chunks
176 .into_iter()
177 .enumerate()
178 .map(|(thread_id, chunk)| {
179 let chunk = chunk.to_vec();
180 let results_clone = Arc::clone(&results);
181 let facts_clone = Arc::clone(&facts_arc);
182 let functions_clone = Arc::clone(&functions_arc);
183
184 thread::spawn(move || {
185 if debug_mode {
186 println!(" ๐งต Thread {} processing {} rules", thread_id, chunk.len());
187 }
188
189 let mut thread_results = Vec::new();
190 for rule in chunk {
191 let start = Instant::now();
192 let fired = Self::evaluate_rule_conditions(&rule, &facts_clone);
193
194 if fired {
195 if debug_mode {
196 println!(" ๐ฅ Rule '{}' fired", rule.name);
197 }
198
199 for action in &rule.actions {
201 if let Err(e) = Self::execute_action_parallel(
202 action,
203 &facts_clone,
204 &functions_clone,
205 ) {
206 if debug_mode {
207 println!(" โ Action failed: {}", e);
208 }
209 }
210 }
211 }
212
213 thread_results.push(RuleExecutionContext {
214 rule: rule.clone(),
215 fired,
216 error: None,
217 execution_time: start.elapsed(),
218 });
219 }
220
221 let mut results = results_clone.lock().unwrap();
222 results.extend(thread_results);
223 })
224 })
225 .collect();
226
227 for handle in handles {
229 handle
230 .join()
231 .map_err(|_| RuleEngineError::EvaluationError {
232 message: "Thread panicked during parallel execution".to_string(),
233 })?;
234 }
235
236 let results = results.lock().unwrap();
237 Ok(results.clone())
238 }
239
240 fn execute_rules_sequential(
242 &self,
243 rules: &[Rule],
244 facts: &Facts,
245 debug_mode: bool,
246 ) -> Result<Vec<RuleExecutionContext>> {
247 let mut contexts = Vec::new();
248 let functions_arc = Arc::clone(&self.custom_functions);
249
250 for rule in rules {
251 let start = Instant::now();
252 let fired = Self::evaluate_rule_conditions(rule, facts);
253
254 if fired && debug_mode {
255 println!(" ๐ฅ Rule '{}' fired", rule.name);
256 }
257
258 if fired {
259 for action in &rule.actions {
261 if let Err(e) = Self::execute_action_parallel(action, facts, &functions_arc) {
262 if debug_mode {
263 println!(" โ Action failed: {}", e);
264 }
265 }
266 }
267 }
268
269 contexts.push(RuleExecutionContext {
270 rule: rule.clone(),
271 fired,
272 error: None,
273 execution_time: start.elapsed(),
274 });
275 }
276
277 Ok(contexts)
278 }
279
280 fn evaluate_rule_conditions(rule: &Rule, _facts: &Facts) -> bool {
284 !rule.actions.is_empty()
288 }
289
290 fn execute_action_parallel(
292 action: &ActionType,
293 facts: &Facts,
294 functions: &Arc<RwLock<CustomFunctionMap>>,
295 ) -> Result<()> {
296 match action {
297 ActionType::Call { function, args } => {
298 let functions_guard = functions.read().unwrap();
299 if let Some(func) = functions_guard.get(function) {
300 let _result = func(args, facts)?;
301 }
302 Ok(())
303 }
304 ActionType::MethodCall { .. } => {
305 Ok(())
307 }
308 ActionType::Set { .. } => {
309 Ok(())
311 }
312 ActionType::Log { message } => {
313 println!(" ๐ {}", message);
314 Ok(())
315 }
316 ActionType::Update { .. } => {
317 Ok(())
319 }
320 ActionType::Custom { .. } => {
321 Ok(())
323 }
324 ActionType::ActivateAgendaGroup { .. } => {
325 Ok(())
327 }
328 ActionType::ScheduleRule { .. } => {
329 Ok(())
331 }
332 ActionType::CompleteWorkflow { .. } => {
333 Ok(())
335 }
336 ActionType::SetWorkflowData { .. } => {
337 Ok(())
339 }
340 }
341 }
342
343 fn calculate_speedup(&self, contexts: &[RuleExecutionContext]) -> f64 {
345 if contexts.is_empty() {
346 return 1.0;
347 }
348
349 let total_time: Duration = contexts.iter().map(|c| c.execution_time).sum();
350 let max_time = contexts
351 .iter()
352 .map(|c| c.execution_time)
353 .max()
354 .unwrap_or(Duration::ZERO);
355
356 if max_time.as_nanos() > 0 {
357 total_time.as_nanos() as f64 / max_time.as_nanos() as f64
358 } else {
359 1.0
360 }
361 }
362}
363
364#[derive(Debug)]
366pub struct ParallelExecutionResult {
367 pub total_rules_evaluated: usize,
369 pub total_rules_fired: usize,
371 pub execution_time: Duration,
373 pub execution_contexts: Vec<RuleExecutionContext>,
375 pub parallel_speedup: f64,
377}
378
379impl ParallelExecutionResult {
380 pub fn get_stats(&self) -> String {
382 format!(
383 "๐ Parallel Execution Stats:\n Rules evaluated: {}\n Rules fired: {}\n Execution time: {:?}\n Parallel speedup: {:.2}x",
384 self.total_rules_evaluated,
385 self.total_rules_fired,
386 self.execution_time,
387 self.parallel_speedup
388 )
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rule::{Condition, ConditionGroup};
    use crate::types::{Operator, Value};

    /// Builds an action-less rule with a single `test == true` condition.
    fn sample_rule(name: &str) -> Rule {
        let condition = Condition::new(
            "test".to_string(),
            Operator::Equal,
            Value::Boolean(true),
        );
        Rule::new(name.to_string(), ConditionGroup::Single(condition), vec![])
    }

    /// The default configuration enables parallelism with sane limits.
    #[test]
    fn test_parallel_config_default() {
        let ParallelConfig {
            enabled,
            max_threads,
            min_rules_per_thread,
            ..
        } = ParallelConfig::default();

        assert!(enabled);
        assert!(max_threads > 0);
        assert_eq!(min_rules_per_thread, 2);
    }

    /// A freshly built engine has no custom functions registered.
    #[test]
    fn test_parallel_engine_creation() {
        let engine = ParallelRuleEngine::new(ParallelConfig::default());
        let registry = engine.custom_functions.read().unwrap();
        assert!(registry.is_empty());
    }

    /// Rules sharing a salience land in the same group.
    #[test]
    fn test_salience_grouping() {
        let engine = ParallelRuleEngine::new(ParallelConfig::default());

        let rules = vec![
            sample_rule("Rule1").with_priority(10),
            sample_rule("Rule2").with_priority(10),
            sample_rule("Rule3").with_priority(5),
        ];

        let groups = engine.group_rules_by_salience(&rules);
        assert_eq!(groups.len(), 2);
        assert_eq!(groups.get(&10).map(Vec::len), Some(2));
        assert_eq!(groups.get(&5).map(Vec::len), Some(1));
    }
}