use crate::core::{WorkflowGraph, Node, NodeId, ExecutionContext, ExecutionResult};
use crate::state::GraphState;
use crate::{RGraphError, RGraphResult};
use std::sync::Arc;
use std::time::Duration;
use std::collections::{HashMap, VecDeque};
use tokio::time::timeout;
use futures::future::BoxFuture;
use uuid::Uuid;
#[cfg(feature = "observability")]
use crate::observability::{GraphObserver, ExecutionMetrics};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Tunable limits and switches that govern a single graph execution.
///
/// Produced via [`Default`] for the standard limits, or constructed
/// field-by-field (with struct-update syntax) for custom runs.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExecutionConfig {
/// Wall-clock budget for the whole graph run; `None` disables the cap.
pub max_execution_time: Option<Duration>,
/// Wall-clock budget for a single node's `execute` call; `None` disables it.
pub max_node_execution_time: Option<Duration>,
/// Upper bound on the number of node executions; `None` disables it.
pub max_nodes: Option<usize>,
/// When `true`, node failures are recorded and execution proceeds
/// instead of aborting the run.
pub continue_on_error: bool,
/// Strategy used to drive the graph's entry points.
pub mode: ExecutionMode,
/// When `true`, emits start/stop/per-node log lines via `tracing`.
pub verbose_logging: bool,
/// Maximum depth of chained node transitions (Continue/JumpTo/Route)
/// before the run is aborted — guards against cycles.
pub max_recursion_depth: usize,
}
impl Default for ExecutionConfig {
    /// Conservative defaults: 5-minute graph budget, 1-minute per-node
    /// budget, at most 1000 node executions, sequential mode, quiet logs,
    /// and a recursion-depth cap of 100.
    fn default() -> Self {
        Self {
            max_execution_time: Some(Duration::from_secs(300)),
            max_node_execution_time: Some(Duration::from_secs(60)),
            max_nodes: Some(1000),
            continue_on_error: false,
            mode: ExecutionMode::Sequential,
            verbose_logging: false,
            max_recursion_depth: 100,
        }
    }
}
/// Strategy for driving a graph's entry points.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ExecutionMode {
/// Entry points are executed one after another, in order.
Sequential,
/// Entry points may be executed concurrently.
Parallel,
/// Like `Parallel`, with no additional concurrency limit.
/// NOTE(review): the engine currently treats this identically to
/// `Parallel` — confirm the intended distinction.
MaxParallel,
}
/// Outcome of one graph run: final state, bookkeeping, and any errors.
///
/// Returned by `ExecutionEngine::execute` even when individual nodes fail;
/// inspect `success` / `errors` rather than relying on `Result::Err`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GraphExecutionResult {
/// UUID (v4, string form) generated for this run.
pub execution_id: String,
/// The graph state as it stood when execution finished.
pub final_state: GraphState,
/// Timing and volume statistics for the run.
pub metrics: ExecutionMetrics,
/// Node ids in the order they were executed.
pub execution_path: Vec<NodeId>,
/// Every error recorded during the run (timeouts, node failures, ...).
pub errors: Vec<ExecutionError>,
/// `true` only when execution completed and `errors` is empty.
pub success: bool,
}
/// A single error recorded during graph execution.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExecutionError {
/// Id of the failing node, or a synthetic marker such as "timeout" /
/// "parallel_execution" for engine-level failures.
pub node_id: String,
/// Human-readable description of the failure.
pub error_message: String,
/// UTC time at which the error was recorded.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Coarse category tag, e.g. "Timeout", "NodeTimeout",
/// "NodeExecutionError", "ParallelExecutionError".
pub error_type: String,
}
/// Timing and volume statistics gathered over one graph execution.
///
/// NOTE(review): when the `observability` feature is enabled, this file also
/// imports `crate::observability::ExecutionMetrics` (see the use-block at the
/// top), which collides with this definition — confirm which type is intended
/// and drop the other.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExecutionMetrics {
/// Total wall-clock time of the run (zero if the clock went backwards).
pub total_duration: Duration,
/// Number of node executions performed.
pub nodes_executed: usize,
/// Per-node wall-clock durations, keyed by node id string.
/// NOTE(review): one entry per node id — a node executed twice keeps
/// only its latest duration.
pub node_durations: HashMap<String, Duration>,
/// UTC timestamp at which the run started.
pub start_time: chrono::DateTime<chrono::Utc>,
/// UTC timestamp at which the run finished.
pub end_time: chrono::DateTime<chrono::Utc>,
/// Peak memory usage in bytes, if measured (currently never populated
/// by the engine).
pub peak_memory_usage: Option<usize>,
}
/// Drives a [`WorkflowGraph`] to completion under an [`ExecutionConfig`].
pub struct ExecutionEngine {
/// Limits and switches applied to every run of this engine.
config: ExecutionConfig,
/// Observers notified of execution events (observability builds only).
#[cfg(feature = "observability")]
observers: Vec<Arc<dyn GraphObserver>>,
}
impl ExecutionEngine {
    /// Creates an engine with [`ExecutionConfig::default`].
    pub fn new() -> Self {
        Self {
            config: ExecutionConfig::default(),
            #[cfg(feature = "observability")]
            observers: Vec::new(),
        }
    }

