Skip to main content

WorkflowExecutor

Struct WorkflowExecutor 

Source
pub struct WorkflowExecutor { /* private fields */ }
Expand description

Sequential workflow executor with rollback support.

Executes tasks in topological order based on dependencies, recording all task events to the audit log. On failure, automatically triggers selective rollback of dependent tasks.

§Execution Model

The executor:

  1. Validates the workflow structure
  2. Calculates execution order via topological sort
  3. Executes each task with audit logging
  4. Validates task result if validation config is set
  5. Creates checkpoint after each successful task
  6. On failure, triggers rollback of dependent tasks

Implementations§

Source§

impl WorkflowExecutor

Source

pub fn new(workflow: Workflow) -> Self

Creates a new workflow executor.

§Arguments
  • workflow - The workflow to execute
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute().await?;
Source

pub fn with_rollback_strategy(self, strategy: RollbackStrategy) -> Self

Sets the rollback strategy for this executor.

§Arguments
  • strategy - The rollback strategy to use
§Returns

The executor with the updated strategy (for builder pattern)

§Example
let executor = WorkflowExecutor::new(workflow)
    .with_rollback_strategy(RollbackStrategy::FailedOnly);
Source

pub fn with_checkpoint_service(self, service: WorkflowCheckpointService) -> Self

Sets the checkpoint service for this executor.

§Arguments
  • service - The checkpoint service to use for state persistence
§Returns

The executor with checkpoint service enabled (for builder pattern)

§Example
let executor = WorkflowExecutor::new(workflow)
    .with_checkpoint_service(checkpoint_service);
Source

pub fn with_validation_config(self, config: ValidationCheckpoint) -> Self

Sets the validation configuration for this executor.

§Arguments
  • config - The validation checkpoint configuration
§Returns

The executor with validation enabled (for builder pattern)

§Example
let executor = WorkflowExecutor::new(workflow)
    .with_validation_config(ValidationCheckpoint::default());
Source

pub fn with_cancellation_source(self, source: CancellationTokenSource) -> Self

Sets the cancellation source for this executor.

§Arguments
  • source - The cancellation token source to use
§Returns

The executor with cancellation enabled (for builder pattern)

§Example
use forge_agent::workflow::CancellationTokenSource;

let source = CancellationTokenSource::new();
let executor = WorkflowExecutor::new(workflow)
    .with_cancellation_source(source);
Source

pub fn cancellation_token(&self) -> Option<CancellationToken>

Returns a cancellation token if configured.

§Returns
  • Some(CancellationToken) if cancellation source is configured
  • None if no cancellation source
§Example
let source = CancellationTokenSource::new();
let executor = WorkflowExecutor::new(workflow)
    .with_cancellation_source(source);

if let Some(token) = executor.cancellation_token() {
    println!("Token cancelled: {}", token.is_cancelled());
}
Source

pub fn cancel(&self)

Cancels the workflow execution.

Triggers cancellation on the cancellation source if configured. This will cause the executor to stop after the current task completes.

§Example
let source = CancellationTokenSource::new();
let mut executor = WorkflowExecutor::new(workflow)
    .with_cancellation_source(source);

// Spawn execution in background
tokio::spawn(async move {
    executor.execute().await?;
});

// Cancel from main thread
executor.cancel();
Source

pub fn with_timeout_config(self, config: TimeoutConfig) -> Self

Sets the timeout configuration for this executor.

§Arguments
  • config - The timeout configuration to use
§Returns

The executor with timeout configuration enabled (for builder pattern)

§Example
use forge_agent::workflow::TimeoutConfig;

let executor = WorkflowExecutor::new(workflow)
    .with_timeout_config(TimeoutConfig::new());
Source

pub fn with_tool_registry(self, registry: ToolRegistry) -> Self

Sets the tool registry for this executor.

§Arguments
  • registry - The tool registry to use for tool invocation
§Returns

The executor with tool registry enabled (for builder pattern)

§Example
use forge_agent::workflow::tools::ToolRegistry;

