dataflow_rs/engine/
mod.rs1pub mod message;
2pub mod task;
3pub mod workflow;
4
5pub use message::Message;
7pub use task::{FunctionHandler, Task};
8pub use workflow::Workflow;
9
10use message::AuditTrail;
11
12use chrono::Utc;
13use datalogic_rs::{DataLogic, DataValue, FromJson, Logic};
14use std::collections::HashMap;
15pub struct Engine<'a> {
17 functions: HashMap<String, Box<dyn FunctionHandler>>,
18 workflows: Vec<(Logic<'a>, Workflow)>,
19 data_logic: &'a DataLogic,
20}
21
22impl<'a> Engine<'a> {
23 pub fn new(data_logic: &'a DataLogic) -> Self {
24 Self {
25 functions: HashMap::new(),
26 workflows: Vec::new(),
27 data_logic,
28 }
29 }
30
31 pub fn add_workflow(&mut self, workflow: &Workflow) {
32 let condition = workflow.condition.clone();
33 let condition_logic = self
34 .data_logic
35 .parse_logic_json(&condition.unwrap(), None)
36 .unwrap();
37 self.workflows.push((condition_logic, workflow.clone()));
38 }
39
40 pub fn register_function(&mut self, id: String, handler: Box<dyn FunctionHandler>) {
41 self.functions.insert(id, handler);
42 }
43
44 pub fn process_message<'m>(&'m self, message: &mut Message<'m>)
45 where
46 'a: 'm,
47 {
48 for (condition_logic, workflow) in &self.workflows {
49 let result = self.data_logic.evaluate(condition_logic, &message.metadata);
50 match result {
51 Ok(result) => {
52 if let Some(result) = result.as_bool() {
53 if result {
54 for (condition_logic, task) in &workflow.task_logics {
55 let task_result =
56 self.data_logic.evaluate(condition_logic, &message.metadata);
57 match task_result {
58 Ok(result) => {
59 if let Some(result) = result.as_bool() {
60 if result {
61 self.execute_task(message, task);
62 }
63 }
64 }
65 Err(e) => {
66 println!("Error evaluating task: {}", e);
67 }
68 }
69 }
70 }
71 }
72 }
73 Err(e) => {
74 println!("Error evaluating condition: {}", e);
75 }
76 }
77 }
78 }
79
80 fn execute_task<'m>(&'m self, message: &mut Message<'m>, task: &Task)
81 where
82 'a: 'm,
83 {
84 if let Some(function) = self.functions.get(&task.function) {
85 let arena = self.data_logic.arena();
86 let input_value = DataValue::from_json(&task.input, arena);
87
88 match function.execute(message, &input_value, arena) {
89 Ok(changes) => {
90 let audit_trail = AuditTrail {
91 workflow_id: message.id.clone(),
92 task_id: task.id.clone(),
93 timestamp: Utc::now(),
94 changes,
95 };
96
97 message.audit_trail.push(audit_trail);
98 }
99 Err(e) => {
100 println!("Error executing task: {}", e);
101 }
102 }
103 } else {
104 println!("Function not found: {}", task.function);
105 }
106 }
107}