    /// Creates an engine that executes with the supplied `config`.
    pub fn with_config(config: ExecutionConfig) -> Self {
        Self {
            config,
            #[cfg(feature = "observability")]
            observers: Vec::new(),
        }
    }

    /// Registers an observer to be notified of execution events.
    #[cfg(feature = "observability")]
    pub fn add_observer(&mut self, observer: Arc<dyn GraphObserver>) {
        self.observers.push(observer);
    }

    /// Runs `graph` to completion against `state`.
    ///
    /// Validates the graph, then drives execution from its entry points,
    /// enforcing `max_execution_time` when configured.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when `graph.validate()` fails. Execution failures
    /// (node errors, timeouts) are reported through the returned
    /// [`GraphExecutionResult`]'s `errors` and `success` fields instead.
    pub async fn execute(
        &self,
        graph: &WorkflowGraph,
        mut state: GraphState,
    ) -> RGraphResult<GraphExecutionResult> {
        let execution_id = Uuid::new_v4().to_string();
        let start_time = chrono::Utc::now();
        if self.config.verbose_logging {
            // Log at `info` when full observability is compiled in; keep
            // default builds quieter at `debug`.
            #[cfg(feature = "observability")]
            tracing::info!("Starting graph execution: {}", execution_id);
            #[cfg(not(feature = "observability"))]
            tracing::debug!("Starting graph execution: {}", execution_id);
        }
        // Fail fast on a structurally invalid graph; this is the only path
        // that surfaces as `Err` from this method.
        graph.validate()?;
        let mut execution_path = Vec::new();
        let mut errors = Vec::new();
        let mut node_durations = HashMap::new();
        let mut nodes_executed = 0;
        let execution_future = self.execute_internal(
            graph,
            &mut state,
            &execution_id,
            &mut execution_path,
            &mut errors,
            &mut node_durations,
            &mut nodes_executed,
        );
        let execution_result = if let Some(max_time) = self.config.max_execution_time {
            match timeout(max_time, execution_future).await {
                Ok(result) => result,
                Err(_) => {
                    errors.push(ExecutionError {
                        node_id: "timeout".to_string(),
                        error_message: "Graph execution timed out".to_string(),
                        timestamp: chrono::Utc::now(),
                        error_type: "Timeout".to_string(),
                    });
                    Err(RGraphError::execution("Graph execution timed out"))
                }
            }
        } else {
            execution_future.await
        };
        let end_time = chrono::Utc::now();
        // chrono -> std conversion only fails for negative spans (clock
        // skew); treat that as zero rather than failing the whole run.
        let total_duration = (end_time - start_time).to_std().unwrap_or(Duration::ZERO);
        let success = execution_result.is_ok() && errors.is_empty();
        if self.config.verbose_logging {
            if success {
                #[cfg(feature = "observability")]
                tracing::info!("Graph execution completed successfully: {}", execution_id);
                #[cfg(not(feature = "observability"))]
                tracing::debug!("Graph execution completed successfully: {}", execution_id);
            } else {
                #[cfg(feature = "observability")]
                tracing::error!("Graph execution failed: {}", execution_id);
                #[cfg(not(feature = "observability"))]
                tracing::debug!("Graph execution failed: {}", execution_id);
            }
        }
        Ok(GraphExecutionResult {
            execution_id,
            final_state: state,
            metrics: ExecutionMetrics {
                total_duration,
                nodes_executed,
                node_durations,
                start_time,
                end_time,
                peak_memory_usage: None,
            },
            execution_path,
            errors,
            success,
        })
    }