let registry = ToolRegistry::new();
let executor = WorkflowExecutor::new(workflow)
    .with_tool_registry(registry);
Source

pub fn with_deadlock_timeout(self, timeout: Duration) -> Self

Sets the deadlock timeout for parallel execution layers.

§Arguments
  • timeout - The timeout duration for each layer execution
§Returns

The executor with deadlock timeout configured (for builder pattern)

§Example
use std::time::Duration;

let executor = WorkflowExecutor::new(workflow)
    .with_deadlock_timeout(Duration::from_secs(600)); // 10 minutes
Source

pub fn without_deadlock_timeout(self) -> Self

Disables the deadlock timeout for parallel execution.

Use this for workflows that may take longer than the default 5 minutes.

§Returns

The executor with deadlock timeout disabled (for builder pattern)

§Example
let executor = WorkflowExecutor::new(workflow)
    .without_deadlock_timeout();
Source

pub fn tool_registry(&self) -> Option<&Arc<ToolRegistry>>

Returns a reference to the tool registry if set.

§Returns
  • Some(&Arc<ToolRegistry>) if tool registry is set
  • None if no tool registry
§Example
if let Some(registry) = executor.tool_registry() {
    // Use tool registry
}
Source

pub fn timeout_config(&self) -> Option<&TimeoutConfig>

Returns a reference to the timeout configuration if set.

§Returns
  • Some(&TimeoutConfig) if timeout configuration is set
  • None if no timeout configuration
§Example
use forge_agent::workflow::TimeoutConfig;

let executor = WorkflowExecutor::new(workflow)
    .with_timeout_config(TimeoutConfig::new());

if let Some(config) = executor.timeout_config() {
    println!("Task timeout: {:?}", config.task_timeout);
}
Source

pub fn register_compensation( &mut self, task_id: TaskId, compensation: ToolCompensation, )

Registers a compensation action for a task.

Allows manual compensation registration for external tool side effects. Overrides any existing compensation for the task.

§Arguments
  • task_id - The task ID to register compensation for
  • compensation - The compensation action to register
§Example
executor.register_compensation(
    TaskId::new("task-1"),
    ToolCompensation::file_compensation("/tmp/output.txt")
);
Source

pub fn register_file_compensation( &mut self, task_id: TaskId, file_path: impl Into<String>, )

Registers a file creation compensation for a task.

Convenience method that automatically creates a file deletion compensation.

§Arguments
  • task_id - The task ID to register compensation for
  • file_path - Path to the file that will be deleted during rollback
§Example
executor.register_file_compensation(
    TaskId::new("task-1"),
    "/tmp/work_output.txt"
);
Source

pub fn validate_compensation_coverage(&self) -> CompensationReport

Validates compensation coverage for all workflow tasks.

Checks which tasks have compensation actions defined and logs warnings for tasks without compensation.

§Returns

A CompensationReport showing coverage statistics

§Example
let report = executor.validate_compensation_coverage();
if report.coverage_percentage < 1.0 {
    eprintln!("Warning: {:.0}% of tasks lack compensation", 100.0 * (1.0 - report.coverage_percentage));
}
Source

pub async fn execute(&mut self) -> Result<WorkflowResult, WorkflowError>

Executes the workflow.

Tasks are executed in topological order, with audit logging for each task start/completion/failed event. On failure, triggers rollback of dependent tasks.

§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute().await?;
if result.success {
    println!("Completed {} tasks", result.completed_tasks.len());
}
Source

pub async fn execute_with_validations( &mut self, ) -> Result<WorkflowResult, WorkflowError>

Executes the workflow with validation checkpoints enabled.

Convenience method that sets default validation configuration and executes the workflow. Validation runs after each task to check confidence scores and trigger rollback if needed.

§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute_with_validations().await?;
Source

pub async fn execute_with_timeout( &mut self, ) -> Result<WorkflowResult, WorkflowError>

Executes the workflow with a timeout.

Wraps the execute() method with a workflow-level timeout if configured. Returns a WorkflowTimeout error if the workflow exceeds the time limit.

