1use crate::core::{ExecutionContext, ExecutionResult, NodeId, WorkflowGraph};
6use crate::state::GraphState;
7use crate::{RGraphError, RGraphResult};
8use std::time::{Duration, Instant};
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13#[derive(Debug, Clone)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16pub struct ExecutionConfig {
17 pub max_nodes: usize,
19 pub continue_on_error: bool,
21 pub verbose_logging: bool,
23 pub timeout_seconds: Option<u64>,
25 pub max_execution_depth: usize,
27}
28
29impl Default for ExecutionConfig {
30 fn default() -> Self {
31 Self {
32 max_nodes: 1000,
33 continue_on_error: false,
34 verbose_logging: false,
35 timeout_seconds: Some(300), max_execution_depth: 100,
37 }
38 }
39}
40
41#[derive(Debug, Clone, PartialEq)]
43#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
44pub enum ExecutionMode {
45 Sequential,
47 Parallel,
49}
50
51impl Default for ExecutionMode {
52 fn default() -> Self {
53 ExecutionMode::Sequential
54 }
55}
56
57#[derive(Debug, Clone)]
59pub struct ExecutionResults {
60 pub final_state: GraphState,
62 pub metrics: ExecutionMetrics,
64 pub errors: Vec<ExecutionError>,
66}
67
68#[derive(Debug, Clone, Default)]
70pub struct ExecutionMetrics {
71 pub nodes_executed: usize,
73 pub total_duration: Duration,
75 pub success: bool,
77}
78
79#[derive(Debug, Clone)]
81pub struct ExecutionError {
82 pub node_id: String,
84 pub error_message: String,
86 pub timestamp: chrono::DateTime<chrono::Utc>,
88 pub error_type: String,
90}
91
92#[derive(Debug, Clone)]
94pub struct ExecutionEngine {
95 config: ExecutionConfig,
96}
97
98impl ExecutionEngine {
99 pub fn new() -> Self {
101 Self {
102 config: ExecutionConfig::default(),
103 }
104 }
105
106 pub fn with_config(config: ExecutionConfig) -> Self {
108 Self { config }
109 }
110
111 pub async fn execute(
113 &self,
114 graph: &WorkflowGraph,
115 mut state: GraphState,
116 ) -> RGraphResult<ExecutionResults> {
117 let start_time = Instant::now();
118 let mut errors = Vec::new();
119 let mut nodes_executed = 0;
120
121 if self.config.verbose_logging {
122 #[cfg(feature = "observability")]
123 tracing::info!("Starting graph execution: {}", graph.id());
124 #[cfg(not(feature = "observability"))]
125 eprintln!("Starting graph execution: {}", graph.id());
126 }
127
128 let entry_points = graph.entry_points_owned();
130 if entry_points.is_empty() {
131 return Err(RGraphError::execution("No entry points defined for graph"));
132 }
133
134 for entry_node_id in &entry_points {
136 match self
137 .execute_single_node(graph, &mut state, entry_node_id)
138 .await
139 {
140 Ok(_) => {
141 nodes_executed += 1;
142 }
143 Err(e) => {
144 let error = ExecutionError {
145 node_id: entry_node_id.as_str().to_string(),
146 error_message: e.to_string(),
147 timestamp: chrono::Utc::now(),
148 error_type: "NodeExecutionError".to_string(),
149 };
150 errors.push(error);
151
152 if !self.config.continue_on_error {
153 break;
154 }
155 }
156 }
157
158 if nodes_executed >= self.config.max_nodes {
159 break;
160 }
161 }
162
163 let total_duration = start_time.elapsed();
164 let success = errors.is_empty() || self.config.continue_on_error;
165
166 if self.config.verbose_logging {
167 #[cfg(feature = "observability")]
168 tracing::info!(
169 "Graph execution completed: {} (success: {}, duration: {:?})",
170 graph.id(),
171 success,
172 total_duration
173 );
174 #[cfg(not(feature = "observability"))]
175 eprintln!(
176 "Graph execution completed: {} (success: {}, duration: {:?})",
177 graph.id(),
178 success,
179 total_duration
180 );
181 }
182
183 Ok(ExecutionResults {
184 final_state: state,
185 metrics: ExecutionMetrics {
186 nodes_executed,
187 total_duration,
188 success,
189 },
190 errors,
191 })
192 }
193
194 async fn execute_single_node(
196 &self,
197 graph: &WorkflowGraph,
198 state: &mut GraphState,
199 node_id: &NodeId,
200 ) -> RGraphResult<()> {
201 let node = graph.get_node(node_id).ok_or_else(|| {
203 RGraphError::execution(format!("Node '{}' not found", node_id.as_str()))
204 })?;
205
206 let context = ExecutionContext::new(graph.id().to_string(), node_id.clone());
208
209 if self.config.verbose_logging {
210 #[cfg(feature = "observability")]
211 tracing::debug!("Executing node: {}", node_id.as_str());
212 #[cfg(not(feature = "observability"))]
213 eprintln!("Executing node: {}", node_id.as_str());
214 }
215
216 match node.execute(state, &context).await {
218 Ok(ExecutionResult::Continue) => {
219 if self.config.verbose_logging {
220 #[cfg(feature = "observability")]
221 tracing::debug!("Node '{}' completed successfully", node_id.as_str());
222 #[cfg(not(feature = "observability"))]
223 eprintln!("Node '{}' completed successfully", node_id.as_str());
224 }
225 Ok(())
226 }
227 Ok(ExecutionResult::Stop) => {
228 if self.config.verbose_logging {
229 #[cfg(feature = "observability")]
230 tracing::info!("Node '{}' requested execution stop", node_id.as_str());
231 #[cfg(not(feature = "observability"))]
232 eprintln!("Node '{}' requested execution stop", node_id.as_str());
233 }
234 Ok(())
235 }
236 Ok(ExecutionResult::Route(_next_node)) => {
237 if self.config.verbose_logging {
240 #[cfg(feature = "observability")]
241 tracing::debug!("Node '{}' requested routing", node_id.as_str());
242 #[cfg(not(feature = "observability"))]
243 eprintln!("Node '{}' requested routing", node_id.as_str());
244 }
245 Ok(())
246 }
247 Ok(ExecutionResult::JumpTo(_target_node)) => {
248 if self.config.verbose_logging {
251 #[cfg(feature = "observability")]
252 tracing::debug!("Node '{}' requested jump", node_id.as_str());
253 #[cfg(not(feature = "observability"))]
254 eprintln!("Node '{}' requested jump", node_id.as_str());
255 }
256 Ok(())
257 }
258 Err(e) => {
259 if self.config.verbose_logging {
260 #[cfg(feature = "observability")]
261 tracing::error!("Node '{}' failed: {}", node_id.as_str(), e);
262 #[cfg(not(feature = "observability"))]
263 eprintln!("Node '{}' failed: {}", node_id.as_str(), e);
264 }
265 Err(e)
266 }
267 }
268 }
269}
270
271impl Default for ExecutionEngine {
272 fn default() -> Self {
273 Self::new()
274 }
275}