rrag_graph/
execution.rs

1//! # Simple Execution Engine
2//!
3//! A simplified execution engine that avoids complex lifetime issues.
4
5use crate::core::{ExecutionContext, ExecutionResult, NodeId, WorkflowGraph};
6use crate::state::GraphState;
7use crate::{RGraphError, RGraphResult};
8use std::time::{Duration, Instant};
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13/// Configuration for the execution engine
14#[derive(Debug, Clone)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16pub struct ExecutionConfig {
17    /// Maximum number of nodes to execute
18    pub max_nodes: usize,
19    /// Continue execution on node errors
20    pub continue_on_error: bool,
21    /// Enable verbose logging
22    pub verbose_logging: bool,
23    /// Execution timeout in seconds
24    pub timeout_seconds: Option<u64>,
25    /// Maximum execution depth to prevent infinite loops
26    pub max_execution_depth: usize,
27}
28
29impl Default for ExecutionConfig {
30    fn default() -> Self {
31        Self {
32            max_nodes: 1000,
33            continue_on_error: false,
34            verbose_logging: false,
35            timeout_seconds: Some(300), // 5 minutes
36            max_execution_depth: 100,
37        }
38    }
39}
40
41/// Execution mode for the graph
42#[derive(Debug, Clone, PartialEq)]
43#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
44pub enum ExecutionMode {
45    /// Execute nodes sequentially
46    Sequential,
47    /// Execute nodes in parallel where possible
48    Parallel,
49}
50
51impl Default for ExecutionMode {
52    fn default() -> Self {
53        ExecutionMode::Sequential
54    }
55}
56
57/// Results from graph execution
58#[derive(Debug, Clone)]
59pub struct ExecutionResults {
60    /// The final state after execution
61    pub final_state: GraphState,
62    /// Execution metrics
63    pub metrics: ExecutionMetrics,
64    /// Any errors that occurred
65    pub errors: Vec<ExecutionError>,
66}
67
68/// Metrics collected during execution
69#[derive(Debug, Clone, Default)]
70pub struct ExecutionMetrics {
71    /// Number of nodes executed
72    pub nodes_executed: usize,
73    /// Total execution duration
74    pub total_duration: Duration,
75    /// Success indicator
76    pub success: bool,
77}
78
79/// Error that occurred during execution
80#[derive(Debug, Clone)]
81pub struct ExecutionError {
82    /// ID of the node that failed
83    pub node_id: String,
84    /// Error message
85    pub error_message: String,
86    /// When the error occurred
87    pub timestamp: chrono::DateTime<chrono::Utc>,
88    /// Type of error
89    pub error_type: String,
90}
91
92/// Simple execution engine
93#[derive(Debug, Clone)]
94pub struct ExecutionEngine {
95    config: ExecutionConfig,
96}
97
98impl ExecutionEngine {
99    /// Create a new execution engine with default configuration
100    pub fn new() -> Self {
101        Self {
102            config: ExecutionConfig::default(),
103        }
104    }
105
106    /// Create a new execution engine with custom configuration
107    pub fn with_config(config: ExecutionConfig) -> Self {
108        Self { config }
109    }
110
111    /// Execute a workflow graph
112    pub async fn execute(
113        &self,
114        graph: &WorkflowGraph,
115        mut state: GraphState,
116    ) -> RGraphResult<ExecutionResults> {
117        let start_time = Instant::now();
118        let mut errors = Vec::new();
119        let mut nodes_executed = 0;
120
121        if self.config.verbose_logging {
122            #[cfg(feature = "observability")]
123            tracing::info!("Starting graph execution: {}", graph.id());
124            #[cfg(not(feature = "observability"))]
125            eprintln!("Starting graph execution: {}", graph.id());
126        }
127
128        // Get entry points
129        let entry_points = graph.entry_points_owned();
130        if entry_points.is_empty() {
131            return Err(RGraphError::execution("No entry points defined for graph"));
132        }
133
134        // Execute each entry point
135        for entry_node_id in &entry_points {
136            match self
137                .execute_single_node(graph, &mut state, entry_node_id)
138                .await
139            {
140                Ok(_) => {
141                    nodes_executed += 1;
142                }
143                Err(e) => {
144                    let error = ExecutionError {
145                        node_id: entry_node_id.as_str().to_string(),
146                        error_message: e.to_string(),
147                        timestamp: chrono::Utc::now(),
148                        error_type: "NodeExecutionError".to_string(),
149                    };
150                    errors.push(error);
151
152                    if !self.config.continue_on_error {
153                        break;
154                    }
155                }
156            }
157
158            if nodes_executed >= self.config.max_nodes {
159                break;
160            }
161        }
162
163        let total_duration = start_time.elapsed();
164        let success = errors.is_empty() || self.config.continue_on_error;
165
166        if self.config.verbose_logging {
167            #[cfg(feature = "observability")]
168            tracing::info!(
169                "Graph execution completed: {} (success: {}, duration: {:?})",
170                graph.id(),
171                success,
172                total_duration
173            );
174            #[cfg(not(feature = "observability"))]
175            eprintln!(
176                "Graph execution completed: {} (success: {}, duration: {:?})",
177                graph.id(),
178                success,
179                total_duration
180            );
181        }
182
183        Ok(ExecutionResults {
184            final_state: state,
185            metrics: ExecutionMetrics {
186                nodes_executed,
187                total_duration,
188                success,
189            },
190            errors,
191        })
192    }
193
194    /// Execute a single node
195    async fn execute_single_node(
196        &self,
197        graph: &WorkflowGraph,
198        state: &mut GraphState,
199        node_id: &NodeId,
200    ) -> RGraphResult<()> {
201        // Get the node
202        let node = graph.get_node(node_id).ok_or_else(|| {
203            RGraphError::execution(format!("Node '{}' not found", node_id.as_str()))
204        })?;
205
206        // Create execution context
207        let context = ExecutionContext::new(graph.id().to_string(), node_id.clone());
208
209        if self.config.verbose_logging {
210            #[cfg(feature = "observability")]
211            tracing::debug!("Executing node: {}", node_id.as_str());
212            #[cfg(not(feature = "observability"))]
213            eprintln!("Executing node: {}", node_id.as_str());
214        }
215
216        // Execute the node
217        match node.execute(state, &context).await {
218            Ok(ExecutionResult::Continue) => {
219                if self.config.verbose_logging {
220                    #[cfg(feature = "observability")]
221                    tracing::debug!("Node '{}' completed successfully", node_id.as_str());
222                    #[cfg(not(feature = "observability"))]
223                    eprintln!("Node '{}' completed successfully", node_id.as_str());
224                }
225                Ok(())
226            }
227            Ok(ExecutionResult::Stop) => {
228                if self.config.verbose_logging {
229                    #[cfg(feature = "observability")]
230                    tracing::info!("Node '{}' requested execution stop", node_id.as_str());
231                    #[cfg(not(feature = "observability"))]
232                    eprintln!("Node '{}' requested execution stop", node_id.as_str());
233                }
234                Ok(())
235            }
236            Ok(ExecutionResult::Route(_next_node)) => {
237                // For now, we'll treat routing as completion
238                // In a more complex implementation, we'd follow the route
239                if self.config.verbose_logging {
240                    #[cfg(feature = "observability")]
241                    tracing::debug!("Node '{}' requested routing", node_id.as_str());
242                    #[cfg(not(feature = "observability"))]
243                    eprintln!("Node '{}' requested routing", node_id.as_str());
244                }
245                Ok(())
246            }
247            Ok(ExecutionResult::JumpTo(_target_node)) => {
248                // For now, we'll treat jump as completion
249                // In a more complex implementation, we'd jump to the target
250                if self.config.verbose_logging {
251                    #[cfg(feature = "observability")]
252                    tracing::debug!("Node '{}' requested jump", node_id.as_str());
253                    #[cfg(not(feature = "observability"))]
254                    eprintln!("Node '{}' requested jump", node_id.as_str());
255                }
256                Ok(())
257            }
258            Err(e) => {
259                if self.config.verbose_logging {
260                    #[cfg(feature = "observability")]
261                    tracing::error!("Node '{}' failed: {}", node_id.as_str(), e);
262                    #[cfg(not(feature = "observability"))]
263                    eprintln!("Node '{}' failed: {}", node_id.as_str(), e);
264                }
265                Err(e)
266            }
267        }
268    }
269}
270
271impl Default for ExecutionEngine {
272    fn default() -> Self {
273        Self::new()
274    }
275}