pub struct WorkflowExecutor { /* private fields */ }Expand description
Sequential workflow executor with rollback support.
Executes tasks in topological order based on dependencies, recording all task events to the audit log. On failure, automatically triggers selective rollback of dependent tasks.
§Execution Model
The executor:
- Validates the workflow structure
- Calculates execution order via topological sort
- Executes each task with audit logging
- Validates task result if validation config is set
- Creates checkpoint after each successful task
- On failure, triggers rollback of dependent tasks
Implementations§
Source§impl WorkflowExecutor
impl WorkflowExecutor
Sourcepub fn with_rollback_strategy(self, strategy: RollbackStrategy) -> Self
pub fn with_rollback_strategy(self, strategy: RollbackStrategy) -> Self
Sourcepub fn with_checkpoint_service(self, service: WorkflowCheckpointService) -> Self
pub fn with_checkpoint_service(self, service: WorkflowCheckpointService) -> Self
Sourcepub fn with_validation_config(self, config: ValidationCheckpoint) -> Self
pub fn with_validation_config(self, config: ValidationCheckpoint) -> Self
Sourcepub fn with_cancellation_source(self, source: CancellationTokenSource) -> Self
pub fn with_cancellation_source(self, source: CancellationTokenSource) -> Self
Sets the cancellation source for this executor.
§Arguments
source- The cancellation token source to use
§Returns
The executor with cancellation enabled (for builder pattern)
§Example
use forge_agent::workflow::CancellationTokenSource;
let source = CancellationTokenSource::new();
let executor = WorkflowExecutor::new(workflow)
.with_cancellation_source(source);Sourcepub fn cancellation_token(&self) -> Option<CancellationToken>
pub fn cancellation_token(&self) -> Option<CancellationToken>
Returns a cancellation token if configured.
§Returns
Some(CancellationToken)if cancellation source is configuredNoneif no cancellation source
§Example
let source = CancellationTokenSource::new();
let executor = WorkflowExecutor::new(workflow)
.with_cancellation_source(source);
if let Some(token) = executor.cancellation_token() {
println!("Token cancelled: {}", token.is_cancelled());
}Sourcepub fn cancel(&self)
pub fn cancel(&self)
Cancels the workflow execution.
Triggers cancellation on the cancellation source if configured. This will cause the executor to stop after the current task completes.
§Example
let source = CancellationTokenSource::new();
let mut executor = WorkflowExecutor::new(workflow)
.with_cancellation_source(source);
// Spawn execution in background
tokio::spawn(async move {
executor.execute().await?;
});
// Cancel from main thread
executor.cancel();Sourcepub fn with_timeout_config(self, config: TimeoutConfig) -> Self
pub fn with_timeout_config(self, config: TimeoutConfig) -> Self
Sets the timeout configuration for this executor.
§Arguments
config- The timeout configuration to use
§Returns
The executor with timeout configuration enabled (for builder pattern)
§Example
use forge_agent::workflow::TimeoutConfig;
let executor = WorkflowExecutor::new(workflow)
.with_timeout_config(TimeoutConfig::new());Sourcepub fn with_tool_registry(self, registry: ToolRegistry) -> Self
pub fn with_tool_registry(self, registry: ToolRegistry) -> Self
Sets the tool registry for this executor.
§Arguments
registry- The tool registry to use for tool invocation
§Returns
The executor with tool registry enabled (for builder pattern)
§Example
use forge_agent::workflow::tools::ToolRegistry;
let registry = ToolRegistry::new();
let executor = WorkflowExecutor::new(workflow)
.with_tool_registry(registry);Sourcepub fn with_deadlock_timeout(self, timeout: Duration) -> Self
pub fn with_deadlock_timeout(self, timeout: Duration) -> Self
Sets the deadlock timeout for parallel execution layers.
§Arguments
timeout- The timeout duration for each layer execution
§Returns
The executor with deadlock timeout configured (for builder pattern)
§Example
use std::time::Duration;
let executor = WorkflowExecutor::new(workflow)
.with_deadlock_timeout(Duration::from_secs(600)); // 10 minutesSourcepub fn without_deadlock_timeout(self) -> Self
pub fn without_deadlock_timeout(self) -> Self
Sourcepub fn tool_registry(&self) -> Option<&Arc<ToolRegistry>>
pub fn tool_registry(&self) -> Option<&Arc<ToolRegistry>>
Sourcepub fn timeout_config(&self) -> Option<&TimeoutConfig>
pub fn timeout_config(&self) -> Option<&TimeoutConfig>
Returns a reference to the timeout configuration if set.
§Returns
Some(&TimeoutConfig)if timeout configuration is setNoneif no timeout configuration
§Example
use forge_agent::workflow::TimeoutConfig;
let executor = WorkflowExecutor::new(workflow)
.with_timeout_config(TimeoutConfig::new());
if let Some(config) = executor.timeout_config() {
println!("Task timeout: {:?}", config.task_timeout);
}Sourcepub fn register_compensation(
&mut self,
task_id: TaskId,
compensation: ToolCompensation,
)
pub fn register_compensation( &mut self, task_id: TaskId, compensation: ToolCompensation, )
Registers a compensation action for a task.
Allows manual compensation registration for external tool side effects. Overrides any existing compensation for the task.
§Arguments
task_id- The task ID to register compensation forcompensation- The compensation action to register
§Example
executor.register_compensation(
TaskId::new("task-1"),
ToolCompensation::file_compensation("/tmp/output.txt")
);Sourcepub fn register_file_compensation(
&mut self,
task_id: TaskId,
file_path: impl Into<String>,
)
pub fn register_file_compensation( &mut self, task_id: TaskId, file_path: impl Into<String>, )
Registers a file creation compensation for a task.
Convenience method that automatically creates a file deletion compensation.
§Arguments
task_id- The task ID to register compensation forfile_path- Path to the file that will be deleted during rollback
§Example
executor.register_file_compensation(
TaskId::new("task-1"),
"/tmp/work_output.txt"
);Sourcepub fn validate_compensation_coverage(&self) -> CompensationReport
pub fn validate_compensation_coverage(&self) -> CompensationReport
Validates compensation coverage for all workflow tasks.
Checks which tasks have compensation actions defined and logs warnings for tasks without compensation.
§Returns
A CompensationReport showing coverage statistics
§Example
let report = executor.validate_compensation_coverage();
if report.coverage_percentage < 1.0 {
eprintln!("Warning: {:.0}% of tasks lack compensation", 100.0 * (1.0 - report.coverage_percentage));
}Sourcepub async fn execute(&mut self) -> Result<WorkflowResult, WorkflowError>
pub async fn execute(&mut self) -> Result<WorkflowResult, WorkflowError>
Executes the workflow.
Tasks are executed in topological order, with audit logging for each task start/completion/failed event. On failure, triggers rollback of dependent tasks.
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute().await?;
if result.success {
println!("Completed {} tasks", result.completed_tasks.len());
}Sourcepub async fn execute_with_validations(
&mut self,
) -> Result<WorkflowResult, WorkflowError>
pub async fn execute_with_validations( &mut self, ) -> Result<WorkflowResult, WorkflowError>
Executes the workflow with validation checkpoints enabled.
Convenience method that sets default validation configuration and executes the workflow. Validation runs after each task to check confidence scores and trigger rollback if needed.
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute_with_validations().await?;Sourcepub async fn execute_with_timeout(
&mut self,
) -> Result<WorkflowResult, WorkflowError>
pub async fn execute_with_timeout( &mut self, ) -> Result<WorkflowResult, WorkflowError>
Executes the workflow with a timeout.
Wraps the execute() method with a workflow-level timeout if configured. Returns a WorkflowTimeout error if the workflow exceeds the time limit.
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If workflow validation, ordering, or timeout fails
§Example
use forge_agent::workflow::{TimeoutConfig, WorkflowExecutor};
use std::time::Duration;
let timeout_config = TimeoutConfig {
task_timeout: None,
workflow_timeout: Some(WorkflowTimeout::from_secs(300)),
};
let mut executor = WorkflowExecutor::new(workflow)
.with_timeout_config(timeout_config);
let result = executor.execute_with_timeout().await?;Sourcepub async fn execute_parallel(
&mut self,
) -> Result<WorkflowResult, WorkflowError>
pub async fn execute_parallel( &mut self, ) -> Result<WorkflowResult, WorkflowError>
Executes the workflow with parallel task execution.
Tasks are executed in topological layers using fork-join parallelism. All tasks in the same layer execute concurrently via JoinSet, and each layer completes before the next layer starts.
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute_parallel().await?;
if result.success {
println!("Completed {} tasks with parallel execution", result.completed_tasks.len());
}Sourcepub fn completed_count(&self) -> usize
pub fn completed_count(&self) -> usize
Returns the number of completed tasks.
Sourcepub fn failed_count(&self) -> usize
pub fn failed_count(&self) -> usize
Returns the number of failed tasks.
Sourcepub fn task_count(&self) -> usize
pub fn task_count(&self) -> usize
Returns the total number of tasks in the workflow.
Sourcepub fn completed_task_ids(&self) -> Vec<TaskId>
pub fn completed_task_ids(&self) -> Vec<TaskId>
Returns the completed task IDs.
Sourcepub fn failed_task_ids(&self) -> Vec<TaskId>
pub fn failed_task_ids(&self) -> Vec<TaskId>
Returns the failed task IDs.
Sourcepub fn is_task_completed(&self, id: &TaskId) -> bool
pub fn is_task_completed(&self, id: &TaskId) -> bool
Checks if a task has completed.
Sourcepub fn is_task_failed(&self, id: &TaskId) -> bool
pub fn is_task_failed(&self, id: &TaskId) -> bool
Checks if a task has failed.
Sourcepub fn rollback_strategy(&self) -> RollbackStrategy
pub fn rollback_strategy(&self) -> RollbackStrategy
Returns the rollback strategy.
Sourcepub fn restore_checkpoint_state(
&mut self,
checkpoint: &WorkflowCheckpoint,
) -> Result<(), WorkflowError>
pub fn restore_checkpoint_state( &mut self, checkpoint: &WorkflowCheckpoint, ) -> Result<(), WorkflowError>
Sourcepub fn can_resume(&self) -> bool
pub fn can_resume(&self) -> bool
Checks if workflow has a valid checkpoint to resume from.
Returns true if a checkpoint exists for this workflow and the workflow structure is consistent with the checkpoint.
§Returns
trueif workflow can be resumedfalseif no checkpoint exists or validation fails
Sourcepub async fn resume(&mut self) -> Result<WorkflowResult, WorkflowError>
pub async fn resume(&mut self) -> Result<WorkflowResult, WorkflowError>
Resumes workflow execution from the latest checkpoint.
Finds the latest checkpoint for the workflow, validates it, restores state, and continues execution from the checkpoint position.
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If checkpoint not found, corrupted, or workflow changed
Sourcepub async fn resume_from_checkpoint_id(
&mut self,
checkpoint_id: &CheckpointId,
) -> Result<WorkflowResult, WorkflowError>
pub async fn resume_from_checkpoint_id( &mut self, checkpoint_id: &CheckpointId, ) -> Result<WorkflowResult, WorkflowError>
Resumes workflow execution from a specific checkpoint.
Loads the checkpoint by ID, validates it, restores state, and continues execution from the checkpoint position.
§Arguments
checkpoint_id- The checkpoint ID to resume from
§Returns
Ok(WorkflowResult)- Execution completed (may have partial completion)Err(WorkflowError)- If checkpoint not found, corrupted, or workflow changed
Source§impl WorkflowExecutor
impl WorkflowExecutor
Sourcepub fn state(&self) -> WorkflowState
pub fn state(&self) -> WorkflowState
Returns a snapshot of the current workflow state.
This method provides a complete view of the workflow’s execution status including all tasks and their current states.
§Returns
A WorkflowState snapshot containing task summaries and status.
§Example
let mut executor = WorkflowExecutor::new(workflow);
executor.execute().await?;
let state = executor.state();
println!("Status: {:?}", state.status);
println!("Completed: {}", state.completed_tasks.len());Auto Trait Implementations§
impl Freeze for WorkflowExecutor
impl !RefUnwindSafe for WorkflowExecutor
impl Send for WorkflowExecutor
impl Sync for WorkflowExecutor
impl Unpin for WorkflowExecutor
impl UnsafeUnpin for WorkflowExecutor
impl !UnwindSafe for WorkflowExecutor
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more