dataflow_rs/engine/
mod.rs

1pub mod message;
2pub mod task;
3pub mod workflow;
4
5// Re-export key types for easier access
6pub use message::Message;
7pub use task::{FunctionHandler, Task};
8pub use workflow::Workflow;
9
10use message::AuditTrail;
11
12use chrono::Utc;
13use datalogic_rs::{DataLogic, DataValue, FromJson, Logic};
14use std::collections::HashMap;
15// Main engine that processes messages through workflows
16pub struct Engine<'a> {
17    functions: HashMap<String, Box<dyn FunctionHandler>>,
18    workflows: Vec<(Logic<'a>, Workflow)>,
19    data_logic: &'a DataLogic,
20}
21
22impl<'a> Engine<'a> {
23    pub fn new(data_logic: &'a DataLogic) -> Self {
24        Self {
25            functions: HashMap::new(),
26            workflows: Vec::new(),
27            data_logic,
28        }
29    }
30
31    pub fn add_workflow(&mut self, workflow: &Workflow) {
32        let condition = workflow.condition.clone();
33        let condition_logic = self
34            .data_logic
35            .parse_logic_json(&condition.unwrap(), None)
36            .unwrap();
37        self.workflows.push((condition_logic, workflow.clone()));
38    }
39
40    pub fn register_function(&mut self, id: String, handler: Box<dyn FunctionHandler>) {
41        self.functions.insert(id, handler);
42    }
43
44    pub fn process_message<'m>(&'m self, message: &mut Message<'m>)
45    where
46        'a: 'm,
47    {
48        for (condition_logic, workflow) in &self.workflows {
49            let result = self.data_logic.evaluate(condition_logic, &message.metadata);
50            match result {
51                Ok(result) => {
52                    if let Some(result) = result.as_bool() {
53                        if result {
54                            for (condition_logic, task) in &workflow.task_logics {
55                                let task_result =
56                                    self.data_logic.evaluate(condition_logic, &message.metadata);
57                                match task_result {
58                                    Ok(result) => {
59                                        if let Some(result) = result.as_bool() {
60                                            if result {
61                                                self.execute_task(message, task);
62                                            }
63                                        }
64                                    }
65                                    Err(e) => {
66                                        println!("Error evaluating task: {}", e);
67                                    }
68                                }
69                            }
70                        }
71                    }
72                }
73                Err(e) => {
74                    println!("Error evaluating condition: {}", e);
75                }
76            }
77        }
78    }
79
80    fn execute_task<'m>(&'m self, message: &mut Message<'m>, task: &Task)
81    where
82        'a: 'm,
83    {
84        if let Some(function) = self.functions.get(&task.function) {
85            let arena = self.data_logic.arena();
86            let input_value = DataValue::from_json(&task.input, arena);
87
88            match function.execute(message, &input_value, arena) {
89                Ok(changes) => {
90                    let audit_trail = AuditTrail {
91                        workflow_id: message.id.clone(),
92                        task_id: task.id.clone(),
93                        timestamp: Utc::now(),
94                        changes,
95                    };
96
97                    message.audit_trail.push(audit_trail);
98                }
99                Err(e) => {
100                    println!("Error executing task: {}", e);
101                }
102            }
103        } else {
104            println!("Function not found: {}", task.function);
105        }
106    }
107}