1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4use std::time::{Duration, Instant};
5
6use crate::core::{Graph, GraphDef};
7use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
8use crate::rule::Rule;
9use crate::cache::{CacheManager, CacheKey};
10
11#[derive(Debug, Clone)]
13pub struct NodeExecutionStats {
14 pub node_id: String,
15 pub duration: Duration,
16 pub cache_hit: bool,
17 pub success: bool,
18}
19
20#[derive(Debug, Clone, Default)]
22pub struct ExecutionMetrics {
23 pub total_duration: Duration,
24 pub nodes_executed: usize,
25 pub nodes_skipped: usize,
26 pub nodes_failed: usize,
27 pub cache_hits: usize,
28 pub node_stats: Vec<NodeExecutionStats>,
29}
30
31pub struct Executor {
52 nodes: HashMap<String, Box<dyn Node>>,
53 cache: Option<CacheManager>,
54 metrics: ExecutionMetrics,
55 fallback_handler: Option<crate::fault_tolerance::degradation::FallbackHandler>,
56}
57
58impl Executor {
59 pub fn new() -> Self {
60 Self {
61 nodes: HashMap::new(),
62 cache: None,
63 metrics: ExecutionMetrics::default(),
64 fallback_handler: None,
65 }
66 }
67
68 pub fn with_cache(cache: CacheManager) -> Self {
70 Self {
71 nodes: HashMap::new(),
72 cache: Some(cache),
73 metrics: ExecutionMetrics::default(),
74 fallback_handler: None,
75 }
76 }
77
78 pub fn set_cache(&mut self, cache: CacheManager) {
80 self.cache = Some(cache);
81 }
82
83 pub fn set_fallback_handler(&mut self, handler: crate::fault_tolerance::degradation::FallbackHandler) {
85 self.fallback_handler = Some(handler);
86 }
87
88 pub fn cache(&self) -> Option<&CacheManager> {
90 self.cache.as_ref()
91 }
92
93 pub fn metrics(&self) -> &ExecutionMetrics {
95 &self.metrics
96 }
97
98 pub fn reset_metrics(&mut self) {
100 self.metrics = ExecutionMetrics::default();
101 }
102
103 pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
105 let mut executor = Self::new();
106
107 for (node_id, config) in &def.nodes {
109 let node: Box<dyn Node> = match config.node_type {
110 crate::node::NodeType::RuleNode => {
111 let condition = config.condition.as_deref().unwrap_or("true");
112 Box::new(ConcreteRuleNode::new(node_id, condition))
113 }
114 crate::node::NodeType::DBNode => {
115 let query = config.query.clone()
116 .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
117
118 if let Some(params) = config.params.clone() {
120 Box::new(DBNode::with_params(node_id, query, params))
121 } else {
122 Box::new(DBNode::new(node_id, query))
123 }
124 }
125 crate::node::NodeType::AINode => {
126 let prompt = config.prompt.clone()
127 .unwrap_or_else(|| format!("Process data for {}", node_id));
128 Box::new(AINode::new(node_id, prompt))
129 }
130 crate::node::NodeType::GrpcNode => {
131 let query = config.query.clone()
133 .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
134 let parts: Vec<&str> = query.split('#').collect();
135 let service_url = parts.get(0).unwrap_or(&"http://localhost:50051").to_string();
136 let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
137 Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
138 }
139 crate::node::NodeType::SubgraphNode |
142 crate::node::NodeType::ConditionalNode |
143 crate::node::NodeType::LoopNode |
144 crate::node::NodeType::TryCatchNode |
145 crate::node::NodeType::RetryNode |
146 crate::node::NodeType::CircuitBreakerNode => {
147 Box::new(ConcreteRuleNode::new(node_id, "true"))
150 }
151 };
152
153 executor.register_node(node);
154 }
155
156 Ok(executor)
157 }
158
159 pub fn register_node(&mut self, node: Box<dyn Node>) {
161 let id = node.id().to_string();
162 self.nodes.insert(id, node);
163 }
164
165 fn detect_cycles(&self, graph: &Graph) -> Result<()> {
167 let mut visited = HashSet::new();
168 let mut rec_stack = HashSet::new();
169
170 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
172 for edge in &graph.def.edges {
173 adj_list
174 .entry(edge.from.clone())
175 .or_insert_with(Vec::new)
176 .push(edge.to.clone());
177 }
178
179 fn dfs_cycle_check(
181 node: &str,
182 adj_list: &HashMap<String, Vec<String>>,
183 visited: &mut HashSet<String>,
184 rec_stack: &mut HashSet<String>,
185 path: &mut Vec<String>,
186 ) -> Option<Vec<String>> {
187 visited.insert(node.to_string());
188 rec_stack.insert(node.to_string());
189 path.push(node.to_string());
190
191 if let Some(neighbors) = adj_list.get(node) {
192 for neighbor in neighbors {
193 if !visited.contains(neighbor) {
194 if let Some(cycle) = dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path) {
195 return Some(cycle);
196 }
197 } else if rec_stack.contains(neighbor) {
198 let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
200 return Some(path[cycle_start..].to_vec());
201 }
202 }
203 }
204
205 path.pop();
206 rec_stack.remove(node);
207 None
208 }
209
210 for node_id in graph.def.nodes.keys() {
212 if !visited.contains(node_id) {
213 let mut path = Vec::new();
214 if let Some(cycle) = dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path) {
215 return Err(anyhow::anyhow!(
216 "Cycle detected in graph: {} -> {}",
217 cycle.join(" -> "),
218 cycle.first().unwrap()
219 ));
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
229 eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
230 eprintln!("Graph has {} nodes, {} edges", graph.def.nodes.len(), graph.def.edges.len());
231 info!("Executor: Starting graph execution");
232 let execution_start = Instant::now();
233
234 self.metrics = ExecutionMetrics::default();
236
237 graph.def.validate()?;
239
240 if graph.def.has_disconnected_components() {
242 warn!("Graph has disconnected components - some nodes may not be reachable");
243 }
244
245 self.detect_cycles(graph)?;
247
248 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
250 let mut in_degree: HashMap<String, usize> = HashMap::new();
251
252 for node_id in graph.def.nodes.keys() {
254 in_degree.insert(node_id.clone(), 0);
255 adj_list.insert(node_id.clone(), Vec::new());
256 }
257
258 for edge in &graph.def.edges {
260 adj_list
261 .entry(edge.from.clone())
262 .or_insert_with(Vec::new)
263 .push(edge.to.clone());
264
265 *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
266 }
267
268 let mut queue: VecDeque<String> = in_degree
270 .iter()
271 .filter(|(_, °ree)| degree == 0)
272 .map(|(id, _)| id.clone())
273 .collect();
274
275 if queue.is_empty() {
276 return Err(anyhow::anyhow!(
277 "No starting nodes found in graph (all nodes have incoming edges). \
278 This indicates either a cycle (which should have been caught earlier) \
279 or an invalid graph structure. Nodes: {:?}",
280 graph.def.nodes.keys().collect::<Vec<_>>()
281 ));
282 }
283
284 let mut executed = HashSet::new();
285 let mut execution_order = Vec::new();
286
287 while let Some(node_id) = queue.pop_front() {
289 if executed.contains(&node_id) {
290 continue;
291 }
292
293 info!("Executor: Processing node '{}'", node_id);
294
295 let incoming_edges: Vec<_> = graph
298 .def
299 .edges
300 .iter()
301 .filter(|e| e.to == node_id && executed.contains(&e.from))
302 .collect();
303
304 let mut should_execute = true;
305
306 for edge in &incoming_edges {
307 if let Some(rule_id) = &edge.rule {
308 let rule = Rule::new(rule_id, "true"); match rule.evaluate(&graph.context.data) {
311 Ok(result) => {
312 debug!(
313 "Rule '{}' for edge {} -> {} evaluated to: {:?}",
314 rule_id, edge.from, edge.to, result
315 );
316
317 if let serde_json::Value::Bool(false) = result {
318 should_execute = false;
319 info!(
320 "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
321 node_id, rule_id, edge.from
322 );
323 self.metrics.nodes_skipped += 1;
324 break;
325 }
326 }
327 Err(e) => {
328 warn!(
329 "Rule '{}' evaluation failed: {}. Assuming true.",
330 rule_id, e
331 );
332 }
333 }
334 }
335 }
336
337 if should_execute {
339 eprintln!("🔵 About to execute node: {}", node_id);
340 if let Some(node) = self.nodes.get(&node_id) {
341 let node_start = Instant::now();
342 let mut cache_hit = false;
343
344 let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
347 .iter()
348 .filter_map(|edge| {
349 graph.context.data.get(&format!("{}_result", edge.from))
350 .map(|v| (edge.from.clone(), v.clone()))
351 })
352 .collect();
353
354 if incoming_edges.is_empty() {
357 for (key, value) in &graph.context.data {
359 if !key.ends_with("_result") {
360 relevant_context.insert(format!("_initial_{}", key), value.clone());
361 }
362 }
363 }
364
365 let context_value = serde_json::to_value(&relevant_context)?;
366 let cache_key = CacheKey::new(&node_id, &context_value);
367
368 let cached_result = if let Some(cache) = &self.cache {
370 cache.get(&cache_key)
371 } else {
372 None
373 };
374
375 let result = if let Some(cached_value) = cached_result {
376 info!("Node '{}' result retrieved from cache", node_id);
377 cache_hit = true;
378 self.metrics.cache_hits += 1;
379
380 if let serde_json::Value::Object(cached_obj) = cached_value {
382 for (k, v) in cached_obj {
383 graph.context.data.insert(k, v);
384 }
385 }
386
387 Ok(serde_json::Value::Null) } else {
389 let exec_result = node.run(&mut graph.context).await;
391
392 if exec_result.is_err() {
394 let fallback = crate::fault_tolerance::degradation::degrade_on_failure(
395 &node_id,
396 &mut graph.context,
397 self.fallback_handler,
398 );
399 if fallback.is_some() {
400 info!("Applied fallback for node '{}'", node_id);
401 }
402 }
403
404 if let Some(cache) = &self.cache {
406 let mut result_only: HashMap<String, serde_json::Value> = HashMap::new();
407 for (key, value) in &graph.context.data {
408 if key.ends_with("_result") {
409 result_only.insert(key.clone(), value.clone());
410 }
411 }
412 let context_result = serde_json::to_value(&result_only)?;
413 if let Err(e) = cache.put(cache_key, context_result, None) {
414 warn!("Failed to cache result for node '{}': {}", node_id, e);
415 }
416 }
417
418 exec_result
419 };
420
421 match result {
422 Ok(_) => {
423 let duration = node_start.elapsed();
424 info!("Node '{}' executed successfully in {:?}", node_id, duration);
425 execution_order.push(node_id.clone());
426
427 self.metrics.nodes_executed += 1;
428 self.metrics.node_stats.push(NodeExecutionStats {
429 node_id: node_id.clone(),
430 duration,
431 cache_hit,
432 success: true,
433 });
434 }
435 Err(e) => {
436 let duration = node_start.elapsed();
437 warn!("Node '{}' execution failed: {:?}", node_id, e);
438
439 self.metrics.nodes_failed += 1;
440 self.metrics.node_stats.push(NodeExecutionStats {
441 node_id: node_id.clone(),
442 duration,
443 cache_hit,
444 success: false,
445 });
446 }
447 }
448 } else {
449 warn!("Node '{}' not found in executor", node_id);
450 }
451 }
452
453 executed.insert(node_id.clone());
454
455 if let Some(neighbors) = adj_list.get(&node_id) {
457 for neighbor in neighbors {
458 if let Some(degree) = in_degree.get_mut(neighbor) {
459 *degree = degree.saturating_sub(1);
460 if *degree == 0 && !executed.contains(neighbor) {
461 queue.push_back(neighbor.clone());
462 }
463 }
464 }
465 }
466 }
467
468 self.metrics.total_duration = execution_start.elapsed();
469
470 info!(
471 "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
472 self.metrics.total_duration,
473 self.metrics.nodes_executed,
474 self.metrics.nodes_skipped,
475 self.metrics.nodes_failed,
476 self.metrics.cache_hits
477 );
478
479 let unexecuted: Vec<_> = graph
481 .def
482 .nodes
483 .keys()
484 .filter(|id| !executed.contains(*id))
485 .collect();
486
487 if !unexecuted.is_empty() {
488 return Err(anyhow::anyhow!(
489 "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
490 unexecuted
491 ));
492 }
493
494 Ok(())
495 }
496}
497
498impl Default for Executor {
499 fn default() -> Self {
500 Self::new()
501 }
502}