rust_logic_graph/core/
executor.rs1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4
5use crate::core::{Graph, GraphDef};
6use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
7use crate::rule::Rule;
8
/// Runs the nodes of a graph in dependency order.
///
/// Holds the runnable node implementations, keyed by the id each node
/// reports through `Node::id`.
pub struct Executor {
    /// Registered node implementations, keyed by node id.
    nodes: HashMap<String, Box<dyn Node>>,
}
12
13impl Executor {
14 pub fn new() -> Self {
15 Self {
16 nodes: HashMap::new(),
17 }
18 }
19
20 pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
22 let mut executor = Self::new();
23
24 for (node_id, node_type) in &def.nodes {
26 let node: Box<dyn Node> = match node_type {
27 crate::node::NodeType::RuleNode => {
28 Box::new(ConcreteRuleNode::new(node_id, "true"))
29 }
30 crate::node::NodeType::DBNode => {
31 Box::new(DBNode::new(node_id, format!("SELECT * FROM {}", node_id)))
32 }
33 crate::node::NodeType::AINode => {
34 Box::new(AINode::new(node_id, format!("Process data for {}", node_id)))
35 }
36 };
37
38 executor.register_node(node);
39 }
40
41 Ok(executor)
42 }
43
44 pub fn register_node(&mut self, node: Box<dyn Node>) {
46 let id = node.id().to_string();
47 self.nodes.insert(id, node);
48 }
49
50 pub async fn execute(&self, graph: &mut Graph) -> Result<()> {
52 info!("Executor: Starting graph execution");
53
54 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
56 let mut in_degree: HashMap<String, usize> = HashMap::new();
57
58 for node_id in graph.def.nodes.keys() {
60 in_degree.insert(node_id.clone(), 0);
61 adj_list.insert(node_id.clone(), Vec::new());
62 }
63
64 for edge in &graph.def.edges {
66 adj_list
67 .entry(edge.from.clone())
68 .or_insert_with(Vec::new)
69 .push(edge.to.clone());
70
71 *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
72 }
73
74 let mut queue: VecDeque<String> = in_degree
76 .iter()
77 .filter(|(_, °ree)| degree == 0)
78 .map(|(id, _)| id.clone())
79 .collect();
80
81 if queue.is_empty() {
82 warn!("No starting nodes found (all nodes have incoming edges). Starting with first node.");
83 if let Some(first_node) = graph.def.nodes.keys().next() {
84 queue.push_back(first_node.clone());
85 }
86 }
87
88 let mut executed = HashSet::new();
89 let mut execution_order = Vec::new();
90
91 while let Some(node_id) = queue.pop_front() {
93 if executed.contains(&node_id) {
94 continue;
95 }
96
97 info!("Executor: Processing node '{}'", node_id);
98
99 let incoming_edges: Vec<_> = graph
101 .def
102 .edges
103 .iter()
104 .filter(|e| e.to == node_id)
105 .collect();
106
107 let mut should_execute = true;
108
109 for edge in &incoming_edges {
110 if let Some(rule_id) = &edge.rule {
111 let rule = Rule::new(rule_id, "true"); match rule.evaluate(&graph.context.data) {
114 Ok(result) => {
115 debug!(
116 "Rule '{}' for edge {} -> {} evaluated to: {:?}",
117 rule_id, edge.from, edge.to, result
118 );
119
120 if let serde_json::Value::Bool(false) = result {
121 should_execute = false;
122 info!(
123 "Skipping node '{}' due to failed rule '{}'",
124 node_id, rule_id
125 );
126 break;
127 }
128 }
129 Err(e) => {
130 warn!(
131 "Rule '{}' evaluation failed: {}. Assuming true.",
132 rule_id, e
133 );
134 }
135 }
136 }
137 }
138
139 if should_execute {
141 if let Some(node) = self.nodes.get(&node_id) {
142 match node.run(&mut graph.context).await {
143 Ok(_) => {
144 info!("Node '{}' executed successfully", node_id);
145 execution_order.push(node_id.clone());
146 }
147 Err(e) => {
148 warn!("Node '{}' execution failed: {:?}", node_id, e);
149 }
150 }
151 } else {
152 warn!("Node '{}' not found in executor", node_id);
153 }
154 }
155
156 executed.insert(node_id.clone());
157
158 if let Some(neighbors) = adj_list.get(&node_id) {
160 for neighbor in neighbors {
161 if let Some(degree) = in_degree.get_mut(neighbor) {
162 *degree = degree.saturating_sub(1);
163 if *degree == 0 && !executed.contains(neighbor) {
164 queue.push_back(neighbor.clone());
165 }
166 }
167 }
168 }
169 }
170
171 info!(
172 "Executor: Completed execution. Executed nodes: {:?}",
173 execution_order
174 );
175
176 let unexecuted: Vec<_> = graph
178 .def
179 .nodes
180 .keys()
181 .filter(|id| !executed.contains(*id))
182 .collect();
183
184 if !unexecuted.is_empty() {
185 warn!("Some nodes were not executed (possible cycle): {:?}", unexecuted);
186 }
187
188 Ok(())
189 }
190}
191
192impl Default for Executor {
193 fn default() -> Self {
194 Self::new()
195 }
196}