Expand description
Workflow orchestration system for multi-step agent operations.
The workflow module provides a DAG-based task scheduling system that:
- Executes tasks in topological order based on dependencies
- Validates workflows for cycles and missing dependencies before execution
- Records all task events to the audit log
- Supports sequential execution with failure handling
- Provides cooperative cancellation for long-running workflows
- Supports timeout configuration for tasks and workflows
§Architecture
The workflow system is built around three core components:
DAG: Directed acyclic graph for task representationWorkflowTask: Async trait for task executionWorkflowExecutor: Sequential task executor
§Cancellation and Timeouts
§Cancellation
Workflows support cooperative cancellation via CancellationToken:
ⓘ
use forge_agent::workflow::{CancellationTokenSource, WorkflowExecutor};
use forge_agent::workflow::dag::Workflow;
let source = CancellationTokenSource::new();
let mut executor = WorkflowExecutor::new(workflow)
.with_cancellation_source(source);
// Cancel from anywhere
source.cancel();Tasks can cooperatively respond to cancellation by polling the token:
ⓘ
use forge_agent::workflow::task::TaskContext;
async fn my_task(context: &TaskContext) -> Result<TaskResult, TaskError> {
while !context.cancellation_token().map_or(false, |t| t.poll_cancelled()) {
// Do work
}
Ok(TaskResult::Cancelled)
}See examples for complete cancellation-aware task examples.
§Timeouts
Both tasks and workflows can have timeout limits:
ⓘ
use std::time::Duration;
use forge_agent::workflow::{WorkflowExecutor, WorkflowTimeout};
let mut executor = WorkflowExecutor::new(workflow)
.with_workflow_timeout(WorkflowTimeout::from_secs(300));See timeout module for timeout configuration options.
§Quick Start
ⓘ
use forge_agent::{Workflow, WorkflowExecutor, MockTask};
let mut workflow = Workflow::new();
workflow.add_task(MockTask::new("a", "Task A"));
workflow.add_task(MockTask::new("b", "Task B").depends_on("a"));
let mut executor = WorkflowExecutor::new(workflow);
let result = executor.execute().await?;§Workflow Validation
Workflows are validated before execution to detect:
- Cycles in the dependency graph
- Missing dependencies (references to non-existent tasks)
- Orphan tasks (disconnected from the main graph)
§Execution Model
The executor processes tasks sequentially in topological order:
- Validate workflow structure
- Calculate execution order via topological sort
- Execute each task with audit logging
- Stop on first failure (rollback is deferred to phase 08-05)
Re-exports§
pub use auto_detect::AutoDetectConfig;pub use auto_detect::DependencyAnalyzer;pub use auto_detect::DependencyReason;pub use auto_detect::DependencySuggestion;pub use auto_detect::SuggestedTaskType;pub use auto_detect::TaskSuggestion;pub use builder::WorkflowBuilder;pub use cancellation::CancellationToken;pub use cancellation::CancellationTokenSource;pub use cancellation::ChildToken;pub use checkpoint::can_proceed;pub use checkpoint::extract_confidence;pub use checkpoint::requires_rollback;pub use checkpoint::validate_checkpoint;pub use checkpoint::CheckpointId;pub use checkpoint::CheckpointSummary;pub use checkpoint::RollbackRecommendation;pub use checkpoint::ValidationCheckpoint;pub use checkpoint::ValidationResult;pub use checkpoint::ValidationStatus;pub use checkpoint::WorkflowCheckpoint;pub use checkpoint::WorkflowCheckpointService;pub use combinators::ConditionalTask;pub use combinators::ParallelTasks;pub use combinators::TryCatchTask;pub use dag::Workflow;pub use dag::WorkflowError;pub use deadlock::DeadlockDetector;pub use deadlock::DeadlockError;pub use deadlock::DeadlockWarning;pub use deadlock::DeadlockWarningType;pub use executor::WorkflowExecutor;pub use executor::WorkflowResult;pub use examples::CancellationAwareTask;pub use examples::PollingTask;pub use examples::TimeoutAndCancellationTask;pub use examples::cooperative_cancellation_example;pub use examples::timeout_cancellation_example;pub use rollback::CompensationReport;pub use rollback::ExecutableCompensation;pub use rollback::RollbackEngine;pub use rollback::RollbackError;pub use rollback::RollbackReport;pub use rollback::RollbackStrategy;pub use state::TaskStatus;pub use state::TaskSummary;pub use state::WorkflowState;pub use state::WorkflowStatus;pub use task::CompensationAction;pub use task::CompensationType;pub use task::Dependency;pub use task::TaskContext;pub use task::TaskError;pub use task::TaskId;pub use task::TaskResult;pub use task::WorkflowTask;pub use tasks::AgentLoopTask;pub use tasks::FileEditTask;pub use tasks::FunctionTask;pub use tasks::GraphQueryTask;pub use tasks::GraphQueryType;pub use tasks::ShellCommandTask;pub use timeout::TaskTimeout;pub use timeout::TimeoutConfig;pub use timeout::TimeoutError;pub use timeout::WorkflowTimeout;pub use tools::ProcessGuard;pub use tools::Tool;pub use tools::ToolError;pub use tools::ToolInvocation;pub use tools::ToolInvocationResult;pub use tools::ToolRegistry;pub use tools::ToolResult;pub use validate::ValidationReport;pub use validate::WorkflowValidator;pub use yaml::YamlWorkflow;pub use yaml::YamlTask;pub use yaml::YamlTaskParams;pub use yaml::YamlTaskType;pub use yaml::YamlWorkflowError;
Modules§
- auto_
detect - Automatic dependency detection using graph queries.
- builder
- Fluent builder API for workflow construction.
- cancellation
- Async cancellation token system with parent-child hierarchy.
- checkpoint
- Workflow state checkpointing with integrity validation.
- combinators
- Task composition helpers for complex workflows.
- dag
- DAG-based workflow representation using petgraph.
- deadlock
- Deadlock detection and prevention for workflow execution.
- examples
- Workflow examples demonstrating common patterns.
- executor
- Sequential workflow executor with audit logging and rollback.
- rollback
- Rollback engine for workflow failure recovery using Saga compensation pattern.
- state
- Workflow state inspection API.
- task
- Task abstraction and execution trait for workflow system.
- tasks
- Built-in task implementations for common workflow operations.
- timeout
- Timeout configuration for tasks and workflows.
- tools
- Tool registry for external tool integration with process lifecycle management.
- validate
- Workflow validation before execution.
- yaml
- YAML workflow definition and parsing.