// rust_logic_graph/core/executor.rs

1use anyhow::Result;
2use std::collections::{HashMap, HashSet, VecDeque};
3use std::time::{Duration, Instant};
4use tracing::{debug, info, warn};
5
6use crate::cache::{CacheKey, CacheManager};
7use crate::core::{Graph, GraphDef};
8use crate::node::{AINode, DBNode, Node, RuleNode as ConcreteRuleNode};
9use crate::rule::Rule;
10
/// Outcome record for one node processed during a single graph run.
#[derive(Clone, Debug)]
pub struct NodeExecutionStats {
    /// Identifier of the node this record describes.
    pub node_id: String,
    /// Time elapsed while handling the node.
    pub duration: Duration,
    /// Whether the node's output was served from the cache rather than computed.
    pub cache_hit: bool,
    /// Whether handling the node finished without an error.
    pub success: bool,
}

20/// Overall execution metrics
21#[derive(Debug, Clone, Default)]
22pub struct ExecutionMetrics {
23    pub total_duration: Duration,
24    pub nodes_executed: usize,
25    pub nodes_skipped: usize,
26    pub nodes_failed: usize,
27    pub cache_hits: usize,
28    pub node_stats: Vec<NodeExecutionStats>,
29}
30
31/// Executor for running graph nodes in topological order.
32///
33/// # Thread Safety
34///
35/// The Executor is **NOT thread-safe** for concurrent executions on the same instance.
36/// While `execute()` is async and takes `&self` (shared reference), the underlying
37/// implementation assumes single-threaded access patterns:
38///
39/// - `self.nodes` is a regular `HashMap` without synchronization
40/// - Multiple concurrent calls to `execute()` would have data races when accessing nodes
41///
42/// ## Safe Usage Patterns
43///
44/// 1. **Single execution at a time**: Only call `execute()` once at a time per executor instance
45/// 2. **Clone for parallelism**: Create separate executor instances for parallel graph executions
46/// 3. **Sequential async**: Use `.await` to ensure executions don't overlap
47///
48/// ## Future Work
49///
50/// For true concurrent execution support, wrap `nodes` in `Arc<RwLock<HashMap>>` or similar.
51pub struct Executor {
52    nodes: HashMap<String, Box<dyn Node>>,
53    cache: Option<CacheManager>,
54    metrics: ExecutionMetrics,
55    fallback_handler: Option<crate::fault_tolerance::degradation::FallbackHandler>,
56}
57
58impl Executor {
59    pub fn new() -> Self {
60        Self {
61            nodes: HashMap::new(),
62            cache: None,
63            metrics: ExecutionMetrics::default(),
64            fallback_handler: None,
65        }
66    }
67
68    /// Create a new executor with caching enabled
69    pub fn with_cache(cache: CacheManager) -> Self {
70        Self {
71            nodes: HashMap::new(),
72            cache: Some(cache),
73            metrics: ExecutionMetrics::default(),
74            fallback_handler: None,
75        }
76    }
77
78    /// Enable caching for this executor
79    pub fn set_cache(&mut self, cache: CacheManager) {
80        self.cache = Some(cache);
81    }
82
83    /// Set a global fallback handler used when node execution fails
84    pub fn set_fallback_handler(
85        &mut self,
86        handler: crate::fault_tolerance::degradation::FallbackHandler,
87    ) {
88        self.fallback_handler = Some(handler);
89    }
90
91    /// Get the cache manager (if enabled)
92    pub fn cache(&self) -> Option<&CacheManager> {
93        self.cache.as_ref()
94    }
95
96    /// Get execution metrics from last run
97    pub fn metrics(&self) -> &ExecutionMetrics {
98        &self.metrics
99    }
100
101    /// Reset execution metrics
102    pub fn reset_metrics(&mut self) {
103        self.metrics = ExecutionMetrics::default();
104    }
105
106    /// Build executor from graph definition
107    pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
108        let mut executor = Self::new();
109
110        // Create concrete node instances based on NodeConfig
111        for (node_id, config) in &def.nodes {
112            let node: Box<dyn Node> = match config.node_type {
113                crate::node::NodeType::RuleNode => {
114                    let condition = config.condition.as_deref().unwrap_or("true");
115                    Box::new(ConcreteRuleNode::new(node_id, condition))
116                }
117                crate::node::NodeType::DBNode => {
118                    let query = config
119                        .query
120                        .clone()
121                        .unwrap_or_else(|| format!("SELECT * FROM {}", node_id));
122
123                    // Create DBNode with params if specified
124                    if let Some(params) = config.params.clone() {
125                        Box::new(DBNode::with_params(node_id, query, params))
126                    } else {
127                        Box::new(DBNode::new(node_id, query))
128                    }
129                }
130                crate::node::NodeType::AINode => {
131                    let prompt = config
132                        .prompt
133                        .clone()
134                        .unwrap_or_else(|| format!("Process data for {}", node_id));
135                    Box::new(AINode::new(node_id, prompt))
136                }
137                crate::node::NodeType::GrpcNode => {
138                    // Parse query field as "service_url#method"
139                    let query = config
140                        .query
141                        .clone()
142                        .unwrap_or_else(|| format!("http://localhost:50051#{}_method", node_id));
143                    let parts: Vec<&str> = query.split('#').collect();
144                    let service_url = parts
145                        .get(0)
146                        .unwrap_or(&"http://localhost:50051")
147                        .to_string();
148                    let method = parts.get(1).unwrap_or(&"UnknownMethod").to_string();
149                    Box::new(crate::node::GrpcNode::new(node_id, service_url, method))
150                }
151                // Advanced nodes - not yet fully supported in from_graph_def
152                // Use Node trait directly if needed
153                crate::node::NodeType::SubgraphNode
154                | crate::node::NodeType::ConditionalNode
155                | crate::node::NodeType::LoopNode
156                | crate::node::NodeType::TryCatchNode
157                | crate::node::NodeType::RetryNode
158                | crate::node::NodeType::CircuitBreakerNode => {
159                    // Placeholder: create a simple rule node
160                    // TODO: Implement proper constructors for advanced nodes
161                    Box::new(ConcreteRuleNode::new(node_id, "true"))
162                }
163            };
164
165            executor.register_node(node);
166        }
167
168        Ok(executor)
169    }
170
171    /// Register a node with the executor
172    pub fn register_node(&mut self, node: Box<dyn Node>) {
173        let id = node.id().to_string();
174        self.nodes.insert(id, node);
175    }
176
177    /// Detect cycles in the graph using DFS
178    fn detect_cycles(&self, graph: &Graph) -> Result<()> {
179        let mut visited = HashSet::new();
180        let mut rec_stack = HashSet::new();
181
182        // Build adjacency list for cycle detection
183        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
184        for edge in &graph.def.edges {
185            adj_list
186                .entry(edge.from.clone())
187                .or_insert_with(Vec::new)
188                .push(edge.to.clone());
189        }
190
191        // DFS to detect cycles
192        fn dfs_cycle_check(
193            node: &str,
194            adj_list: &HashMap<String, Vec<String>>,
195            visited: &mut HashSet<String>,
196            rec_stack: &mut HashSet<String>,
197            path: &mut Vec<String>,
198        ) -> Option<Vec<String>> {
199            visited.insert(node.to_string());
200            rec_stack.insert(node.to_string());
201            path.push(node.to_string());
202
203            if let Some(neighbors) = adj_list.get(node) {
204                for neighbor in neighbors {
205                    if !visited.contains(neighbor) {
206                        if let Some(cycle) =
207                            dfs_cycle_check(neighbor, adj_list, visited, rec_stack, path)
208                        {
209                            return Some(cycle);
210                        }
211                    } else if rec_stack.contains(neighbor) {
212                        // Found a cycle - return the cycle path
213                        let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
214                        return Some(path[cycle_start..].to_vec());
215                    }
216                }
217            }
218
219            path.pop();
220            rec_stack.remove(node);
221            None
222        }
223
224        // Check all nodes
225        for node_id in graph.def.nodes.keys() {
226            if !visited.contains(node_id) {
227                let mut path = Vec::new();
228                if let Some(cycle) =
229                    dfs_cycle_check(node_id, &adj_list, &mut visited, &mut rec_stack, &mut path)
230                {
231                    return Err(anyhow::anyhow!(
232                        "Cycle detected in graph: {} -> {}",
233                        cycle.join(" -> "),
234                        cycle.first().unwrap()
235                    ));
236                }
237            }
238        }
239
240        Ok(())
241    }
242
243    /// Execute the graph in topological order
244    pub async fn execute(&mut self, graph: &mut Graph) -> Result<()> {
245        eprintln!("\n⚡⚡⚡ CORE EXECUTOR.EXECUTE called ⚡⚡⚡");
246        eprintln!(
247            "Graph has {} nodes, {} edges",
248            graph.def.nodes.len(),
249            graph.def.edges.len()
250        );
251        info!("Executor: Starting graph execution");
252        let execution_start = Instant::now();
253
254        // Reset metrics
255        self.metrics = ExecutionMetrics::default();
256
257        // Validate graph structure first
258        graph.def.validate()?;
259
260        // Warn about disconnected components
261        if graph.def.has_disconnected_components() {
262            warn!("Graph has disconnected components - some nodes may not be reachable");
263        }
264
265        // First, detect cycles in the graph
266        self.detect_cycles(graph)?;
267
268        // Build adjacency list and in-degree map
269        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
270        let mut in_degree: HashMap<String, usize> = HashMap::new();
271
272        // Initialize all nodes with 0 in-degree
273        for node_id in graph.def.nodes.keys() {
274            in_degree.insert(node_id.clone(), 0);
275            adj_list.insert(node_id.clone(), Vec::new());
276        }
277
278        // Build the graph structure
279        for edge in &graph.def.edges {
280            adj_list
281                .entry(edge.from.clone())
282                .or_insert_with(Vec::new)
283                .push(edge.to.clone());
284
285            *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
286        }
287
288        // Find all nodes with in-degree 0 (starting nodes)
289        let mut queue: VecDeque<String> = in_degree
290            .iter()
291            .filter(|(_, &degree)| degree == 0)
292            .map(|(id, _)| id.clone())
293            .collect();
294
295        if queue.is_empty() {
296            return Err(anyhow::anyhow!(
297                "No starting nodes found in graph (all nodes have incoming edges). \
298                This indicates either a cycle (which should have been caught earlier) \
299                or an invalid graph structure. Nodes: {:?}",
300                graph.def.nodes.keys().collect::<Vec<_>>()
301            ));
302        }
303
304        let mut executed = HashSet::new();
305        let mut execution_order = Vec::new();
306
307        // Topological sort & execution
308        while let Some(node_id) = queue.pop_front() {
309            if executed.contains(&node_id) {
310                continue;
311            }
312
313            info!("Executor: Processing node '{}'", node_id);
314
315            // Check if all incoming edges have their rules satisfied
316            // IMPORTANT: Only check edges from nodes that have been executed
317            let incoming_edges: Vec<_> = graph
318                .def
319                .edges
320                .iter()
321                .filter(|e| e.to == node_id && executed.contains(&e.from))
322                .collect();
323
324            let mut should_execute = true;
325
326            for edge in &incoming_edges {
327                if let Some(rule_id) = &edge.rule {
328                    let rule = Rule::new(rule_id, "true"); // Default condition
329
330                    match rule.evaluate(&graph.context.data) {
331                        Ok(result) => {
332                            debug!(
333                                "Rule '{}' for edge {} -> {} evaluated to: {:?}",
334                                rule_id, edge.from, edge.to, result
335                            );
336
337                            if let serde_json::Value::Bool(false) = result {
338                                should_execute = false;
339                                info!(
340                                    "Skipping node '{}' due to failed rule '{}' from executed node '{}'",
341                                    node_id, rule_id, edge.from
342                                );
343                                self.metrics.nodes_skipped += 1;
344                                break;
345                            }
346                        }
347                        Err(e) => {
348                            warn!(
349                                "Rule '{}' evaluation failed: {}. Assuming true.",
350                                rule_id, e
351                            );
352                        }
353                    }
354                }
355            }
356
357            // Execute the node
358            if should_execute {
359                eprintln!("🔵 About to execute node: {}", node_id);
360                if let Some(node) = self.nodes.get(&node_id) {
361                    let node_start = Instant::now();
362                    let mut cache_hit = false;
363
364                    // Create cache key based on node ID and relevant context
365                    // Include both upstream node results AND initial parameters
366                    let mut relevant_context: HashMap<String, serde_json::Value> = incoming_edges
367                        .iter()
368                        .filter_map(|edge| {
369                            graph
370                                .context
371                                .data
372                                .get(&format!("{}_result", edge.from))
373                                .map(|v| (edge.from.clone(), v.clone()))
374                        })
375                        .collect();
376
377                    // For nodes with no incoming edges (root nodes), include initial params in cache key
378                    // This ensures different initial parameters (e.g., product_id) create different cache entries
379                    if incoming_edges.is_empty() {
380                        // Include all non-result context keys as initial params
381                        for (key, value) in &graph.context.data {
382                            if !key.ends_with("_result") {
383                                relevant_context.insert(format!("_initial_{}", key), value.clone());
384                            }
385                        }
386                    }
387
388                    let context_value = serde_json::to_value(&relevant_context)?;
389                    let cache_key = CacheKey::new(&node_id, &context_value);
390
391                    // Check cache first
392                    let cached_result = if let Some(cache) = &self.cache {
393                        cache.get(&cache_key)
394                    } else {
395                        None
396                    };
397
398                    let result = if let Some(cached_value) = cached_result {
399                        info!("Node '{}' result retrieved from cache", node_id);
400                        cache_hit = true;
401                        self.metrics.cache_hits += 1;
402
403                        // Merge cached result into context
404                        if let serde_json::Value::Object(cached_obj) = cached_value {
405                            for (k, v) in cached_obj {
406                                graph.context.data.insert(k, v);
407                            }
408                        }
409
410                        Ok(serde_json::Value::Null) // Successfully used cache
411                    } else {
412                        // Execute node and cache result
413                        let exec_result = node.run(&mut graph.context).await;
414
415                        // On failure, attempt graceful degradation via fallback handler
416                        if exec_result.is_err() {
417                            let fallback = crate::fault_tolerance::degradation::degrade_on_failure(
418                                &node_id,
419                                &mut graph.context,
420                                self.fallback_handler,
421                            );
422                            if fallback.is_some() {
423                                info!("Applied fallback for node '{}'", node_id);
424                            }
425                        }
426
427                        // Store result in cache if execution succeeded (or fallback set _result)
428                        if let Some(cache) = &self.cache {
429                            let mut result_only: HashMap<String, serde_json::Value> =
430                                HashMap::new();
431                            for (key, value) in &graph.context.data {
432                                if key.ends_with("_result") {
433                                    result_only.insert(key.clone(), value.clone());
434                                }
435                            }
436                            let context_result = serde_json::to_value(&result_only)?;
437                            if let Err(e) = cache.put(cache_key, context_result, None) {
438                                warn!("Failed to cache result for node '{}': {}", node_id, e);
439                            }
440                        }
441
442                        exec_result
443                    };
444
445                    match result {
446                        Ok(_) => {
447                            let duration = node_start.elapsed();
448                            info!("Node '{}' executed successfully in {:?}", node_id, duration);
449                            execution_order.push(node_id.clone());
450
451                            self.metrics.nodes_executed += 1;
452                            self.metrics.node_stats.push(NodeExecutionStats {
453                                node_id: node_id.clone(),
454                                duration,
455                                cache_hit,
456                                success: true,
457                            });
458                        }
459                        Err(e) => {
460                            let duration = node_start.elapsed();
461                            warn!("Node '{}' execution failed: {:?}", node_id, e);
462
463                            self.metrics.nodes_failed += 1;
464                            self.metrics.node_stats.push(NodeExecutionStats {
465                                node_id: node_id.clone(),
466                                duration,
467                                cache_hit,
468                                success: false,
469                            });
470                        }
471                    }
472                } else {
473                    warn!("Node '{}' not found in executor", node_id);
474                }
475            }
476
477            executed.insert(node_id.clone());
478
479            // Add downstream nodes to queue
480            if let Some(neighbors) = adj_list.get(&node_id) {
481                for neighbor in neighbors {
482                    if let Some(degree) = in_degree.get_mut(neighbor) {
483                        *degree = degree.saturating_sub(1);
484                        if *degree == 0 && !executed.contains(neighbor) {
485                            queue.push_back(neighbor.clone());
486                        }
487                    }
488                }
489            }
490        }
491
492        self.metrics.total_duration = execution_start.elapsed();
493
494        info!(
495            "Executor: Completed execution in {:?}. Executed: {}, Skipped: {}, Failed: {}, Cache hits: {}",
496            self.metrics.total_duration,
497            self.metrics.nodes_executed,
498            self.metrics.nodes_skipped,
499            self.metrics.nodes_failed,
500            self.metrics.cache_hits
501        );
502
503        // Verify all nodes were executed (should not happen with cycle detection)
504        let unexecuted: Vec<_> = graph
505            .def
506            .nodes
507            .keys()
508            .filter(|id| !executed.contains(*id))
509            .collect();
510
511        if !unexecuted.is_empty() {
512            return Err(anyhow::anyhow!(
513                "Some nodes were not executed: {:?}. This indicates a bug in the executor logic.",
514                unexecuted
515            ));
516        }
517
518        Ok(())
519    }
520}
521
522impl Default for Executor {
523    fn default() -> Self {
524        Self::new()
525    }
526}