1use anyhow::Result;
2use std::collections::{HashMap, HashSet, VecDeque};
3use std::time::{Duration, Instant};
4use tracing::{debug, info, warn};
5
6use crate::cache::{CacheKey, CacheManager};
7use crate::core::{Graph, GraphDef};
8use crate::node::{AINode, DBNode, Node, RuleNode as ConcreteRuleNode};
9use crate::rule::Rule;
10
/// Outcome of handling a single node during one `Executor::execute` run.
///
/// One record is pushed for every node that was either run or restored from
/// the cache; skipped nodes and nodes missing from the executor get no record.
#[derive(Debug, Clone)]
pub struct NodeExecutionStats {
    /// Identifier of the node this record describes.
    pub node_id: String,
    /// Wall-clock time spent on the node, measured from just before the cache
    /// key is built until the result (cached or freshly computed) is known.
    pub duration: Duration,
    /// `true` when the result was restored from the cache instead of running the node.
    pub cache_hit: bool,
    /// `true` when the node yielded a result, `false` when it returned an error.
    pub success: bool,
}
19
/// Aggregate counters for one `Executor::execute` run.
///
/// The executor replaces this value with `ExecutionMetrics::default()` at the
/// start of every run, so it never accumulates across runs.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
    /// Time from the start of `execute` until the traversal loop finished.
    pub total_duration: Duration,
    /// Number of nodes that produced a result (cache hits are counted here too).
    pub nodes_executed: usize,
    /// Number of nodes skipped because an incoming edge rule evaluated to `false`.
    pub nodes_skipped: usize,
    /// Number of nodes whose run returned an error.
    pub nodes_failed: usize,
    /// Number of nodes whose result was restored from the cache.
    pub cache_hits: usize,
    /// Per-node records, in the order the nodes were handled.
    pub node_stats: Vec<NodeExecutionStats>,
}
30
/// Runs the nodes of a `Graph` in dependency order, with optional result
/// caching and an optional fallback handler for failing nodes.
pub struct Executor {
    /// Registered node implementations, keyed by node id.
    nodes: HashMap<String, Box<dyn Node>>,
    /// Optional result cache consulted before a node is run.
    cache: Option<CacheManager>,
    /// Metrics describing the most recent `execute` call.
    metrics: ExecutionMetrics,
    /// Optional handler handed to `degrade_on_failure` when a node returns an error.
    fallback_handler: Option<crate::fault_tolerance::degradation::FallbackHandler>,
}
57
58impl Executor {
    /// Creates an executor with no registered nodes, no cache, default
    /// (all-zero) metrics and no fallback handler.
    pub fn new() -> Self {
        Self {
            nodes: HashMap::new(),
            cache: None,
            metrics: ExecutionMetrics::default(),
            fallback_handler: None,
        }
    }
67
68 pub fn with_cache(cache: CacheManager) -> Self {
70 Self {
71 nodes: HashMap::new(),
72 cache: Some(cache),
73 metrics: ExecutionMetrics::default(),
74 fallback_handler: None,
75 }
76 }
77
    /// Installs `cache` as the result cache, replacing any cache set earlier.
    pub fn set_cache(&mut self, cache: CacheManager) {
        self.cache = Some(cache);
    }
82
    /// Installs `handler` as the fallback handler, replacing any handler set
    /// earlier. The handler is passed to `degrade_on_failure` when a node
    /// returns an error during `execute`.
    pub fn set_fallback_handler(
        &mut self,
        handler: crate::fault_tolerance::degradation::FallbackHandler,
    ) {
        self.fallback_handler = Some(handler);
    }
90
    /// Returns the configured cache, or `None` when no cache has been set.
    pub fn cache(&self) -> Option<&CacheManager> {
        self.cache.as_ref()
    }
95
    /// Returns the metrics collected so far; `execute` resets them at the
    /// start of every run.
    pub fn metrics(&self) -> &ExecutionMetrics {
        &self.metrics
    }
100
    /// Discards the collected metrics and replaces them with default values.
    pub fn reset_metrics(&mut self) {
        self.metrics = ExecutionMetrics::default();
    }
105
106 pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
108 let mut executor = Self::new();
109
110 for (node_id, config) in &def.nodes {
112 let node: Box<dyn Node> = match config.node_type {
113 crate::node::NodeType::RuleNode => {
114 let condition = config.condition.as_deref().unwrap_or("true");
115 Box::new(ConcreteRuleNode::new(node_id, condition))
116 }
117 crate::node::NodeType::DBNode => {
118 let query = config
119 .query
120 .clone()
121 .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
122
123 if let Some(params) = config.params.clone() {
125 Box::new(DBNode::with_params(node_id, query, params))
126 } else {
127 Box::new(DBNode::new(node_id, query))
128 }
129 }
130 crate::node::NodeType::AINode => {
131 let prompt = config
132 .prompt
133 .clone()
134 .unwrap_or_else(|| format!("Process data for {}", node_id));
135 Box::new(AINode::new(node_id, prompt))
136 }
137 crate::node::NodeType::GrpcNode => {
138 let query = config
140 .query
141 .clone()
142 .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
143 let parts: Vec<&str> = query.split('#').collect();
144 let service_url = parts
145 .get(0)
146 .unwrap_or(&"http://localhost:50051")
147 .to_string();
148 let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
149 Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
150 }
151 crate::node::NodeType::SubgraphNode
154 | crate::node::NodeType::ConditionalNode
155 | crate::node::NodeType::LoopNode
156 | crate::node::NodeType::TryCatchNode
157 | crate::node::NodeType::RetryNode
158 | crate::node::NodeType::CircuitBreakerNode => {
159 Box::new(ConcreteRuleNode::new(node_id, "true"))
162 }
163 };
164
165 executor.register_node(node);
166 }
167
168 Ok(executor)
169 }
170
171 pub fn register_node(&mut self, node: Box<dyn Node>) {
173 let id = node.id().to_string();
174 self.nodes.insert(id, node);
175 }
176
177 fn detect_cycles(&self, graph: &Graph) -> Result<()> {
179 let mut visited = HashSet::new();
180 let mut rec_stack = HashSet::new();
181
182 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
184 for edge in &graph.def.edges {
185 adj_list
186 .entry(edge.from.clone())
187 .or_insert_with(Vec::new)
188 .push(edge.to.clone());
189 }
190
191 fn dfs_cycle_check(
193 node: &str,
194 adj_list: &HashMap<String, Vec<String>>,
195 visited: &mut HashSet<String>,
196 rec_stack: &mut HashSet<String>,
197 path: &mut Vec<String>,
198 ) -> Option<Vec<String>> {
199 visited.insert(node.to_string());
200 rec_stack.insert(node.to_string());
201 path.push(node.to_string());
202
203 if let Some(neighbors) = adj_list.get(node) {
204 for neighbor in neighbors {
205 if !visited.contains(neighbor) {
206 if let Some(cycle) =
207 dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path)
208 {
209 return Some(cycle);
210 }
211 } else if rec_stack.contains(neighbor) {
212 let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
214 return Some(path[cycle_start..].to_vec());
215 }
216 }
217 }
218
219 path.pop();
220 rec_stack.remove(node);
221 None
222 }
223
224 for node_id in graph.def.nodes.keys() {
226 if !visited.contains(node_id) {
227 let mut path = Vec::new();
228 if let Some(cycle) =
229 dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path)
230 {
231 return Err(anyhow::anyhow!(
232 "Cycle detected in graph: {} -> {}",
233 cycle.join(" -> "),
234 cycle.first().unwrap()
235 ));
236 }
237 }
238 }
239
240 Ok(())
241 }
242
243 pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
245 eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
246 eprintln!(
247 "Graph has {} nodes, {} edges",
248 graph.def.nodes.len(),
249 graph.def.edges.len()
250 );
251 info!("Executor: Starting graph execution");
252 let execution_start = Instant::now();
253
254 self.metrics = ExecutionMetrics::default();
256
257 graph.def.validate()?;
259
260 if graph.def.has_disconnected_components() {
262 warn!("Graph has disconnected components - some nodes may not be reachable");
263 }
264
265 self.detect_cycles(graph)?;
267
268 let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
270 let mut in_degree: HashMap<String, usize> = HashMap::new();
271
272 for node_id in graph.def.nodes.keys() {
274 in_degree.insert(node_id.clone(), 0);
275 adj_list.insert(node_id.clone(), Vec::new());
276 }
277
278 for edge in &graph.def.edges {
280 adj_list
281 .entry(edge.from.clone())
282 .or_insert_with(Vec::new)
283 .push(edge.to.clone());
284
285 *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
286 }
287
288 let mut queue: VecDeque<String> = in_degree
290 .iter()
291 .filter(|(_, °ree)| degree == 0)
292 .map(|(id, _)| id.clone())
293 .collect();
294
295 if queue.is_empty() {
296 return Err(anyhow::anyhow!(
297 "No starting nodes found in graph (all nodes have incoming edges). \
298 This indicates either a cycle (which should have been caught earlier) \
299 or an invalid graph structure. Nodes: {:?}",
300 graph.def.nodes.keys().collect::<Vec<_>>()
301 ));
302 }
303
304 let mut executed = HashSet::new();
305 let mut execution_order = Vec::new();
306
307 while let Some(node_id) = queue.pop_front() {
309 if executed.contains(&node_id) {
310 continue;
311 }
312
313 info!("Executor: Processing node '{}'", node_id);
314
315 let incoming_edges: Vec<_> = graph
318 .def
319 .edges
320 .iter()
321 .filter(|e| e.to == node_id && executed.contains(&e.from))
322 .collect();
323
324 let mut should_execute = true;
325
326 for edge in &incoming_edges {
327 if let Some(rule_id) = &edge.rule {
328 let rule = Rule::new(rule_id, "true"); match rule.evaluate(&graph.context.data) {
331 Ok(result) => {
332 debug!(
333 "Rule '{}' for edge {} -> {} evaluated to: {:?}",
334 rule_id, edge.from, edge.to, result
335 );
336
337 if let serde_json::Value::Bool(false) = result {
338 should_execute = false;
339 info!(
340 "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
341 node_id, rule_id, edge.from
342 );
343 self.metrics.nodes_skipped += 1;
344 break;
345 }
346 }
347 Err(e) => {
348 warn!(
349 "Rule '{}' evaluation failed: {}. Assuming true.",
350 rule_id, e
351 );
352 }
353 }
354 }
355 }
356
357 if should_execute {
359 eprintln!("🔵 About to execute node: {}", node_id);
360 if let Some(node) = self.nodes.get(&node_id) {
361 let node_start = Instant::now();
362 let mut cache_hit = false;
363
364 let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
367 .iter()
368 .filter_map(|edge| {
369 graph
370 .context
371 .data
372 .get(&format!("{}_result", edge.from))
373 .map(|v| (edge.from.clone(), v.clone()))
374 })
375 .collect();
376
377 if incoming_edges.is_empty() {
380 for (key, value) in &graph.context.data {
382 if !key.ends_with("_result") {
383 relevant_context.insert(format!("_initial_{}", key), value.clone());
384 }
385 }
386 }
387
388 let context_value = serde_json::to_value(&relevant_context)?;
389 let cache_key = CacheKey::new(&node_id, &context_value);
390
391 let cached_result = if let Some(cache) = &self.cache {
393 cache.get(&cache_key)
394 } else {
395 None
396 };
397
398 let result = if let Some(cached_value) = cached_result {
399 info!("Node '{}' result retrieved from cache", node_id);
400 cache_hit = true;
401 self.metrics.cache_hits += 1;
402
403 if let serde_json::Value::Object(cached_obj) = cached_value {
405 for (k, v) in cached_obj {
406 graph.context.data.insert(k, v);
407 }
408 }
409
410 Ok(serde_json::Value::Null) } else {
412 let exec_result = node.run(&mut graph.context).await;
414
415 if exec_result.is_err() {
417 let fallback = crate::fault_tolerance::degradation::degrade_on_failure(
418 &node_id,
419 &mut graph.context,
420 self.fallback_handler,
421 );
422 if fallback.is_some() {
423 info!("Applied fallback for node '{}'", node_id);
424 }
425 }
426
427 if let Some(cache) = &self.cache {
429 let mut result_only: HashMap<String, serde_json::Value> =
430 HashMap::new();
431 for (key, value) in &graph.context.data {
432 if key.ends_with("_result") {
433 result_only.insert(key.clone(), value.clone());
434 }
435 }
436 let context_result = serde_json::to_value(&result_only)?;
437 if let Err(e) = cache.put(cache_key, context_result, None) {
438 warn!("Failed to cache result for node '{}': {}", node_id, e);
439 }
440 }
441
442 exec_result
443 };
444
445 match result {
446 Ok(_) => {
447 let duration = node_start.elapsed();
448 info!("Node '{}' executed successfully in {:?}", node_id, duration);
449 execution_order.push(node_id.clone());
450
451 self.metrics.nodes_executed += 1;
452 self.metrics.node_stats.push(NodeExecutionStats {
453 node_id: node_id.clone(),
454 duration,
455 cache_hit,
456 success: true,
457 });
458 }
459 Err(e) => {
460 let duration = node_start.elapsed();
461 warn!("Node '{}' execution failed: {:?}", node_id, e);
462
463 self.metrics.nodes_failed += 1;
464 self.metrics.node_stats.push(NodeExecutionStats {
465 node_id: node_id.clone(),
466 duration,
467 cache_hit,
468 success: false,
469 });
470 }
471 }
472 } else {
473 warn!("Node '{}' not found in executor", node_id);
474 }
475 }
476
477 executed.insert(node_id.clone());
478
479 if let Some(neighbors) = adj_list.get(&node_id) {
481 for neighbor in neighbors {
482 if let Some(degree) = in_degree.get_mut(neighbor) {
483 *degree = degree.saturating_sub(1);
484 if *degree == 0 && !executed.contains(neighbor) {
485 queue.push_back(neighbor.clone());
486 }
487 }
488 }
489 }
490 }
491
492 self.metrics.total_duration = execution_start.elapsed();
493
494 info!(
495 "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
496 self.metrics.total_duration,
497 self.metrics.nodes_executed,
498 self.metrics.nodes_skipped,
499 self.metrics.nodes_failed,
500 self.metrics.cache_hits
501 );
502
503 let unexecuted: Vec<_> = graph
505 .def
506 .nodes
507 .keys()
508 .filter(|id| !executed.contains(*id))
509 .collect();
510
511 if !unexecuted.is_empty() {
512 return Err(anyhow::anyhow!(
513 "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
514 unexecuted
515 ));
516 }
517
518 Ok(())
519 }
520}
521
/// The default executor is the empty executor returned by `Executor::new`.
impl Default for Executor {
    fn default() -> Self {
        Self::new()
    }
}
526}