rust_logic_graph/core/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use anyhow::Result;
3use tracing::{info, debug, warn};
4
5use crate::core::{Graph, GraphDef};
6use crate::node::{Node, RuleNode as ConcreteRuleNode, DBNode, AINode};
7use crate::rule::Rule;
8
9pub struct Executor {
10    nodes: HashMap<String, Box<dyn Node>>,
11}
12
13impl Executor {
14    pub fn new() -> Self {
15        Self {
16            nodes: HashMap::new(),
17        }
18    }
19
20    /// Build executor from graph definition
21    pub fn from_graph_def(def: &GraphDef) -> Result<Self> {
22        let mut executor = Self::new();
23
24        // Create concrete node instances based on NodeType
25        for (node_id, node_type) in &def.nodes {
26            let node: Box<dyn Node> = match node_type {
27                crate::node::NodeType::RuleNode => {
28                    Box::new(ConcreteRuleNode::new(node_id, "true"))
29                }
30                crate::node::NodeType::DBNode => {
31                    Box::new(DBNode::new(node_id, format!("SELECT * FROM {}", node_id)))
32                }
33                crate::node::NodeType::AINode => {
34                    Box::new(AINode::new(node_id, format!("Process data for {}", node_id)))
35                }
36            };
37
38            executor.register_node(node);
39        }
40
41        Ok(executor)
42    }
43
44    /// Register a node with the executor
45    pub fn register_node(&mut self, node: Box<dyn Node>) {
46        let id = node.id().to_string();
47        self.nodes.insert(id, node);
48    }
49
50    /// Execute the graph in topological order
51    pub async fn execute(&self, graph: &mut Graph) -> Result<()> {
52        info!("Executor: Starting graph execution");
53
54        // Build adjacency list and in-degree map
55        let mut adj_list: HashMap<String, Vec<String>> = HashMap::new();
56        let mut in_degree: HashMap<String, usize> = HashMap::new();
57
58        // Initialize all nodes with 0 in-degree
59        for node_id in graph.def.nodes.keys() {
60            in_degree.insert(node_id.clone(), 0);
61            adj_list.insert(node_id.clone(), Vec::new());
62        }
63
64        // Build the graph structure
65        for edge in &graph.def.edges {
66            adj_list
67                .entry(edge.from.clone())
68                .or_insert_with(Vec::new)
69                .push(edge.to.clone());
70
71            *in_degree.entry(edge.to.clone()).or_insert(0) += 1;
72        }
73
74        // Find all nodes with in-degree 0 (starting nodes)
75        let mut queue: VecDeque<String> = in_degree
76            .iter()
77            .filter(|(_, &degree)| degree == 0)
78            .map(|(id, _)| id.clone())
79            .collect();
80
81        if queue.is_empty() {
82            warn!("No starting nodes found (all nodes have incoming edges). Starting with first node.");
83            if let Some(first_node) = graph.def.nodes.keys().next() {
84                queue.push_back(first_node.clone());
85            }
86        }
87
88        let mut executed = HashSet::new();
89        let mut execution_order = Vec::new();
90
91        // Topological sort & execution
92        while let Some(node_id) = queue.pop_front() {
93            if executed.contains(&node_id) {
94                continue;
95            }
96
97            info!("Executor: Processing node '{}'", node_id);
98
99            // Check if all incoming edges have their rules satisfied
100            let incoming_edges: Vec<_> = graph
101                .def
102                .edges
103                .iter()
104                .filter(|e| e.to == node_id)
105                .collect();
106
107            let mut should_execute = true;
108
109            for edge in &incoming_edges {
110                if let Some(rule_id) = &edge.rule {
111                    let rule = Rule::new(rule_id, "true"); // Default condition
112
113                    match rule.evaluate(&graph.context.data) {
114                        Ok(result) => {
115                            debug!(
116                                "Rule '{}' for edge {} -> {} evaluated to: {:?}",
117                                rule_id, edge.from, edge.to, result
118                            );
119
120                            if let serde_json::Value::Bool(false) = result {
121                                should_execute = false;
122                                info!(
123                                    "Skipping node '{}' due to failed rule '{}'",
124                                    node_id, rule_id
125                                );
126                                break;
127                            }
128                        }
129                        Err(e) => {
130                            warn!(
131                                "Rule '{}' evaluation failed: {}. Assuming true.",
132                                rule_id, e
133                            );
134                        }
135                    }
136                }
137            }
138
139            // Execute the node
140            if should_execute {
141                if let Some(node) = self.nodes.get(&node_id) {
142                    match node.run(&mut graph.context).await {
143                        Ok(_) => {
144                            info!("Node '{}' executed successfully", node_id);
145                            execution_order.push(node_id.clone());
146                        }
147                        Err(e) => {
148                            warn!("Node '{}' execution failed: {:?}", node_id, e);
149                        }
150                    }
151                } else {
152                    warn!("Node '{}' not found in executor", node_id);
153                }
154            }
155
156            executed.insert(node_id.clone());
157
158            // Add downstream nodes to queue
159            if let Some(neighbors) = adj_list.get(&node_id) {
160                for neighbor in neighbors {
161                    if let Some(degree) = in_degree.get_mut(neighbor) {
162                        *degree = degree.saturating_sub(1);
163                        if *degree == 0 && !executed.contains(neighbor) {
164                            queue.push_back(neighbor.clone());
165                        }
166                    }
167                }
168            }
169        }
170
171        info!(
172            "Executor: Completed execution. Executed nodes: {:?}",
173            execution_order
174        );
175
176        // Check for unexecuted nodes (possible cycle)
177        let unexecuted: Vec<_> = graph
178            .def
179            .nodes
180            .keys()
181            .filter(|id| !executed.contains(*id))
182            .collect();
183
184        if !unexecuted.is_empty() {
185            warn!("Some nodes were not executed (possible cycle): {:?}", unexecuted);
186        }
187
188        Ok(())
189    }
190}
191
192impl Default for Executor {
193    fn default() -> Self {
194        Self::new()
195    }
196}