    /// Dispatches execution from every entry point according to the
    /// configured [`ExecutionMode`].
    #[allow(clippy::too_many_arguments)]
    async fn execute_internal(
        &self,
        graph: &WorkflowGraph,
        state: &mut GraphState,
        execution_id: &str,
        execution_path: &mut Vec<NodeId>,
        errors: &mut Vec<ExecutionError>,
        node_durations: &mut HashMap<String, Duration>,
        nodes_executed: &mut usize,
    ) -> RGraphResult<()> {
        let entry_points = graph.entry_points();
        if entry_points.is_empty() {
            return Err(RGraphError::execution("No entry points defined"));
        }
        match self.config.mode {
            ExecutionMode::Sequential => {
                for entry_point in entry_points {
                    self.execute_node_sequence(
                        graph,
                        state,
                        &entry_point,
                        execution_id,
                        execution_path,
                        errors,
                        node_durations,
                        nodes_executed,
                        0,
                    )
                    .await?;
                }
            }
            ExecutionMode::Parallel | ExecutionMode::MaxParallel => {
                // FIX(review): genuinely concurrent branches are impossible
                // with this signature — every branch needs `&mut GraphState`
                // plus the shared bookkeeping collections, and the previous
                // `join_all`-based code aliased those `&mut` borrows across
                // several futures (and returned futures borrowing the
                // closure-local `entry_point`), which cannot pass the borrow
                // checker. Until state moves behind interior mutability
                // (e.g. `Arc<RwLock<GraphState>>`), parallel modes run entry
                // points one after another while preserving the parallel
                // path's error-collection semantics.
                for entry_point in entry_points {
                    let branch_result = self
                        .execute_node_sequence(
                            graph,
                            state,
                            &entry_point,
                            execution_id,
                            execution_path,
                            errors,
                            node_durations,
                            nodes_executed,
                            0,
                        )
                        .await;
                    if let Err(e) = branch_result {
                        if !self.config.continue_on_error {
                            return Err(e);
                        }
                        errors.push(ExecutionError {
                            node_id: "parallel_execution".to_string(),
                            error_message: e.to_string(),
                            timestamp: chrono::Utc::now(),
                            error_type: "ParallelExecutionError".to_string(),
                        });
                    }
                }
            }
        }
        Ok(())
    }

