1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4use std::time::{Duration, Instant};
5
6use crate::core::{Graph, GraphDef};
7use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
8use crate::rule::Rule;
9use crate::cache::{CacheManager, CacheKey};
10
/// Outcome and timing record for one node handled during a graph run.
///
/// The executor appends one of these for every node it ran or satisfied
/// from the cache.
#[derive(Clone, Debug)]
pub struct NodeExecutionStats {
    /// Identifier of the node this record describes.
    pub node_id: String,
    /// Elapsed time measured by the executor for this node.
    pub duration: Duration,
    /// `true` when the result came from the cache rather than from running the node.
    pub cache_hit: bool,
    /// `true` when the node finished without returning an error.
    pub success: bool,
}
19
20#[derive(Debug, Clone, Default)]
22pub struct ExecutionMetrics {
23 pub total_duration: Duration,
24 pub nodes_executed: usize,
25 pub nodes_skipped: usize,
26 pub nodes_failed: usize,
27 pub cache_hits: usize,
28 pub node_stats: Vec<NodeExecutionStats>,
29}
30
31pub struct Executor {
52 nodes: HashMap<String, Box<dyn Node>>,
53 cache: Option<CacheManager>,
54 metrics: ExecutionMetrics,
55}
56
57impl Executor {
58 pub fn new() -> Self {
59 Self {
60 nodes: HashMap::new(),
61 cache: None,
62 metrics: ExecutionMetrics::default(),
63 }
64 }
65
66 pub fn with_cache(cache: CacheManager) -> Self {
68 Self {
69 nodes: HashMap::new(),
70 cache: Some(cache),
71 metrics: ExecutionMetrics::default(),
72 }
73 }
74
75 pub fn set_cache(&mut self, cache: CacheManager) {
77 self.cache = Some(cache);
78 }
79
80 pub fn cache(&self) -> Option<&CacheManager> {
82 self.cache.as_ref()
83 }
84
85 pub fn metrics(&self) -> &ExecutionMetrics {
87 &self.metrics
88 }
89
90 pub fn reset_metrics(&mut self) {
92 self.metrics = ExecutionMetrics::default();
93 }
94
95 pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
97 let mut executor = Self::new();
98
99 for (node_id, config) in &def.nodes {
101 let node: Box<dyn Node> = match config.node_type {
102 crate::node::NodeType::RuleNode => {
103 let condition = config.condition.as_deref().unwrap_or("true");
104 Box::new(ConcreteRuleNode::new(node_id, condition))
105 }
106 crate::node::NodeType::DBNode => {
107 let query = config.query.clone()
108 .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
109
110 if let Some(params) = config.params.clone() {
112 Box::new(DBNode::with_params(node_id, query, params))
113 } else {
114 Box::new(DBNode::new(node_id, query))
115 }
116 }
117 crate::node::NodeType::AINode => {
118 let prompt = config.prompt.clone()
119 .unwrap_or_else(|| format!("Process data for {}", node_id));
120 Box::new(AINode::new(node_id, prompt))
121 }
122 crate::node::NodeType::GrpcNode => {
123 let query = config.query.clone()
125 .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
126 let parts: Vec<&str> = query.split('#').collect();
127 let service_url = parts.get(0).unwrap_or(&"http://localhost:50051").to_string();
128 let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
129 Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
130 }
131 crate::node::NodeType::SubgraphNode |
134 crate::node::NodeType::ConditionalNode |
135 crate::node::NodeType::LoopNode |
136 crate::node::NodeType::TryCatchNode |
137 crate::node::NodeType::RetryNode |
138 crate::node::NodeType::CircuitBreakerNode => {
139 Box::new(ConcreteRuleNode::new(node_id, "true"))
142 }
143 };
144
145 executor.register_node(node);
146 }
147
148 Ok(executor)
149 }
150
151 pub fn register_node(&mut self, node: Box<dyn Node>) {
153 let id = node.id().to_string();
154 self.nodes.insert(id, node);
155 }
156
157 fn detect_cycles(&self, graph: &Graph) -> Result<()> {
159 let mut visited = HashSet::new();
160 let mut rec_stack = HashSet::new();
161
162 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
164 for edge in &graph.def.edges {
165 adj_list
166 .entry(edge.from.clone())
167 .or_insert_with(Vec::new)
168 .push(edge.to.clone());
169 }
170
171 fn dfs_cycle_check(
173 node: &str,
174 adj_list: &HashMap<String, Vec<String>>,
175 visited: &mut HashSet<String>,
176 rec_stack: &mut HashSet<String>,
177 path: &mut Vec<String>,
178 ) -> Option<Vec<String>> {
179 visited.insert(node.to_string());
180 rec_stack.insert(node.to_string());
181 path.push(node.to_string());
182
183 if let Some(neighbors) = adj_list.get(node) {
184 for neighbor in neighbors {
185 if !visited.contains(neighbor) {
186 if let Some(cycle) = dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path) {
187 return Some(cycle);
188 }
189 } else if rec_stack.contains(neighbor) {
190 let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
192 return Some(path[cycle_start..].to_vec());
193 }
194 }
195 }
196
197 path.pop();
198 rec_stack.remove(node);
199 None
200 }
201
202 for node_id in graph.def.nodes.keys() {
204 if !visited.contains(node_id) {
205 let mut path = Vec::new();
206 if let Some(cycle) = dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path) {
207 return Err(anyhow::anyhow!(
208 "Cycle detected in graph: {} -> {}",
209 cycle.join(" -> "),
210 cycle.first().unwrap()
211 ));
212 }
213 }
214 }
215
216 Ok(())
217 }
218
219 pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
221 eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
222 eprintln!("Graph has {} nodes, {} edges", graph.def.nodes.len(), graph.def.edges.len());
223 info!("Executor: Starting graph execution");
224 let execution_start = Instant::now();
225
226 self.metrics = ExecutionMetrics::default();
228
229 graph.def.validate()?;
231
232 if graph.def.has_disconnected_components() {
234 warn!("Graph has disconnected components - some nodes may not be reachable");
235 }
236
237 self.detect_cycles(graph)?;
239
240 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
242 let mut in_degree: HashMap<String, usize> = HashMap::new();
243
244 for node_id in graph.def.nodes.keys() {
246 in_degree.insert(node_id.clone(), 0);
247 adj_list.insert(node_id.clone(), Vec::new());
248 }
249
250 for edge in &graph.def.edges {
252 adj_list
253 .entry(edge.from.clone())
254 .or_insert_with(Vec::new)
255 .push(edge.to.clone());
256
257 *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
258 }
259
260 let mut queue: VecDeque<String> = in_degree
262 .iter()
263 .filter(|(_, °ree)| degree == 0)
264 .map(|(id, _)| id.clone())
265 .collect();
266
267 if queue.is_empty() {
268 return Err(anyhow::anyhow!(
269 "No starting nodes found in graph (all nodes have incoming edges). \
270 This indicates either a cycle (which should have been caught earlier) \
271 or an invalid graph structure. Nodes: {:?}",
272 graph.def.nodes.keys().collect::<Vec<_>>()
273 ));
274 }
275
276 let mut executed = HashSet::new();
277 let mut execution_order = Vec::new();
278
279 while let Some(node_id) = queue.pop_front() {
281 if executed.contains(&node_id) {
282 continue;
283 }
284
285 info!("Executor: Processing node '{}'", node_id);
286
287 let incoming_edges: Vec<_> = graph
290 .def
291 .edges
292 .iter()
293 .filter(|e| e.to == node_id && executed.contains(&e.from))
294 .collect();
295
296 let mut should_execute = true;
297
298 for edge in &incoming_edges {
299 if let Some(rule_id) = &edge.rule {
300 let rule = Rule::new(rule_id, "true"); match rule.evaluate(&graph.context.data) {
303 Ok(result) => {
304 debug!(
305 "Rule '{}' for edge {} -> {} evaluated to: {:?}",
306 rule_id, edge.from, edge.to, result
307 );
308
309 if let serde_json::Value::Bool(false) = result {
310 should_execute = false;
311 info!(
312 "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
313 node_id, rule_id, edge.from
314 );
315 self.metrics.nodes_skipped += 1;
316 break;
317 }
318 }
319 Err(e) => {
320 warn!(
321 "Rule '{}' evaluation failed: {}. Assuming true.",
322 rule_id, e
323 );
324 }
325 }
326 }
327 }
328
329 if should_execute {
331 eprintln!("🔵 About to execute node: {}", node_id);
332 if let Some(node) = self.nodes.get(&node_id) {
333 let node_start = Instant::now();
334 let mut cache_hit = false;
335
336 let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
339 .iter()
340 .filter_map(|edge| {
341 graph.context.data.get(&format!("{}_result", edge.from))
342 .map(|v| (edge.from.clone(), v.clone()))
343 })
344 .collect();
345
346 if incoming_edges.is_empty() {
349 for (key, value) in &graph.context.data {
351 if !key.ends_with("_result") {
352 relevant_context.insert(format!("_initial_{}", key), value.clone());
353 }
354 }
355 }
356
357 let context_value = serde_json::to_value(&relevant_context)?;
358 let cache_key = CacheKey::new(&node_id, &context_value);
359
360 let cached_result = if let Some(cache) = &self.cache {
362 cache.get(&cache_key)
363 } else {
364 None
365 };
366
367 let result = if let Some(cached_value) = cached_result {
368 info!("Node '{}' result retrieved from cache", node_id);
369 cache_hit = true;
370 self.metrics.cache_hits += 1;
371
372 if let serde_json::Value::Object(cached_obj) = cached_value {
374 for (k, v) in cached_obj {
375 graph.context.data.insert(k, v);
376 }
377 }
378
379 Ok(serde_json::Value::Null) } else {
381 let exec_result = node.run(&mut graph.context).await;
383
384 if exec_result.is_ok() {
386 if let Some(cache) = &self.cache {
387 let mut result_only: HashMap<String, serde_json::Value> = HashMap::new();
390 for (key, value) in &graph.context.data {
391 if key.ends_with("_result") {
392 result_only.insert(key.clone(), value.clone());
393 }
394 }
395 let context_result = serde_json::to_value(&result_only)?;
396 if let Err(e) = cache.put(cache_key, context_result, None) {
397 warn!("Failed to cache result for node '{}': {}", node_id, e);
398 }
399 }
400 }
401
402 exec_result
403 };
404
405 match result {
406 Ok(_) => {
407 let duration = node_start.elapsed();
408 info!("Node '{}' executed successfully in {:?}", node_id, duration);
409 execution_order.push(node_id.clone());
410
411 self.metrics.nodes_executed += 1;
412 self.metrics.node_stats.push(NodeExecutionStats {
413 node_id: node_id.clone(),
414 duration,
415 cache_hit,
416 success: true,
417 });
418 }
419 Err(e) => {
420 let duration = node_start.elapsed();
421 warn!("Node '{}' execution failed: {:?}", node_id, e);
422
423 self.metrics.nodes_failed += 1;
424 self.metrics.node_stats.push(NodeExecutionStats {
425 node_id: node_id.clone(),
426 duration,
427 cache_hit,
428 success: false,
429 });
430 }
431 }
432 } else {
433 warn!("Node '{}' not found in executor", node_id);
434 }
435 }
436
437 executed.insert(node_id.clone());
438
439 if let Some(neighbors) = adj_list.get(&node_id) {
441 for neighbor in neighbors {
442 if let Some(degree) = in_degree.get_mut(neighbor) {
443 *degree = degree.saturating_sub(1);
444 if *degree == 0 && !executed.contains(neighbor) {
445 queue.push_back(neighbor.clone());
446 }
447 }
448 }
449 }
450 }
451
452 self.metrics.total_duration = execution_start.elapsed();
453
454 info!(
455 "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
456 self.metrics.total_duration,
457 self.metrics.nodes_executed,
458 self.metrics.nodes_skipped,
459 self.metrics.nodes_failed,
460 self.metrics.cache_hits
461 );
462
463 let unexecuted: Vec<_> = graph
465 .def
466 .nodes
467 .keys()
468 .filter(|id| !executed.contains(*id))
469 .collect();
470
471 if !unexecuted.is_empty() {
472 return Err(anyhow::anyhow!(
473 "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
474 unexecuted
475 ));
476 }
477
478 Ok(())
479 }
480}
481
482impl Default for Executor {
483 fn default() -> Self {
484 Self::new()
485 }
486}