rust_logic_graph/core/
executor.rs1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4
5use crate::core::{Graph, GraphDef};
6use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
7use crate::rule::Rule;
8use crate::cache::{CacheManager, CacheKey};
9
10pub struct Executor {
11 nodes: HashMap<String, Box<dyn Node>>,
12 cache: Option<CacheManager>,
13}
14
15impl Executor {
16 pub fn new() -> Self {
17 Self {
18 nodes: HashMap::new(),
19 cache: None,
20 }
21 }
22
23 pub fn with_cache(cache: CacheManager) -> Self {
25 Self {
26 nodes: HashMap::new(),
27 cache: Some(cache),
28 }
29 }
30
31 pub fn set_cache(&mut self, cache: CacheManager) {
33 self.cache = Some(cache);
34 }
35
36 pub fn cache(&self) -> Option<&CacheManager> {
38 self.cache.as_ref()
39 }
40
41 pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
43 let mut executor = Self::new();
44
45 for (node_id, node_type) in &def.nodes {
47 let node: Box<dyn Node> = match node_type {
48 crate::node::NodeType::RuleNode => {
49 Box::new(ConcreteRuleNode::new(node_id, "true"))
50 }
51 crate::node::NodeType::DBNode => {
52 Box::new(DBNode::new(node_id, format!("SELECT * FROM {}", node_id)))
53 }
54 crate::node::NodeType::AINode => {
55 Box::new(AINode::new(node_id, format!("Process data for {}", node_id)))
56 }
57 };
58
59 executor.register_node(node);
60 }
61
62 Ok(executor)
63 }
64
65 pub fn register_node(&mut self, node: Box<dyn Node>) {
67 let id = node.id().to_string();
68 self.nodes.insert(id, node);
69 }
70
71 pub async fn execute(&self, graph: &mut Graph) -> Result<()> {
73 info!("Executor: Starting graph execution");
74
75 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
77 let mut in_degree: HashMap<String, usize> = HashMap::new();
78
79 for node_id in graph.def.nodes.keys() {
81 in_degree.insert(node_id.clone(), 0);
82 adj_list.insert(node_id.clone(), Vec::new());
83 }
84
85 for edge in &graph.def.edges {
87 adj_list
88 .entry(edge.from.clone())
89 .or_insert_with(Vec::new)
90 .push(edge.to.clone());
91
92 *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
93 }
94
95 let mut queue: VecDeque<String> = in_degree
97 .iter()
98 .filter(|(_, °ree)| degree == 0)
99 .map(|(id, _)| id.clone())
100 .collect();
101
102 if queue.is_empty() {
103 warn!("No starting nodes found (all nodes have incoming edges). Starting with first node.");
104 if let Some(first_node) = graph.def.nodes.keys().next() {
105 queue.push_back(first_node.clone());
106 }
107 }
108
109 let mut executed = HashSet::new();
110 let mut execution_order = Vec::new();
111
112 while let Some(node_id) = queue.pop_front() {
114 if executed.contains(&node_id) {
115 continue;
116 }
117
118 info!("Executor: Processing node '{}'", node_id);
119
120 let incoming_edges: Vec<_> = graph
122 .def
123 .edges
124 .iter()
125 .filter(|e| e.to == node_id)
126 .collect();
127
128 let mut should_execute = true;
129
130 for edge in &incoming_edges {
131 if let Some(rule_id) = &edge.rule {
132 let rule = Rule::new(rule_id, "true"); match rule.evaluate(&graph.context.data) {
135 Ok(result) => {
136 debug!(
137 "Rule '{}' for edge {} -> {} evaluated to: {:?}",
138 rule_id, edge.from, edge.to, result
139 );
140
141 if let serde_json::Value::Bool(false) = result {
142 should_execute = false;
143 info!(
144 "Skipping node '{}' due to failed rule '{}'",
145 node_id, rule_id
146 );
147 break;
148 }
149 }
150 Err(e) => {
151 warn!(
152 "Rule '{}' evaluation failed: {}. Assuming true.",
153 rule_id, e
154 );
155 }
156 }
157 }
158 }
159
160 if should_execute {
162 if let Some(node) = self.nodes.get(&node_id) {
163 let context_value = serde_json::to_value(&graph.context.data)?;
165 let cache_key = CacheKey::new(&node_id, &context_value);
166
167 let mut cached_hit = false;
169
170 if let Some(cache) = &self.cache {
171 if let Some(cached_value) = cache.get(&cache_key) {
172 info!("Node '{}' result retrieved from cache", node_id);
173
174 if let serde_json::Value::Object(cached_obj) = cached_value {
176 for (k, v) in cached_obj {
177 graph.context.data.insert(k, v);
178 }
179 }
180
181 cached_hit = true;
182 }
183 }
184
185 let result = if !cached_hit {
187 node.run(&mut graph.context).await
188 } else {
189 Ok(serde_json::Value::Null) };
191
192 match result {
193 Ok(_) => {
194 info!("Node '{}' executed successfully", node_id);
195 execution_order.push(node_id.clone());
196
197 if !cached_hit {
199 if let Some(cache) = &self.cache {
200 let context_result = serde_json::to_value(&graph.context.data)?;
201 if let Err(e) = cache.put(cache_key, context_result, None) {
202 warn!("Failed to cache result for node '{}': {}", node_id, e);
203 }
204 }
205 }
206 }
207 Err(e) => {
208 warn!("Node '{}' execution failed: {:?}", node_id, e);
209 }
210 }
211 } else {
212 warn!("Node '{}' not found in executor", node_id);
213 }
214 }
215
216 executed.insert(node_id.clone());
217
218 if let Some(neighbors) = adj_list.get(&node_id) {
220 for neighbor in neighbors {
221 if let Some(degree) = in_degree.get_mut(neighbor) {
222 *degree = degree.saturating_sub(1);
223 if *degree == 0 && !executed.contains(neighbor) {
224 queue.push_back(neighbor.clone());
225 }
226 }
227 }
228 }
229 }
230
231 info!(
232 "Executor: Completed execution. Executed nodes: {:?}",
233 execution_order
234 );
235
236 let unexecuted: Vec<_> = graph
238 .def
239 .nodes
240 .keys()
241 .filter(|id| !executed.contains(*id))
242 .collect();
243
244 if !unexecuted.is_empty() {
245 warn!("Some nodes were not executed (possible cycle): {:?}", unexecuted);
246 }
247
248 Ok(())
249 }
250}
251
252impl Default for Executor {
253 fn default() -> Self {
254 Self::new()
255 }
256}