// rust_logic_graph/core/executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4use std::time::{Duration, Instant};
5
6use crate::core::{Graph, GraphDef};
7use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
8use crate::rule::Rule;
9use crate::cache::{CacheManager, CacheKey};
10
/// Execution statistics for a single node.
///
/// All fields are plain values, so two stats records can be compared with `==`
/// (useful when asserting on metrics in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeExecutionStats {
    /// Id of the node these statistics describe.
    pub node_id: String,
    /// Time spent on the node, including building its cache key and any cache lookup.
    pub duration: Duration,
    /// Whether the result was served from the cache instead of running the node.
    pub cache_hit: bool,
    /// Whether the node completed without error.
    pub success: bool,
}
19
20/// Overall execution metrics
21#[derive(Debug, Clone, Default)]
22pub struct ExecutionMetrics {
23    pub total_duration: Duration,
24    pub nodes_executed: usize,
25    pub nodes_skipped: usize,
26    pub nodes_failed: usize,
27    pub cache_hits: usize,
28    pub node_stats: Vec<NodeExecutionStats>,
29}
30
31/// Executor for running graph nodes in topological order.
32/// 
33/// # Thread Safety
34/// 
35/// The Executor is **NOT thread-safe** for concurrent executions on the same instance.
36/// While `execute()` is async and takes `&self` (shared reference), the underlying
37/// implementation assumes single-threaded access patterns:
38/// 
39/// - `self.nodes` is a regular `HashMap` without synchronization
40/// - Multiple concurrent calls to `execute()` would have data races when accessing nodes
41/// 
42/// ## Safe Usage Patterns
43/// 
44/// 1. **Single execution at a time**: Only call `execute()` once at a time per executor instance
45/// 2. **Clone for parallelism**: Create separate executor instances for parallel graph executions
46/// 3. **Sequential async**: Use `.await` to ensure executions don't overlap
47/// 
48/// ## Future Work
49/// 
50/// For true concurrent execution support, wrap `nodes` in `Arc<RwLock<HashMap>>` or similar.
51pub struct Executor {
52    nodes: HashMap<String, Box<dyn Node>>,
53    cache: Option<CacheManager>,
54    metrics: ExecutionMetrics,
55}
56
57impl Executor {
58    pub fn new() -> Self {
59        Self {
60            nodes: HashMap::new(),
61            cache: None,
62            metrics: ExecutionMetrics::default(),
63        }
64    }
65
66    /// Create a new executor with caching enabled
67    pub fn with_cache(cache: CacheManager) -> Self {
68        Self {
69            nodes: HashMap::new(),
70            cache: Some(cache),
71            metrics: ExecutionMetrics::default(),
72        }
73    }
74
75    /// Enable caching for this executor
76    pub fn set_cache(&mut self, cache: CacheManager) {
77        self.cache = Some(cache);
78    }
79
80    /// Get the cache manager (if enabled)
81    pub fn cache(&self) -> Option<&CacheManager> {
82        self.cache.as_ref()
83    }
84    
85    /// Get execution metrics from last run
86    pub fn metrics(&self) -> &ExecutionMetrics {
87        &self.metrics
88    }
89    
90    /// Reset execution metrics
91    pub fn reset_metrics(&mut self) {
92        self.metrics = ExecutionMetrics::default();
93    }
94
95    /// Build executor from graph definition
96    pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
97        let mut executor = Self::new();
98
99        // Create concrete node instances based on NodeConfig
100        for (node_id, config) in &def.nodes {
101            let node: Box<dyn Node> = match config.node_type {
102                crate::node::NodeType::RuleNode => {
103                    let condition = config.condition.as_deref().unwrap_or("true");
104                    Box::new(ConcreteRuleNode::new(node_id, condition))
105                }
106                crate::node::NodeType::DBNode => {
107                    let query = config.query.clone()
108                        .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
109                    
110                    // Create DBNode with params if specified
111                    if let Some(params) = config.params.clone() {
112                        Box::new(DBNode::with_params(node_id, query, params))
113                    } else {
114                        Box::new(DBNode::new(node_id, query))
115                    }
116                }
117                crate::node::NodeType::AINode => {
118                    let prompt = config.prompt.clone()
119                        .unwrap_or_else(|| format!("Process data for {}", node_id));
120                    Box::new(AINode::new(node_id, prompt))
121                }
122                crate::node::NodeType::GrpcNode => {
123                    // Parse query field as "service_url#method"
124                    let query = config.query.clone()
125                        .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
126                    let parts: Vec<&str> = query.split('#').collect();
127                    let service_url = parts.get(0).unwrap_or(&"http://localhost:50051").to_string();
128                    let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
129                    Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
130                }
131                // Advanced nodes - not yet fully supported in from_graph_def
132                // Use Node trait directly if needed
133                crate::node::NodeType::SubgraphNode |
134                crate::node::NodeType::ConditionalNode |
135                crate::node::NodeType::LoopNode |
136                crate::node::NodeType::TryCatchNode |
137                crate::node::NodeType::RetryNode |
138                crate::node::NodeType::CircuitBreakerNode => {
139                    // Placeholder: create a simple rule node
140                    // TODO: Implement proper constructors for advanced nodes
141                    Box::new(ConcreteRuleNode::new(node_id, "true"))
142                }
143            };
144
145            executor.register_node(node);
146        }
147
148        Ok(executor)
149    }
150
151    /// Register a node with the executor
152    pub fn register_node(&mut self, node: Box<dyn Node>) {
153        let id = node.id().to_string();
154        self.nodes.insert(id, node);
155    }
156
157    /// Detect cycles in the graph using DFS
158    fn detect_cycles(&self, graph: &Graph) -> Result<()> {
159        let mut visited = HashSet::new();
160        let mut rec_stack = HashSet::new();
161        
162        // Build adjacency list for cycle detection
163        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
164        for edge in &graph.def.edges {
165            adj_list
166                .entry(edge.from.clone())
167                .or_insert_with(Vec::new)
168                .push(edge.to.clone());
169        }
170        
171        // DFS to detect cycles
172        fn dfs_cycle_check(
173            node: &str,
174            adj_list: &HashMap<String, Vec<String>>,
175            visited: &mut HashSet<String>,
176            rec_stack: &mut HashSet<String>,
177            path: &mut Vec<String>,
178        ) -> Option<Vec<String>> {
179            visited.insert(node.to_string());
180            rec_stack.insert(node.to_string());
181            path.push(node.to_string());
182            
183            if let Some(neighbors) = adj_list.get(node) {
184                for neighbor in neighbors {
185                    if !visited.contains(neighbor) {
186                        if let Some(cycle) = dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path) {
187                            return Some(cycle);
188                        }
189                    } else if rec_stack.contains(neighbor) {
190                        // Found a cycle - return the cycle path
191                        let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
192                        return Some(path[cycle_start..].to_vec());
193                    }
194                }
195            }
196            
197            path.pop();
198            rec_stack.remove(node);
199            None
200        }
201        
202        // Check all nodes
203        for node_id in graph.def.nodes.keys() {
204            if !visited.contains(node_id) {
205                let mut path = Vec::new();
206                if let Some(cycle) = dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path) {
207                    return Err(anyhow::anyhow!(
208                        "Cycle detected in graph: {} -> {}",
209                        cycle.join(" -> "),
210                        cycle.first().unwrap()
211                    ));
212                }
213            }
214        }
215        
216        Ok(())
217    }
218
219    /// Execute the graph in topological order
220    pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
221        eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
222        eprintln!("Graph has {} nodes, {} edges", graph.def.nodes.len(), graph.def.edges.len());
223        info!("Executor: Starting graph execution");
224        let execution_start = Instant::now();
225        
226        // Reset metrics
227        self.metrics = ExecutionMetrics::default();
228
229        // Validate graph structure first
230        graph.def.validate()?;
231        
232        // Warn about disconnected components
233        if graph.def.has_disconnected_components() {
234            warn!("Graph has disconnected components - some nodes may not be reachable");
235        }
236
237        // First, detect cycles in the graph
238        self.detect_cycles(graph)?;
239
240        // Build adjacency list and in-degree map
241        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
242        let mut in_degree: HashMap<String, usize> = HashMap::new();
243
244        // Initialize all nodes with 0 in-degree
245        for node_id in graph.def.nodes.keys() {
246            in_degree.insert(node_id.clone(), 0);
247            adj_list.insert(node_id.clone(), Vec::new());
248        }
249
250        // Build the graph structure
251        for edge in &graph.def.edges {
252            adj_list
253                .entry(edge.from.clone())
254                .or_insert_with(Vec::new)
255                .push(edge.to.clone());
256
257            *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
258        }
259
260        // Find all nodes with in-degree 0 (starting nodes)
261        let mut queue: VecDeque<String> = in_degree
262            .iter()
263            .filter(|(_, &degree)| degree == 0)
264            .map(|(id, _)| id.clone())
265            .collect();
266
267        if queue.is_empty() {
268            return Err(anyhow::anyhow!(
269                "No starting nodes found in graph (all nodes have incoming edges). \
270                This indicates either a cycle (which should have been caught earlier) \
271                or an invalid graph structure. Nodes: {:?}",
272                graph.def.nodes.keys().collect::<Vec<_>>()
273            ));
274        }
275
276        let mut executed = HashSet::new();
277        let mut execution_order = Vec::new();
278
279        // Topological sort & execution
280        while let Some(node_id) = queue.pop_front() {
281            if executed.contains(&node_id) {
282                continue;
283            }
284
285            info!("Executor: Processing node '{}'", node_id);
286
287            // Check if all incoming edges have their rules satisfied
288            // IMPORTANT: Only check edges from nodes that have been executed
289            let incoming_edges: Vec<_> = graph
290                .def
291                .edges
292                .iter()
293                .filter(|e| e.to == node_id && executed.contains(&e.from))
294                .collect();
295
296            let mut should_execute = true;
297
298            for edge in &incoming_edges {
299                if let Some(rule_id) = &edge.rule {
300                    let rule = Rule::new(rule_id, "true"); // Default condition
301
302                    match rule.evaluate(&graph.context.data) {
303                        Ok(result) => {
304                            debug!(
305                                "Rule '{}' for edge {} -> {} evaluated to: {:?}",
306                                rule_id, edge.from, edge.to, result
307                            );
308
309                            if let serde_json::Value::Bool(false) = result {
310                                should_execute = false;
311                                info!(
312                                    "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
313                                    node_id, rule_id, edge.from
314                                );
315                                self.metrics.nodes_skipped += 1;
316                                break;
317                            }
318                        }
319                        Err(e) => {
320                            warn!(
321                                "Rule '{}' evaluation failed: {}. Assuming true.",
322                                rule_id, e
323                            );
324                        }
325                    }
326                }
327            }
328
329            // Execute the node
330            if should_execute {
331                eprintln!("🔵 About to execute node: {}", node_id);
332                if let Some(node) = self.nodes.get(&node_id) {
333                    let node_start = Instant::now();
334                    let mut cache_hit = false;
335                    
336                    // Create cache key based on node ID and relevant context
337                    // Include both upstream node results AND initial parameters
338                    let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
339                        .iter()
340                        .filter_map(|edge| {
341                            graph.context.data.get(&format!("{}_result", edge.from))
342                                .map(|v| (edge.from.clone(), v.clone()))
343                        })
344                        .collect();
345                    
346                    // For nodes with no incoming edges (root nodes), include initial params in cache key
347                    // This ensures different initial parameters (e.g., product_id) create different cache entries
348                    if incoming_edges.is_empty() {
349                        // Include all non-result context keys as initial params
350                        for (key, value) in &graph.context.data {
351                            if !key.ends_with("_result") {
352                                relevant_context.insert(format!("_initial_{}", key), value.clone());
353                            }
354                        }
355                    }
356                    
357                    let context_value = serde_json::to_value(&relevant_context)?;
358                    let cache_key = CacheKey::new(&node_id, &context_value);
359
360                    // Check cache first
361                    let cached_result = if let Some(cache) = &self.cache {
362                        cache.get(&cache_key)
363                    } else {
364                        None
365                    };
366
367                    let result = if let Some(cached_value) = cached_result {
368                        info!("Node '{}' result retrieved from cache", node_id);
369                        cache_hit = true;
370                        self.metrics.cache_hits += 1;
371                        
372                        // Merge cached result into context
373                        if let serde_json::Value::Object(cached_obj) = cached_value {
374                            for (k, v) in cached_obj {
375                                graph.context.data.insert(k, v);
376                            }
377                        }
378                        
379                        Ok(serde_json::Value::Null) // Successfully used cache
380                    } else {
381                        // Execute node and cache result
382                        let exec_result = node.run(&mut graph.context).await;
383                        
384                        // Store result in cache if execution succeeded
385                        if exec_result.is_ok() {
386                            if let Some(cache) = &self.cache {
387                                // Only cache the node's output (keys ending with _result)
388                                // Don't cache input parameters to avoid overwriting them on cache hit
389                                let mut result_only: HashMap<String, serde_json::Value> = HashMap::new();
390                                for (key, value) in &graph.context.data {
391                                    if key.ends_with("_result") {
392                                        result_only.insert(key.clone(), value.clone());
393                                    }
394                                }
395                                let context_result = serde_json::to_value(&result_only)?;
396                                if let Err(e) = cache.put(cache_key, context_result, None) {
397                                    warn!("Failed to cache result for node '{}': {}", node_id, e);
398                                }
399                            }
400                        }
401                        
402                        exec_result
403                    };
404
405                    match result {
406                        Ok(_) => {
407                            let duration = node_start.elapsed();
408                            info!("Node '{}' executed successfully in {:?}", node_id, duration);
409                            execution_order.push(node_id.clone());
410                            
411                            self.metrics.nodes_executed += 1;
412                            self.metrics.node_stats.push(NodeExecutionStats {
413                                node_id: node_id.clone(),
414                                duration,
415                                cache_hit,
416                                success: true,
417                            });
418                        }
419                        Err(e) => {
420                            let duration = node_start.elapsed();
421                            warn!("Node '{}' execution failed: {:?}", node_id, e);
422                            
423                            self.metrics.nodes_failed += 1;
424                            self.metrics.node_stats.push(NodeExecutionStats {
425                                node_id: node_id.clone(),
426                                duration,
427                                cache_hit,
428                                success: false,
429                            });
430                        }
431                    }
432                } else {
433                    warn!("Node '{}' not found in executor", node_id);
434                }
435            }
436
437            executed.insert(node_id.clone());
438
439            // Add downstream nodes to queue
440            if let Some(neighbors) = adj_list.get(&node_id) {
441                for neighbor in neighbors {
442                    if let Some(degree) = in_degree.get_mut(neighbor) {
443                        *degree = degree.saturating_sub(1);
444                        if *degree == 0 && !executed.contains(neighbor) {
445                            queue.push_back(neighbor.clone());
446                        }
447                    }
448                }
449            }
450        }
451
452        self.metrics.total_duration = execution_start.elapsed();
453        
454        info!(
455            "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
456            self.metrics.total_duration,
457            self.metrics.nodes_executed,
458            self.metrics.nodes_skipped,
459            self.metrics.nodes_failed,
460            self.metrics.cache_hits
461        );
462
463        // Verify all nodes were executed (should not happen with cycle detection)
464        let unexecuted: Vec<_> = graph
465            .def
466            .nodes
467            .keys()
468            .filter(|id| !executed.contains(*id))
469            .collect();
470
471        if !unexecuted.is_empty() {
472            return Err(anyhow::anyhow!(
473                "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
474                unexecuted
475            ));
476        }
477
478        Ok(())
479    }
480}
481
482impl Default for Executor {
483    fn default() -> Self {
484        Self::new()
485    }
486}