use crate::core::{ExecutionContext, ExecutionResult, NodeId, WorkflowGraph};
use crate::state::GraphState;
use crate::{RGraphError, RGraphResult};
use std::time::{Duration, Instant};
use tracing;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExecutionConfig {
pub max_nodes: usize,
pub continue_on_error: bool,
pub verbose_logging: bool,
pub timeout_seconds: Option<u64>,
pub max_execution_depth: usize,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
max_nodes: 1000,
continue_on_error: false,
verbose_logging: false,
timeout_seconds: Some(300), max_execution_depth: 100,
}
}
}
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ExecutionMode {
Sequential,
Parallel,
}
impl Default for ExecutionMode {
fn default() -> Self {
ExecutionMode::Sequential
}
}
#[derive(Debug, Clone)]
pub struct ExecutionResults {
pub final_state: GraphState,
pub metrics: ExecutionMetrics,
pub errors: Vec<ExecutionError>,
}
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
pub nodes_executed: usize,
pub total_duration: Duration,
pub success: bool,
}
#[derive(Debug, Clone)]
pub struct ExecutionError {
pub node_id: String,
pub error_message: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub error_type: String,
}
#[derive(Debug, Clone)]
pub struct ExecutionEngine {
config: ExecutionConfig,
}
impl ExecutionEngine {
pub fn new() -> Self {
Self {
config: ExecutionConfig::default(),
}
}
pub fn with_config(config: ExecutionConfig) -> Self {
Self { config }
}
pub async fn execute(
&self,
graph: &WorkflowGraph,
mut state: GraphState,
) -> RGraphResult<ExecutionResults> {
let start_time = Instant::now();
let mut errors = Vec::new();
let mut nodes_executed = 0;
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::info!("Starting graph execution: {}", graph.id());
#[cfg(not(feature = "observability"))]
tracing::debug!("Starting graph execution: {}", graph.id());
}
let entry_points = graph.entry_points_owned();
if entry_points.is_empty() {
return Err(RGraphError::execution("No entry points defined for graph"));
}
for entry_node_id in &entry_points {
match self
.execute_single_node(graph, &mut state, entry_node_id)
.await
{
Ok(_) => {
nodes_executed += 1;
}
Err(e) => {
let error = ExecutionError {
node_id: entry_node_id.as_str().to_string(),
error_message: e.to_string(),
timestamp: chrono::Utc::now(),
error_type: "NodeExecutionError".to_string(),
};
errors.push(error);
if !self.config.continue_on_error {
break;
}
}
}
if nodes_executed >= self.config.max_nodes {
break;
}
}
let total_duration = start_time.elapsed();
let success = errors.is_empty() || self.config.continue_on_error;
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::info!(
"Graph execution completed: {} (success: {}, duration: {:?})",
graph.id(),
success,
total_duration
);
#[cfg(not(feature = "observability"))]
tracing::debug!(
"Graph execution completed: {} (success: {}, duration: {:?})",
graph.id(),
success,
total_duration
);
}
Ok(ExecutionResults {
final_state: state,
metrics: ExecutionMetrics {
nodes_executed,
total_duration,
success,
},
errors,
})
}
async fn execute_single_node(
&self,
graph: &WorkflowGraph,
state: &mut GraphState,
node_id: &NodeId,
) -> RGraphResult<()> {
let node = graph.get_node(node_id).ok_or_else(|| {
RGraphError::execution(format!("Node '{}' not found", node_id.as_str()))
})?;
let context = ExecutionContext::new(graph.id().to_string(), node_id.clone());
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::debug!("Executing node: {}", node_id.as_str());
#[cfg(not(feature = "observability"))]
tracing::debug!("Executing node: {}", node_id.as_str());
}
match node.execute(state, &context).await {
Ok(ExecutionResult::Continue) => {
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::debug!("Node '{}' completed successfully", node_id.as_str());
#[cfg(not(feature = "observability"))]
tracing::debug!("Node '{}' completed successfully", node_id.as_str());
}
Ok(())
}
Ok(ExecutionResult::Stop) => {
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::info!("Node '{}' requested execution stop", node_id.as_str());
#[cfg(not(feature = "observability"))]
tracing::debug!("Node '{}' requested execution stop", node_id.as_str());
}
Ok(())
}
Ok(ExecutionResult::Route(_next_node)) => {
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::debug!("Node '{}' requested routing", node_id.as_str());
#[cfg(not(feature = "observability"))]
tracing::debug!("Node '{}' requested routing", node_id.as_str());
}
Ok(())
}
Ok(ExecutionResult::JumpTo(_target_node)) => {
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::debug!("Node '{}' requested jump", node_id.as_str());
#[cfg(not(feature = "observability"))]
tracing::debug!("Node '{}' requested jump", node_id.as_str());
}
Ok(())
}
Err(e) => {
if self.config.verbose_logging {
#[cfg(feature = "observability")]
tracing::error!("Node '{}' failed: {}", node_id.as_str(), e);
#[cfg(not(feature = "observability"))]
tracing::debug!("Node '{}' failed: {}", node_id.as_str(), e);
}
Err(e)
}
}
}
}
impl Default for ExecutionEngine {
fn default() -> Self {
Self::new()
}
}