    /// Executes `current_node_id`, records its duration, and then follows
    /// whatever the node's [`ExecutionResult`] dictates (successors, a jump
    /// target, or stop).
    ///
    /// Returns a manually boxed future because the function recurses through
    /// `async` calls (`Continue` / `JumpTo` / `Route`), which requires type
    /// erasure to name the future type.
    #[allow(clippy::too_many_arguments)]
    fn execute_node_sequence(
        &self,
        graph: &WorkflowGraph,
        state: &mut GraphState,
        current_node_id: &NodeId,
        execution_id: &str,
        execution_path: &mut Vec<NodeId>,
        errors: &mut Vec<ExecutionError>,
        node_durations: &mut HashMap<String, Duration>,
        nodes_executed: &mut usize,
        recursion_depth: usize,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = RGraphResult<()>> + '_>> {
        Box::pin(async move {
            // Guard against cycles that never issue `Stop`.
            if recursion_depth >= self.config.max_recursion_depth {
                return Err(RGraphError::execution(format!(
                    "Maximum recursion depth {} exceeded",
                    self.config.max_recursion_depth
                )));
            }
            // Guard against runaway graphs independent of depth.
            if let Some(max_nodes) = self.config.max_nodes {
                if *nodes_executed >= max_nodes {
                    return Err(RGraphError::execution(format!(
                        "Maximum node limit {} exceeded",
                        max_nodes
                    )));
                }
            }
            let node = self.get_node_from_graph(graph, current_node_id)?;
            let mut context = ExecutionContext::new(
                graph.id().to_string(),
                current_node_id.clone(),
            );
            // Give the node visibility into the path taken so far.
            context.execution_path = execution_path.clone();
            let node_start = std::time::Instant::now();
            let execution_result = if let Some(max_time) = self.config.max_node_execution_time {
                match timeout(max_time, node.execute(state, &context)).await {
                    Ok(result) => result,
                    Err(_) => {
                        errors.push(ExecutionError {
                            node_id: current_node_id.as_str().to_string(),
                            error_message: "Node execution timed out".to_string(),
                            timestamp: chrono::Utc::now(),
                            error_type: "NodeTimeout".to_string(),
                        });
                        // FIX(review): a timed-out node previously vanished
                        // from the metrics and execution path; record the
                        // attempt before bailing out either way.
                        node_durations.insert(
                            current_node_id.as_str().to_string(),
                            node_start.elapsed(),
                        );
                        *nodes_executed += 1;
                        execution_path.push(current_node_id.clone());
                        return if self.config.continue_on_error {
                            Ok(())
                        } else {
                            Err(RGraphError::node(
                                current_node_id.as_str(),
                                "Node execution timed out",
                            ))
                        };
                    }
                }
            } else {
                node.execute(state, &context).await
            };
            let node_duration = node_start.elapsed();
            node_durations.insert(current_node_id.as_str().to_string(), node_duration);
            *nodes_executed += 1;
            execution_path.push(current_node_id.clone());
            if self.config.verbose_logging {
                // FIX(review): the two cfg-gated calls here were byte-
                // identical, so a single call suffices; also, `{:.2}` is
                // ignored for the integer returned by `as_millis()`, so log
                // fractional milliseconds instead.
                tracing::debug!(
                    "Executed node '{}' in {:.2}ms",
                    current_node_id.as_str(),
                    node_duration.as_secs_f64() * 1000.0
                );
            }
            match execution_result {
                Ok(ExecutionResult::Continue) => {
                    self.continue_to_next_nodes(
                        graph,
                        state,
                        current_node_id,
                        execution_id,
                        execution_path,
                        errors,
                        node_durations,
                        nodes_executed,
                        recursion_depth + 1,
                    )
                    .await?;
                }
                Ok(ExecutionResult::Stop) => {
                    if self.config.verbose_logging {
                        #[cfg(feature = "observability")]
                        tracing::info!(
                            "Node '{}' requested execution stop",
                            current_node_id.as_str()
                        );
                        #[cfg(not(feature = "observability"))]
                        tracing::debug!(
                            "Node '{}' requested execution stop",
                            current_node_id.as_str()
                        );
                    }
                }
                Ok(ExecutionResult::JumpTo(next_node_id)) => {
                    self.execute_node_sequence(
                        graph,
                        state,
                        &next_node_id,
                        execution_id,
                        execution_path,
                        errors,
                        node_durations,
                        nodes_executed,
                        recursion_depth + 1,
                    )
                    .await?;
                }
                Ok(ExecutionResult::Route(next_node_id)) => {
                    // `Route` carries a raw id that must be wrapped first.
                    let next_node_id = NodeId::new(next_node_id);
                    self.execute_node_sequence(
                        graph,
                        state,
                        &next_node_id,
                        execution_id,
                        execution_path,
                        errors,
                        node_durations,
                        nodes_executed,
                        recursion_depth + 1,
                    )
                    .await?;
                }
                Err(e) => {
                    errors.push(ExecutionError {
                        node_id: current_node_id.as_str().to_string(),
                        error_message: e.to_string(),
                        timestamp: chrono::Utc::now(),
                        error_type: "NodeExecutionError".to_string(),
                    });
                    if !self.config.continue_on_error {
                        return Err(e);
                    }
                }
            }
            Ok(())
        })
    }

    /// Follows outgoing edges from `current_node_id` and executes each
    /// successor.
    ///
    /// NOTE(review): currently a stub — it visits no successors, so
    /// `ExecutionResult::Continue` silently ends the branch. Implementing it
    /// needs an edge/adjacency accessor on `WorkflowGraph`; confirm against
    /// the graph API before relying on `Continue` semantics.
    #[allow(clippy::too_many_arguments)]
    async fn continue_to_next_nodes(
        &self,
        _graph: &WorkflowGraph,
        _state: &mut GraphState,
        _current_node_id: &NodeId,
        _execution_id: &str,
        _execution_path: &mut Vec<NodeId>,
        _errors: &mut Vec<ExecutionError>,
        _node_durations: &mut HashMap<String, Duration>,
        _nodes_executed: &mut usize,
        _recursion_depth: usize,
    ) -> RGraphResult<()> {
        Ok(())
    }

