rust_logic_graph/core/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4
5use crate::core::{Graph, GraphDef};
6use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
7use crate::rule::Rule;
8use crate::cache::{CacheManager, CacheKey};
9
10pub struct Executor {
11    nodes: HashMap<String, Box<dyn Node>>,
12    cache: Option<CacheManager>,
13}
14
15impl Executor {
16    pub fn new() -> Self {
17        Self {
18            nodes: HashMap::new(),
19            cache: None,
20        }
21    }
22
23    /// Create a new executor with caching enabled
24    pub fn with_cache(cache: CacheManager) -> Self {
25        Self {
26            nodes: HashMap::new(),
27            cache: Some(cache),
28        }
29    }
30
31    /// Enable caching for this executor
32    pub fn set_cache(&mut self, cache: CacheManager) {
33        self.cache = Some(cache);
34    }
35
36    /// Get the cache manager (if enabled)
37    pub fn cache(&self) -> Option<&CacheManager> {
38        self.cache.as_ref()
39    }
40
41    /// Build executor from graph definition
42    pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
43        let mut executor = Self::new();
44
45        // Create concrete node instances based on NodeType
46        for (node_id, node_type) in &def.nodes {
47            let node: Box<dyn Node> = match node_type {
48                crate::node::NodeType::RuleNode => {
49                    Box::new(ConcreteRuleNode::new(node_id, "true"))
50                }
51                crate::node::NodeType::DBNode => {
52                    Box::new(DBNode::new(node_id, format!("SELECT * FROM {}", node_id)))
53                }
54                crate::node::NodeType::AINode => {
55                    Box::new(AINode::new(node_id, format!("Process data for {}", node_id)))
56                }
57            };
58
59            executor.register_node(node);
60        }
61
62        Ok(executor)
63    }
64
65    /// Register a node with the executor
66    pub fn register_node(&mut self, node: Box<dyn Node>) {
67        let id = node.id().to_string();
68        self.nodes.insert(id, node);
69    }
70
71    /// Execute the graph in topological order
72    pub async fn execute(&self, graph: &mut Graph) -> Result<()> {
73        info!("Executor: Starting graph execution");
74
75        // Build adjacency list and in-degree map
76        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
77        let mut in_degree: HashMap<String, usize> = HashMap::new();
78
79        // Initialize all nodes with 0 in-degree
80        for node_id in graph.def.nodes.keys() {
81            in_degree.insert(node_id.clone(), 0);
82            adj_list.insert(node_id.clone(), Vec::new());
83        }
84
85        // Build the graph structure
86        for edge in &graph.def.edges {
87            adj_list
88                .entry(edge.from.clone())
89                .or_insert_with(Vec::new)
90                .push(edge.to.clone());
91
92            *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
93        }
94
95        // Find all nodes with in-degree 0 (starting nodes)
96        let mut queue: VecDeque<String> = in_degree
97            .iter()
98            .filter(|(_, &degree)| degree == 0)
99            .map(|(id, _)| id.clone())
100            .collect();
101
102        if queue.is_empty() {
103            warn!("No starting nodes found (all nodes have incoming edges). Starting with first node.");
104            if let Some(first_node) = graph.def.nodes.keys().next() {
105                queue.push_back(first_node.clone());
106            }
107        }
108
109        let mut executed = HashSet::new();
110        let mut execution_order = Vec::new();
111
112        // Topological sort & execution
113        while let Some(node_id) = queue.pop_front() {
114            if executed.contains(&node_id) {
115                continue;
116            }
117
118            info!("Executor: Processing node '{}'", node_id);
119
120            // Check if all incoming edges have their rules satisfied
121            let incoming_edges: Vec<_> = graph
122                .def
123                .edges
124                .iter()
125                .filter(|e| e.to == node_id)
126                .collect();
127
128            let mut should_execute = true;
129
130            for edge in &incoming_edges {
131                if let Some(rule_id) = &edge.rule {
132                    let rule = Rule::new(rule_id, "true"); // Default condition
133
134                    match rule.evaluate(&graph.context.data) {
135                        Ok(result) => {
136                            debug!(
137                                "Rule '{}' for edge {} -> {} evaluated to: {:?}",
138                                rule_id, edge.from, edge.to, result
139                            );
140
141                            if let serde_json::Value::Bool(false) = result {
142                                should_execute = false;
143                                info!(
144                                    "Skipping node '{}' due to failed rule '{}'",
145                                    node_id, rule_id
146                                );
147                                break;
148                            }
149                        }
150                        Err(e) => {
151                            warn!(
152                                "Rule '{}' evaluation failed: {}. Assuming true.",
153                                rule_id, e
154                            );
155                        }
156                    }
157                }
158            }
159
160            // Execute the node
161            if should_execute {
162                if let Some(node) = self.nodes.get(&node_id) {
163                    // Create cache key based on node ID and current context
164                    let context_value = serde_json::to_value(&graph.context.data)?;
165                    let cache_key = CacheKey::new(&node_id, &context_value);
166
167                    // Check cache first
168                    let mut cached_hit = false;
169                    
170                    if let Some(cache) = &self.cache {
171                        if let Some(cached_value) = cache.get(&cache_key) {
172                            info!("Node '{}' result retrieved from cache", node_id);
173                            
174                            // Merge cached result into context
175                            if let serde_json::Value::Object(cached_obj) = cached_value {
176                                for (k, v) in cached_obj {
177                                    graph.context.data.insert(k, v);
178                                }
179                            }
180                            
181                            cached_hit = true;
182                        }
183                    }
184
185                    // Execute node if not cached
186                    let result = if !cached_hit {
187                        node.run(&mut graph.context).await
188                    } else {
189                        Ok(serde_json::Value::Null) // Dummy value for cached result
190                    };
191
192                    match result {
193                        Ok(_) => {
194                            info!("Node '{}' executed successfully", node_id);
195                            execution_order.push(node_id.clone());
196
197                            // Store result in cache if caching is enabled and node was actually executed
198                            if !cached_hit {
199                                if let Some(cache) = &self.cache {
200                                    let context_result = serde_json::to_value(&graph.context.data)?;
201                                    if let Err(e) = cache.put(cache_key, context_result, None) {
202                                        warn!("Failed to cache result for node '{}': {}", node_id, e);
203                                    }
204                                }
205                            }
206                        }
207                        Err(e) => {
208                            warn!("Node '{}' execution failed: {:?}", node_id, e);
209                        }
210                    }
211                } else {
212                    warn!("Node '{}' not found in executor", node_id);
213                }
214            }
215
216            executed.insert(node_id.clone());
217
218            // Add downstream nodes to queue
219            if let Some(neighbors) = adj_list.get(&node_id) {
220                for neighbor in neighbors {
221                    if let Some(degree) = in_degree.get_mut(neighbor) {
222                        *degree = degree.saturating_sub(1);
223                        if *degree == 0 && !executed.contains(neighbor) {
224                            queue.push_back(neighbor.clone());
225                        }
226                    }
227                }
228            }
229        }
230
231        info!(
232            "Executor: Completed execution. Executed nodes: {:?}",
233            execution_order
234        );
235
236        // Check for unexecuted nodes (possible cycle)
237        let unexecuted: Vec<_> = graph
238            .def
239            .nodes
240            .keys()
241            .filter(|id| !executed.contains(*id))
242            .collect();
243
244        if !unexecuted.is_empty() {
245            warn!("Some nodes were not executed (possible cycle): {:?}", unexecuted);
246        }
247
248        Ok(())
249    }
250}
251
252impl Default for Executor {
253    fn default() -> Self {
254        Self::new()
255    }
256}