rust_logic_graph/core/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4use std::time::{Duration, Instant};
5
6use crate::core::{Graph, GraphDef};
7use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
8use crate::rule::Rule;
9use crate::cache::{CacheManager, CacheKey};
10
/// Execution statistics for a single node.
///
/// The executor records one entry per node that was run (or served from the
/// cache) during a graph execution. The type is comparable so that recorded
/// statistics can be asserted on directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeExecutionStats {
    /// Identifier of the node this record describes.
    pub node_id: String,
    /// Time spent handling the node, including the cache lookup.
    pub duration: Duration,
    /// `true` when the result came from the cache instead of running the node.
    pub cache_hit: bool,
    /// `true` when the node completed without an error.
    pub success: bool,
}
19
20/// Overall execution metrics
21#[derive(Debug, Clone, Default)]
22pub struct ExecutionMetrics {
23    pub total_duration: Duration,
24    pub nodes_executed: usize,
25    pub nodes_skipped: usize,
26    pub nodes_failed: usize,
27    pub cache_hits: usize,
28    pub node_stats: Vec<NodeExecutionStats>,
29}
30
31/// Executor for running graph nodes in topological order.
32/// 
33/// # Thread Safety
34/// 
35/// The Executor is **NOT thread-safe** for concurrent executions on the same instance.
36/// While `execute()` is async and takes `&self` (shared reference), the underlying
37/// implementation assumes single-threaded access patterns:
38/// 
39/// - `self.nodes` is a regular `HashMap` without synchronization
40/// - Multiple concurrent calls to `execute()` would have data races when accessing nodes
41/// 
42/// ## Safe Usage Patterns
43/// 
44/// 1. **Single execution at a time**: Only call `execute()` once at a time per executor instance
45/// 2. **Clone for parallelism**: Create separate executor instances for parallel graph executions
46/// 3. **Sequential async**: Use `.await` to ensure executions don't overlap
47/// 
48/// ## Future Work
49/// 
50/// For true concurrent execution support, wrap `nodes` in `Arc<RwLock<HashMap>>` or similar.
51pub struct Executor {
52    nodes: HashMap<String, Box<dyn Node>>,
53    cache: Option<CacheManager>,
54    metrics: ExecutionMetrics,
55    fallback_handler: Option<crate::fault_tolerance::degradation::FallbackHandler>,
56}
57
58impl Executor {
59    pub fn new() -> Self {
60        Self {
61            nodes: HashMap::new(),
62            cache: None,
63            metrics: ExecutionMetrics::default(),
64            fallback_handler: None,
65        }
66    }
67
68    /// Create a new executor with caching enabled
69    pub fn with_cache(cache: CacheManager) -> Self {
70        Self {
71            nodes: HashMap::new(),
72            cache: Some(cache),
73            metrics: ExecutionMetrics::default(),
74            fallback_handler: None,
75        }
76    }
77
78    /// Enable caching for this executor
79    pub fn set_cache(&mut self, cache: CacheManager) {
80        self.cache = Some(cache);
81    }
82
83    /// Set a global fallback handler used when node execution fails
84    pub fn set_fallback_handler(&mut self, handler: crate::fault_tolerance::degradation::FallbackHandler) {
85        self.fallback_handler = Some(handler);
86    }
87
88    /// Get the cache manager (if enabled)
89    pub fn cache(&self) -> Option<&CacheManager> {
90        self.cache.as_ref()
91    }
92    
93    /// Get execution metrics from last run
94    pub fn metrics(&self) -> &ExecutionMetrics {
95        &self.metrics
96    }
97    
98    /// Reset execution metrics
99    pub fn reset_metrics(&mut self) {
100        self.metrics = ExecutionMetrics::default();
101    }
102
103    /// Build executor from graph definition
104    pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
105        let mut executor = Self::new();
106
107        // Create concrete node instances based on NodeConfig
108        for (node_id, config) in &def.nodes {
109            let node: Box<dyn Node> = match config.node_type {
110                crate::node::NodeType::RuleNode => {
111                    let condition = config.condition.as_deref().unwrap_or("true");
112                    Box::new(ConcreteRuleNode::new(node_id, condition))
113                }
114                crate::node::NodeType::DBNode => {
115                    let query = config.query.clone()
116                        .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
117                    
118                    // Create DBNode with params if specified
119                    if let Some(params) = config.params.clone() {
120                        Box::new(DBNode::with_params(node_id, query, params))
121                    } else {
122                        Box::new(DBNode::new(node_id, query))
123                    }
124                }
125                crate::node::NodeType::AINode => {
126                    let prompt = config.prompt.clone()
127                        .unwrap_or_else(|| format!("Process data for {}", node_id));
128                    Box::new(AINode::new(node_id, prompt))
129                }
130                crate::node::NodeType::GrpcNode => {
131                    // Parse query field as "service_url#method"
132                    let query = config.query.clone()
133                        .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
134                    let parts: Vec<&str> = query.split('#').collect();
135                    let service_url = parts.get(0).unwrap_or(&"http://localhost:50051").to_string();
136                    let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
137                    Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
138                }
139                // Advanced nodes - not yet fully supported in from_graph_def
140                // Use Node trait directly if needed
141                crate::node::NodeType::SubgraphNode |
142                crate::node::NodeType::ConditionalNode |
143                crate::node::NodeType::LoopNode |
144                crate::node::NodeType::TryCatchNode |
145                crate::node::NodeType::RetryNode |
146                crate::node::NodeType::CircuitBreakerNode => {
147                    // Placeholder: create a simple rule node
148                    // TODO: Implement proper constructors for advanced nodes
149                    Box::new(ConcreteRuleNode::new(node_id, "true"))
150                }
151            };
152
153            executor.register_node(node);
154        }
155
156        Ok(executor)
157    }
158
159    /// Register a node with the executor
160    pub fn register_node(&mut self, node: Box<dyn Node>) {
161        let id = node.id().to_string();
162        self.nodes.insert(id, node);
163    }
164
165    /// Detect cycles in the graph using DFS
166    fn detect_cycles(&self, graph: &Graph) -> Result<()> {
167        let mut visited = HashSet::new();
168        let mut rec_stack = HashSet::new();
169        
170        // Build adjacency list for cycle detection
171        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
172        for edge in &graph.def.edges {
173            adj_list
174                .entry(edge.from.clone())
175                .or_insert_with(Vec::new)
176                .push(edge.to.clone());
177        }
178        
179        // DFS to detect cycles
180        fn dfs_cycle_check(
181            node: &str,
182            adj_list: &HashMap<String, Vec<String>>,
183            visited: &mut HashSet<String>,
184            rec_stack: &mut HashSet<String>,
185            path: &mut Vec<String>,
186        ) -> Option<Vec<String>> {
187            visited.insert(node.to_string());
188            rec_stack.insert(node.to_string());
189            path.push(node.to_string());
190            
191            if let Some(neighbors) = adj_list.get(node) {
192                for neighbor in neighbors {
193                    if !visited.contains(neighbor) {
194                        if let Some(cycle) = dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path) {
195                            return Some(cycle);
196                        }
197                    } else if rec_stack.contains(neighbor) {
198                        // Found a cycle - return the cycle path
199                        let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
200                        return Some(path[cycle_start..].to_vec());
201                    }
202                }
203            }
204            
205            path.pop();
206            rec_stack.remove(node);
207            None
208        }
209        
210        // Check all nodes
211        for node_id in graph.def.nodes.keys() {
212            if !visited.contains(node_id) {
213                let mut path = Vec::new();
214                if let Some(cycle) = dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path) {
215                    return Err(anyhow::anyhow!(
216                        "Cycle detected in graph: {} -> {}",
217                        cycle.join(" -> "),
218                        cycle.first().unwrap()
219                    ));
220                }
221            }
222        }
223        
224        Ok(())
225    }
226
227    /// Execute the graph in topological order
228    pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
229        eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
230        eprintln!("Graph has {} nodes, {} edges", graph.def.nodes.len(), graph.def.edges.len());
231        info!("Executor: Starting graph execution");
232        let execution_start = Instant::now();
233        
234        // Reset metrics
235        self.metrics = ExecutionMetrics::default();
236
237        // Validate graph structure first
238        graph.def.validate()?;
239        
240        // Warn about disconnected components
241        if graph.def.has_disconnected_components() {
242            warn!("Graph has disconnected components - some nodes may not be reachable");
243        }
244
245        // First, detect cycles in the graph
246        self.detect_cycles(graph)?;
247
248        // Build adjacency list and in-degree map
249        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
250        let mut in_degree: HashMap<String, usize> = HashMap::new();
251
252        // Initialize all nodes with 0 in-degree
253        for node_id in graph.def.nodes.keys() {
254            in_degree.insert(node_id.clone(), 0);
255            adj_list.insert(node_id.clone(), Vec::new());
256        }
257
258        // Build the graph structure
259        for edge in &graph.def.edges {
260            adj_list
261                .entry(edge.from.clone())
262                .or_insert_with(Vec::new)
263                .push(edge.to.clone());
264
265            *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
266        }
267
268        // Find all nodes with in-degree 0 (starting nodes)
269        let mut queue: VecDeque<String> = in_degree
270            .iter()
271            .filter(|(_, &degree)| degree == 0)
272            .map(|(id, _)| id.clone())
273            .collect();
274
275        if queue.is_empty() {
276            return Err(anyhow::anyhow!(
277                "No starting nodes found in graph (all nodes have incoming edges). \
278                This indicates either a cycle (which should have been caught earlier) \
279                or an invalid graph structure. Nodes: {:?}",
280                graph.def.nodes.keys().collect::<Vec<_>>()
281            ));
282        }
283
284        let mut executed = HashSet::new();
285        let mut execution_order = Vec::new();
286
287        // Topological sort & execution
288        while let Some(node_id) = queue.pop_front() {
289            if executed.contains(&node_id) {
290                continue;
291            }
292
293            info!("Executor: Processing node '{}'", node_id);
294
295            // Check if all incoming edges have their rules satisfied
296            // IMPORTANT: Only check edges from nodes that have been executed
297            let incoming_edges: Vec<_> = graph
298                .def
299                .edges
300                .iter()
301                .filter(|e| e.to == node_id && executed.contains(&e.from))
302                .collect();
303
304            let mut should_execute = true;
305
306            for edge in &incoming_edges {
307                if let Some(rule_id) = &edge.rule {
308                    let rule = Rule::new(rule_id, "true"); // Default condition
309
310                    match rule.evaluate(&graph.context.data) {
311                        Ok(result) => {
312                            debug!(
313                                "Rule '{}' for edge {} -> {} evaluated to: {:?}",
314                                rule_id, edge.from, edge.to, result
315                            );
316
317                            if let serde_json::Value::Bool(false) = result {
318                                should_execute = false;
319                                info!(
320                                    "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
321                                    node_id, rule_id, edge.from
322                                );
323                                self.metrics.nodes_skipped += 1;
324                                break;
325                            }
326                        }
327                        Err(e) => {
328                            warn!(
329                                "Rule '{}' evaluation failed: {}. Assuming true.",
330                                rule_id, e
331                            );
332                        }
333                    }
334                }
335            }
336
337            // Execute the node
338            if should_execute {
339                eprintln!("🔵 About to execute node: {}", node_id);
340                if let Some(node) = self.nodes.get(&node_id) {
341                    let node_start = Instant::now();
342                    let mut cache_hit = false;
343                    
344                    // Create cache key based on node ID and relevant context
345                    // Include both upstream node results AND initial parameters
346                    let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
347                        .iter()
348                        .filter_map(|edge| {
349                            graph.context.data.get(&format!("{}_result", edge.from))
350                                .map(|v| (edge.from.clone(), v.clone()))
351                        })
352                        .collect();
353                    
354                    // For nodes with no incoming edges (root nodes), include initial params in cache key
355                    // This ensures different initial parameters (e.g., product_id) create different cache entries
356                    if incoming_edges.is_empty() {
357                        // Include all non-result context keys as initial params
358                        for (key, value) in &graph.context.data {
359                            if !key.ends_with("_result") {
360                                relevant_context.insert(format!("_initial_{}", key), value.clone());
361                            }
362                        }
363                    }
364                    
365                    let context_value = serde_json::to_value(&relevant_context)?;
366                    let cache_key = CacheKey::new(&node_id, &context_value);
367
368                    // Check cache first
369                    let cached_result = if let Some(cache) = &self.cache {
370                        cache.get(&cache_key)
371                    } else {
372                        None
373                    };
374
375                    let result = if let Some(cached_value) = cached_result {
376                        info!("Node '{}' result retrieved from cache", node_id);
377                        cache_hit = true;
378                        self.metrics.cache_hits += 1;
379                        
380                        // Merge cached result into context
381                        if let serde_json::Value::Object(cached_obj) = cached_value {
382                            for (k, v) in cached_obj {
383                                graph.context.data.insert(k, v);
384                            }
385                        }
386                        
387                        Ok(serde_json::Value::Null) // Successfully used cache
388                    } else {
389                        // Execute node and cache result
390                        let exec_result = node.run(&mut graph.context).await;
391
392                        // On failure, attempt graceful degradation via fallback handler
393                        if exec_result.is_err() {
394                            let fallback = crate::fault_tolerance::degradation::degrade_on_failure(
395                                &node_id,
396                                &mut graph.context,
397                                self.fallback_handler,
398                            );
399                            if fallback.is_some() {
400                                info!("Applied fallback for node '{}'", node_id);
401                            }
402                        }
403
404                        // Store result in cache if execution succeeded (or fallback set _result)
405                        if let Some(cache) = &self.cache {
406                            let mut result_only: HashMap<String, serde_json::Value> = HashMap::new();
407                            for (key, value) in &graph.context.data {
408                                if key.ends_with("_result") {
409                                    result_only.insert(key.clone(), value.clone());
410                                }
411                            }
412                            let context_result = serde_json::to_value(&result_only)?;
413                            if let Err(e) = cache.put(cache_key, context_result, None) {
414                                warn!("Failed to cache result for node '{}': {}", node_id, e);
415                            }
416                        }
417                        
418                        exec_result
419                    };
420
421                    match result {
422                        Ok(_) => {
423                            let duration = node_start.elapsed();
424                            info!("Node '{}' executed successfully in {:?}", node_id, duration);
425                            execution_order.push(node_id.clone());
426                            
427                            self.metrics.nodes_executed += 1;
428                            self.metrics.node_stats.push(NodeExecutionStats {
429                                node_id: node_id.clone(),
430                                duration,
431                                cache_hit,
432                                success: true,
433                            });
434                        }
435                        Err(e) => {
436                            let duration = node_start.elapsed();
437                            warn!("Node '{}' execution failed: {:?}", node_id, e);
438                            
439                            self.metrics.nodes_failed += 1;
440                            self.metrics.node_stats.push(NodeExecutionStats {
441                                node_id: node_id.clone(),
442                                duration,
443                                cache_hit,
444                                success: false,
445                            });
446                        }
447                    }
448                } else {
449                    warn!("Node '{}' not found in executor", node_id);
450                }
451            }
452
453            executed.insert(node_id.clone());
454
455            // Add downstream nodes to queue
456            if let Some(neighbors) = adj_list.get(&node_id) {
457                for neighbor in neighbors {
458                    if let Some(degree) = in_degree.get_mut(neighbor) {
459                        *degree = degree.saturating_sub(1);
460                        if *degree == 0 && !executed.contains(neighbor) {
461                            queue.push_back(neighbor.clone());
462                        }
463                    }
464                }
465            }
466        }
467
468        self.metrics.total_duration = execution_start.elapsed();
469        
470        info!(
471            "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
472            self.metrics.total_duration,
473            self.metrics.nodes_executed,
474            self.metrics.nodes_skipped,
475            self.metrics.nodes_failed,
476            self.metrics.cache_hits
477        );
478
479        // Verify all nodes were executed (should not happen with cycle detection)
480        let unexecuted: Vec<_> = graph
481            .def
482            .nodes
483            .keys()
484            .filter(|id| !executed.contains(*id))
485            .collect();
486
487        if !unexecuted.is_empty() {
488            return Err(anyhow::anyhow!(
489                "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
490                unexecuted
491            ));
492        }
493
494        Ok(())
495    }
496}
497
498impl Default for Executor {
499    fn default() -> Self {
500        Self::new()
501    }
502}