§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If workflow validation, ordering, or timeout fails
§Example
use forge_agent::workflow::{TimeoutConfig, WorkflowExecutor};
use std::time::Duration;

let timeout_config = TimeoutConfig {
    task_timeout: None,
    workflow_timeout: Some(WorkflowTimeout::from_secs(300)),
};

let mut executor = WorkflowExecutor::new(workflow)
    .with_timeout_config(timeout_config);
let result = executor.execute_with_timeout().await?;
Source

pub async fn execute_parallel( &mut self, ) -> Result<WorkflowResult, WorkflowError>

Executes the workflow with parallel task execution.

Tasks are executed in topological layers using fork-join parallelism. All tasks in the same layer execute concurrently via JoinSet, and each layer completes before the next layer starts.

§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If workflow validation or ordering fails
§Example
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute_parallel().await?;
if result.success {
    println!("Completed {} tasks with parallel execution", result.completed_tasks.len());
}
Source

pub fn audit_log(&self) -> &AuditLog

Returns a reference to the audit log.

Source

pub fn completed_count(&self) -> usize

Returns the number of completed tasks.

Source

pub fn failed_count(&self) -> usize

Returns the number of failed tasks.

Source

pub fn task_count(&self) -> usize

Returns the total number of tasks in the workflow.

Source

pub fn task_ids(&self) -> Vec<TaskId>

Returns the IDs of all tasks in the workflow.

Source

pub fn completed_task_ids(&self) -> Vec<TaskId>

Returns the completed task IDs.

Source

pub fn failed_task_ids(&self) -> Vec<TaskId>

Returns the failed task IDs.

Source

pub fn is_task_completed(&self, id: &TaskId) -> bool

Checks if a task has completed.

Source

pub fn is_task_failed(&self, id: &TaskId) -> bool

Checks if a task has failed.

Source

pub fn progress(&self) -> f64

Returns execution progress as a percentage (0.0 to 1.0).

Source

pub fn rollback_strategy(&self) -> RollbackStrategy

Returns the rollback strategy.

Source

pub fn restore_checkpoint_state( &mut self, checkpoint: &WorkflowCheckpoint, ) -> Result<(), WorkflowError>

Validates and restores checkpoint state.

This is a convenience method that validates workflow consistency and then restores state from the checkpoint.

§Arguments
  • checkpoint - The checkpoint to restore
§Returns
  • Ok(()) if validation passed and state was restored
  • Err(WorkflowError) if validation fails
Source

pub fn can_resume(&self) -> bool

Checks if workflow has a valid checkpoint to resume from.

Returns true if a checkpoint exists for this workflow and the workflow structure is consistent with the checkpoint.

§Returns
  • true if workflow can be resumed
  • false if no checkpoint exists or validation fails
Source

pub async fn resume(&mut self) -> Result<WorkflowResult, WorkflowError>

Resumes workflow execution from the latest checkpoint.

Finds the latest checkpoint for the workflow, validates it, restores state, and continues execution from the checkpoint position.

§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If checkpoint not found, corrupted, or workflow changed
Source

pub async fn resume_from_checkpoint_id( &mut self, checkpoint_id: &CheckpointId, ) -> Result<WorkflowResult, WorkflowError>

Resumes workflow execution from a specific checkpoint.

Loads the checkpoint by ID, validates it, restores state, and continues execution from the checkpoint position.

§Arguments
  • checkpoint_id - The checkpoint ID to resume from
§Returns
  • Ok(WorkflowResult) - Execution completed (may have partial completion)
  • Err(WorkflowError) - If checkpoint not found, corrupted, or workflow changed
Source§

impl WorkflowExecutor

Source

pub fn state(&self) -> WorkflowState

Returns a snapshot of the current workflow state.

This method provides a complete view of the workflow’s execution status including all tasks and their current states.

§Returns

A WorkflowState snapshot containing task summaries and status.

§Example
let mut executor = WorkflowExecutor::new(workflow);
executor.execute().await?;
let state = executor.state();
println!("Status: {:?}", state.status);
println!("Completed: {}", state.completed_tasks.len());

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more