Module workflow

Workflow orchestration system for multi-step agent operations.

The workflow module provides a DAG-based task scheduling system that:

  • Executes tasks in topological order based on dependencies
  • Validates workflows for cycles and missing dependencies before execution
  • Records all task events to the audit log
  • Supports sequential execution with failure handling
  • Provides cooperative cancellation for long-running workflows
  • Supports timeout configuration for tasks and workflows

§Architecture

The workflow system is built around three core components:

§Cancellation and Timeouts

§Cancellation

Workflows support cooperative cancellation via CancellationToken:

use forge_agent::workflow::{CancellationTokenSource, WorkflowExecutor};
use forge_agent::workflow::dag::Workflow;

let source = CancellationTokenSource::new();
let mut executor = WorkflowExecutor::new(workflow)
    .with_cancellation_source(source);

// Cancel from anywhere
source.cancel();

Tasks can cooperatively respond to cancellation by polling the token:

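use forge_agent::workflow::task::{TaskContext, TaskError, TaskResult};

async fn my_task(context: &TaskContext) -> Result<TaskResult, TaskError> {
    // Loop until cancellation is requested; a missing token counts as "not cancelled".
    while !context.cancellation_token().map_or(false, |t| t.poll_cancelled()) {
        // Do one bounded unit of work per iteration so cancellation is noticed promptly
    }
    // Reached only after the token reports cancellation
    Ok(TaskResult::Cancelled)
}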

See the examples module for complete cancellation-aware tasks.
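To make the polling pattern concrete without depending on this crate, here is a minimal std-only sketch of cooperative cancellation. `DemoSource`, `DemoToken`, and `run_steps` are hypothetical names invented for illustration; they are not the crate's `CancellationTokenSource` or `CancellationToken`, whose internals are not shown on this page.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation source: owns the shared flag.
#[derive(Clone, Default)]
pub struct DemoSource {
    flag: Arc<AtomicBool>,
}

/// Hypothetical read-only view of the flag that is handed to tasks.
#[derive(Clone)]
pub struct DemoToken {
    flag: Arc<AtomicBool>,
}

impl DemoSource {
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a token that observes this source.
    pub fn token(&self) -> DemoToken {
        DemoToken { flag: Arc::clone(&self.flag) }
    }

    /// Requests cancellation; every token created from this source sees it.
    pub fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }
}

impl DemoToken {
    pub fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }
}

/// Runs up to `total` steps, polling the token before each one.
/// Returns the number of steps that actually ran.
pub fn run_steps(total: usize, token: &DemoToken, mut on_step: impl FnMut(usize)) -> usize {
    let mut done = 0;
    for i in 0..total {
        if token.is_cancelled() {
            break; // cooperative exit: nothing interrupts a step that is already running
        }
        on_step(i);
        done += 1;
    }
    done
}
```

Cancellation only takes effect at the next poll, so the work done between polls bounds how quickly a task reacts.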

§Timeouts

Both tasks and workflows can have timeout limits:

use std::time::Duration;
use forge_agent::workflow::{WorkflowExecutor, WorkflowTimeout};

let mut executor = WorkflowExecutor::new(workflow)
    .with_workflow_timeout(WorkflowTimeout::from_secs(300));

See the timeout module for timeout configuration options.
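How the executor enforces a timeout is not described on this page. As a rough illustration of the idea, the sketch below checks a deadline between steps using only the standard library. `run_with_deadline` and `DemoTimeoutError` are hypothetical names, not part of the crate's `timeout` module.

```rust
use std::time::{Duration, Instant};

/// Hypothetical error type reporting how far execution got before the limit.
#[derive(Debug, PartialEq, Eq)]
pub enum DemoTimeoutError {
    Elapsed { completed_steps: usize },
}

/// Runs `steps` units of work, checking the deadline before each one.
/// Returns the number of completed steps, or an error once the limit has passed.
pub fn run_with_deadline<F>(
    steps: usize,
    limit: Duration,
    mut step: F,
) -> Result<usize, DemoTimeoutError>
where
    F: FnMut(usize),
{
    let deadline = Instant::now() + limit;
    for i in 0..steps {
        // The check happens between steps only; a step that is already running is not interrupted.
        if Instant::now() >= deadline {
            return Err(DemoTimeoutError::Elapsed { completed_steps: i });
        }
        step(i);
    }
    Ok(steps)
}
```

An async executor can instead race the task future against a timer, which lets the timeout fire at any await point rather than only between steps.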

§Quick Start

use forge_agent::{Workflow, WorkflowExecutor, MockTask};

let mut workflow = Workflow::new();
workflow.add_task(MockTask::new("a", "Task A"));
workflow.add_task(MockTask::new("b", "Task B").depends_on("a"));

let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute().await?;

§Workflow Validation

Workflows are validated before execution to detect:

  • Cycles in the dependency graph
  • Missing dependencies (references to non-existent tasks)
  • Orphan tasks (disconnected from the main graph)
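The first two checks can be sketched with the standard library alone. This is an illustrative approximation, not the crate's `WorkflowValidator` (the `dag` module is described as petgraph-based). `DemoTask`, `missing_dependencies`, and `has_cycle` are hypothetical names, and orphan detection is left out because the page does not define "main graph" precisely.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical task description: an id plus the ids it depends on.
pub type DemoTask<'a> = (&'a str, Vec<&'a str>);

/// Returns every (task, dependency) pair whose dependency is not a known task id.
pub fn missing_dependencies<'a>(tasks: &[DemoTask<'a>]) -> Vec<(&'a str, &'a str)> {
    let known: HashSet<&str> = tasks.iter().map(|(id, _)| *id).collect();
    let mut missing = Vec::new();
    for (id, deps) in tasks {
        for dep in deps {
            if !known.contains(dep) {
                missing.push((*id, *dep));
            }
        }
    }
    missing
}

/// Detects a dependency cycle with a depth-first search.
/// Node state: absent = unvisited, 1 = on the current DFS path, 2 = fully explored.
pub fn has_cycle(tasks: &[DemoTask<'_>]) -> bool {
    let deps: HashMap<&str, &[&str]> =
        tasks.iter().map(|(id, d)| (*id, d.as_slice())).collect();
    let mut state: HashMap<&str, u8> = HashMap::new();

    fn visit<'a>(
        node: &'a str,
        deps: &HashMap<&'a str, &'a [&'a str]>,
        state: &mut HashMap<&'a str, u8>,
    ) -> bool {
        match state.get(node).copied() {
            Some(1) => return true,  // reached a node already on the current path
            Some(_) => return false, // already explored, no cycle through here
            None => {}
        }
        state.insert(node, 1);
        if let Some(&children) = deps.get(node) {
            for &child in children {
                if visit(child, deps, state) {
                    return true;
                }
            }
        }
        state.insert(node, 2);
        false
    }

    tasks.iter().any(|(id, _)| visit(*id, &deps, &mut state))
}
```

Unknown dependency ids are treated as leaves by `has_cycle`, so the two checks can be run in either order.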

§Execution Model

The executor processes tasks sequentially in topological order:

  1. Validate workflow structure
  2. Calculate execution order via topological sort
  3. Execute each task with audit logging
  4. Stop on first failure (rollback is deferred to phase 08-05)
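The steps above can be approximated in a few lines of std-only Rust. This sketch uses Kahn's algorithm for the ordering, which is an assumption: the page does not say which topological sort the executor uses. `topological_order` and `execute_in_order` are hypothetical helpers, and the `Vec<String>` merely stands in for the audit log.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Kahn's algorithm. Returns `None` if the tasks cannot all be ordered
/// (a cycle, or a dependency on an unknown task).
pub fn topological_order<'a>(tasks: &[(&'a str, Vec<&'a str>)]) -> Option<Vec<&'a str>> {
    // Number of not-yet-satisfied dependencies per task.
    let mut pending: BTreeMap<&str, usize> =
        tasks.iter().map(|(id, deps)| (*id, deps.len())).collect();
    // Reverse edges: dependency -> tasks waiting on it.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (id, deps) in tasks {
        for dep in deps {
            dependents.entry(*dep).or_default().push(*id);
        }
    }
    let mut ready: VecDeque<&str> = pending
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut order = Vec::with_capacity(tasks.len());
    while let Some(id) = ready.pop_front() {
        order.push(id);
        for child in dependents.get(id).into_iter().flatten() {
            if let Some(n) = pending.get_mut(child) {
                *n -= 1;
                if *n == 0 {
                    ready.push_back(*child);
                }
            }
        }
    }
    (order.len() == tasks.len()).then_some(order)
}

/// Runs tasks one at a time, recording events, and stops at the first failure.
/// `run` returns `true` on success; the failing task id is returned as the error.
pub fn execute_in_order<'a, F>(
    order: &[&'a str],
    mut run: F,
    audit: &mut Vec<String>,
) -> Result<(), &'a str>
where
    F: FnMut(&str) -> bool,
{
    for id in order {
        audit.push(format!("started {id}"));
        if run(id) {
            audit.push(format!("completed {id}"));
        } else {
            audit.push(format!("failed {id}"));
            return Err(*id); // later tasks are never started
        }
    }
    Ok(())
}
```

Because tasks run strictly one after another, a failure leaves every later task unstarted, which is what makes the stop-on-first-failure rule simple to reason about.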

Re-exports§

pub use auto_detect::AutoDetectConfig;
pub use auto_detect::DependencyAnalyzer;
pub use auto_detect::DependencyReason;
pub use auto_detect::DependencySuggestion;
pub use auto_detect::SuggestedTaskType;
pub use auto_detect::TaskSuggestion;
pub use builder::WorkflowBuilder;
pub use cancellation::CancellationToken;
pub use cancellation::CancellationTokenSource;
pub use cancellation::ChildToken;
pub use checkpoint::can_proceed;
pub use checkpoint::extract_confidence;
pub use checkpoint::requires_rollback;
pub use checkpoint::validate_checkpoint;
pub use checkpoint::CheckpointId;
pub use checkpoint::CheckpointSummary;
pub use checkpoint::RollbackRecommendation;
pub use checkpoint::ValidationCheckpoint;
pub use checkpoint::ValidationResult;
pub use checkpoint::ValidationStatus;
pub use checkpoint::WorkflowCheckpoint;
pub use checkpoint::WorkflowCheckpointService;
pub use combinators::ConditionalTask;
pub use combinators::ParallelTasks;
pub use combinators::TryCatchTask;
pub use dag::Workflow;
pub use dag::WorkflowError;
pub use deadlock::DeadlockDetector;
pub use deadlock::DeadlockError;
pub use deadlock::DeadlockWarning;
pub use deadlock::DeadlockWarningType;
pub use executor::WorkflowExecutor;
pub use executor::WorkflowResult;
pub use examples::CancellationAwareTask;
pub use examples::PollingTask;
pub use examples::TimeoutAndCancellationTask;
pub use examples::cooperative_cancellation_example;
pub use examples::timeout_cancellation_example;
pub use rollback::CompensationReport;
pub use rollback::ExecutableCompensation;
pub use rollback::RollbackEngine;
pub use rollback::RollbackError;
pub use rollback::RollbackReport;
pub use rollback::RollbackStrategy;
pub use state::TaskStatus;
pub use state::TaskSummary;
pub use state::WorkflowState;
pub use state::WorkflowStatus;
pub use task::CompensationAction;
pub use task::CompensationType;
pub use task::Dependency;
pub use task::TaskContext;
pub use task::TaskError;
pub use task::TaskId;
pub use task::TaskResult;
pub use task::WorkflowTask;
pub use tasks::AgentLoopTask;
pub use tasks::FileEditTask;
pub use tasks::FunctionTask;
pub use tasks::GraphQueryTask;
pub use tasks::GraphQueryType;
pub use tasks::ShellCommandTask;
pub use timeout::TaskTimeout;
pub use timeout::TimeoutConfig;
pub use timeout::TimeoutError;
pub use timeout::WorkflowTimeout;
pub use tools::ProcessGuard;
pub use tools::Tool;
pub use tools::ToolError;
pub use tools::ToolInvocation;
pub use tools::ToolInvocationResult;
pub use tools::ToolRegistry;
pub use tools::ToolResult;
pub use validate::ValidationReport;
pub use validate::WorkflowValidator;
pub use yaml::YamlWorkflow;
pub use yaml::YamlTask;
pub use yaml::YamlTaskParams;
pub use yaml::YamlTaskType;
pub use yaml::YamlWorkflowError;

Modules§

auto_detect: Automatic dependency detection using graph queries.
builder: Fluent builder API for workflow construction.
cancellation: Async cancellation token system with parent-child hierarchy.
checkpoint: Workflow state checkpointing with integrity validation.
combinators: Task composition helpers for complex workflows.
dag: DAG-based workflow representation using petgraph.
deadlock: Deadlock detection and prevention for workflow execution.
examples: Workflow examples demonstrating common patterns.
executor: Sequential workflow executor with audit logging and rollback.
rollback: Rollback engine for workflow failure recovery using Saga compensation pattern.
state: Workflow state inspection API.
task: Task abstraction and execution trait for workflow system.
tasks: Built-in task implementations for common workflow operations.
timeout: Timeout configuration for tasks and workflows.
tools: Tool registry for external tool integration with process lifecycle management.
validate: Workflow validation before execution.
yaml: YAML workflow definition and parsing.
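
The rollback module's description mentions the Saga compensation pattern. As a generic illustration of that pattern (not the crate's `RollbackEngine`, whose API is not shown on this page), the sketch below undoes completed steps in reverse order when a later step fails. `DemoStep` and `run_saga` are hypothetical names.

```rust
/// Hypothetical step: a name plus a flag saying whether its action succeeds.
pub struct DemoStep {
    pub name: &'static str,
    pub succeeds: bool,
}

/// Runs the steps in order. On the first failure, compensates every
/// previously completed step in reverse order and reports `false`.
/// The returned log records each action and compensation.
pub fn run_saga(steps: &[DemoStep]) -> (bool, Vec<String>) {
    let mut log = Vec::new();
    let mut completed: Vec<&DemoStep> = Vec::new();
    for step in steps {
        if step.succeeds {
            log.push(format!("do {}", step.name));
            completed.push(step);
        } else {
            log.push(format!("fail {}", step.name));
            // Compensate newest-first so later effects are undone before earlier ones.
            for done in completed.iter().rev() {
                log.push(format!("undo {}", done.name));
            }
            return (false, log);
        }
    }
    (true, log)
}
```

The failed step itself is not compensated here, on the assumption that a failed action left nothing to undo.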