    /// Resolves a node id to its executable [`Node`] implementation.
    ///
    /// NOTE(review): currently a stub that always fails, so `execute` can
    /// never run a real node; `WorkflowGraph` must expose node storage for
    /// this to be implemented.
    fn get_node_from_graph(
        &self,
        _graph: &WorkflowGraph,
        node_id: &NodeId,
    ) -> RGraphResult<Arc<dyn Node>> {
        Err(RGraphError::execution(format!(
            "Node lookup not implemented for node: {}",
            node_id.as_str()
        )))
    }
}
impl Default for ExecutionEngine {
fn default() -> Self {
Self::new()
}
}
impl WorkflowGraph {
    /// Convenience wrapper: runs this graph with a default-configured
    /// [`ExecutionEngine`].
    pub async fn execute(&self, state: GraphState) -> RGraphResult<GraphExecutionResult> {
        ExecutionEngine::default().execute(self, state).await
    }

    /// Convenience wrapper: runs this graph with an engine built from
    /// `config`.
    pub async fn execute_with_config(
        &self,
        state: GraphState,
        config: ExecutionConfig,
    ) -> RGraphResult<GraphExecutionResult> {
        ExecutionEngine::with_config(config).execute(self, state).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{Node, ExecutionContext, ExecutionResult};
    use crate::state::{GraphState, StateValue};
    use async_trait::async_trait;

    /// Test double: appends its own name to the shared "executed_nodes"
    /// array in state, then reports a preconfigured result.
    struct MockNode {
        id: NodeId,
        name: String,
        result: ExecutionResult,
    }

    impl MockNode {
        fn new(
            id: impl Into<NodeId>,
            name: impl Into<String>,
            result: ExecutionResult,
        ) -> Arc<Self> {
            Arc::new(Self {
                id: id.into(),
                name: name.into(),
                result,
            })
        }
    }

    #[async_trait]
    impl Node for MockNode {
        async fn execute(
            &self,
            state: &mut GraphState,
            _context: &ExecutionContext,
        ) -> RGraphResult<ExecutionResult> {
            let mut visited = state
                .get("executed_nodes")
                .unwrap_or(StateValue::Array(vec![]));
            if let StateValue::Array(ref mut entries) = visited {
                entries.push(StateValue::String(self.name.clone()));
            }
            state.set("executed_nodes", visited);
            Ok(self.result.clone())
        }

        fn id(&self) -> &NodeId {
            &self.id
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// The default config must match the documented limits.
    #[test]
    fn test_execution_config_default() {
        let cfg = ExecutionConfig::default();
        assert_eq!(cfg.max_execution_time, Some(Duration::from_secs(300)));
        assert_eq!(cfg.max_node_execution_time, Some(Duration::from_secs(60)));
        assert_eq!(cfg.max_nodes, Some(1000));
        assert!(!cfg.continue_on_error);
        assert_eq!(cfg.mode, ExecutionMode::Sequential);
        assert!(!cfg.verbose_logging);
        assert_eq!(cfg.max_recursion_depth, 100);
    }

    /// ExecutionMetrics is a plain data carrier; fields round-trip unchanged.
    #[test]
    fn test_execution_metrics() {
        let started = chrono::Utc::now();
        let finished = started + chrono::Duration::seconds(10);
        let metrics = ExecutionMetrics {
            total_duration: Duration::from_secs(10),
            nodes_executed: 5,
            node_durations: HashMap::new(),
            start_time: started,
            end_time: finished,
            peak_memory_usage: Some(1024 * 1024),
        };
        assert_eq!(metrics.total_duration, Duration::from_secs(10));
        assert_eq!(metrics.nodes_executed, 5);
        assert_eq!(metrics.peak_memory_usage, Some(1024 * 1024));
    }

    /// ExecutionError is a plain data carrier; fields round-trip unchanged.
    #[test]
    fn test_execution_error() {
        let err = ExecutionError {
            node_id: "test_node".to_string(),
            error_message: "Test error".to_string(),
            timestamp: chrono::Utc::now(),
            error_type: "TestError".to_string(),
        };
        assert_eq!(err.node_id, "test_node");
        assert_eq!(err.error_message, "Test error");
        assert_eq!(err.error_type, "TestError");
    }

    /// Engines pick up either the default or an explicitly supplied config.
    #[tokio::test]
    async fn test_execution_engine_creation() {
        let default_engine = ExecutionEngine::new();
        assert_eq!(default_engine.config.mode, ExecutionMode::Sequential);
        let parallel_config = ExecutionConfig {
            mode: ExecutionMode::Parallel,
            verbose_logging: true,
            ..Default::default()
        };
        let parallel_engine = ExecutionEngine::with_config(parallel_config);
        assert_eq!(parallel_engine.config.mode, ExecutionMode::Parallel);
        assert!(parallel_engine.config.verbose_logging);
    }
}