# RustChain Mission Execution Engine Specification
## Overview
The RustChain Mission Execution Engine is a sophisticated DAG (Directed Acyclic Graph) execution system that processes complex AI workflows with dependency management, parallel execution, safety validation, and comprehensive error handling. It supports 40+ step types across file operations, AI/ML, networking, databases, and system management.
## Version
- **Engine Version**: 1.0.0
- **RustChain Version**: 0.1.0+
- **Specification**: GitHub Spec Kit compliant
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ RustChain Mission Execution Engine │
├─────────────────────────────────────────────────────────────┤
│ Mission (YAML) → MissionLoader → Safety Validation │
│ ↓ │
│ DagExecutor │
│ ├── Topological Sort (Dependency Resolution) │
│ ├── Parallel Execution (Async/Await) │
│ ├── Step Execution (40+ Step Types) │
│ ├── Error Handling (Fail-fast / Continue-on-error) │
│ ├── Timeout Management (Per-step & Global) │
│ └── Result Collection (Comprehensive Reporting) │
├─────────────────────────────────────────────────────────────┤
│ ExecutionContext │
│ ├── Variable Storage & Substitution │
│ ├── Inter-step Communication │
│ ├── State Management │
│ └── Resource Tracking │
└─────────────────────────────────────────────────────────────┘
```
## Core Components
### 1. Mission Structure
The fundamental unit of execution defining a complete workflow.
```rust
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Mission {
pub version: String,
pub name: String,
pub description: Option<String>,
pub steps: Vec<MissionStep>,
pub config: Option<MissionConfig>,
}
```
#### Fields
- **`version: String`** - Mission format version (e.g., "1.0")
- **`name: String`** - Human-readable mission name
- **`description: Option<String>`** - Optional mission description
- **`steps: Vec<MissionStep>`** - Ordered list of execution steps
- **`config: Option<MissionConfig>`** - Global mission configuration
#### Example Mission
```yaml
name: "Data Processing Pipeline"
version: "1.0"
description: "Extract, transform, and analyze sales data"
config:
timeout_seconds: 600
fail_fast: true
steps:
- id: "extract_data"
name: "Extract Sales Data"
step_type: "sql_query"
parameters:
connection: "sales_db"
query: "SELECT * FROM sales WHERE date >= '2024-01-01'"
- id: "transform_data"
name: "Transform Data Format"
step_type: "csv_process"
depends_on: ["extract_data"]
parameters:
input_file: "sales_raw.csv"
output_file: "sales_clean.csv"
operations: ["normalize", "validate"]
- id: "analyze_trends"
name: "AI Trend Analysis"
step_type: "llm"
depends_on: ["transform_data"]
parameters:
provider: "openai"
model: "gpt-4"
prompt: "Analyze sales trends in the provided data"
```
### 2. Mission Step
Individual execution unit within a mission workflow.
```rust
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MissionStep {
pub id: String,
pub name: String,
pub step_type: StepType,
pub depends_on: Option<Vec<String>>,
pub timeout_seconds: Option<u64>,
pub continue_on_error: Option<bool>,
pub parameters: serde_json::Value,
}
```
#### Fields
- **`id: String`** - Unique step identifier for dependency references
- **`name: String`** - Human-readable step description
- **`step_type: StepType`** - Type of operation to perform
- **`depends_on: Option<Vec<String>>`** - Dependencies that must complete first
- **`timeout_seconds: Option<u64>`** - Step-specific timeout override
- **`continue_on_error: Option<bool>`** - Whether to continue if this step fails
- **`parameters: serde_json::Value`** - Step-specific configuration parameters
### 3. Step Types
Comprehensive catalog of 40+ supported operations.
```rust
pub enum StepType {
// File Operations
CreateFile, EditFile, DeleteFile, CopyFile, MoveFile,
ReadFile, ListDirectory, FileSearch,
// Data Processing
ParseJson, ParseYaml, ParseXml, ValidateSchema, CsvProcess,
// Code Development
CompileCode, RunTests, FormatCode, LintCode,
ExtractFunctions, GenerateDocs,
// Git Operations
GitCommit, GitBranch, GitMerge, GitStatus, GitDiff,
// System Operations
ProcessStart, ProcessKill, MonitorResources,
ServiceHealth, Compress,
// Database Operations
SqlQuery, RedisSet, RedisGet, DbBackup, DbMigrate,
// Network Operations
WebsocketConnect, FtpUpload, FtpDownload,
SshExecute, PingHost,
// AI/ML Operations
GenerateEmbedding, SimilaritySearch, ModelInference,
// Control Flow
Noop, Command, Http,
// Advanced Features
Llm, Tool, Agent, Chain, RagQuery, RagAdd,
}
```
#### Step Type Categories
##### File Operations
- **CreateFile**: Create new files with content
- **EditFile**: Modify existing files (replace/append)
- **DeleteFile**: Remove files safely
- **CopyFile**: Copy files with validation
- **MoveFile**: Move/rename files
- **ReadFile**: Read file contents into variables
- **ListDirectory**: Directory enumeration and filtering
- **FileSearch**: Content-based file searching
##### Data Processing
- **ParseJson**: JSON parsing and validation
- **ParseYaml**: YAML parsing with schema support
- **ParseXml**: XML processing and transformation
- **ValidateSchema**: Schema validation (JSON Schema, XSD)
- **CsvProcess**: CSV manipulation and analysis
##### Code Development
- **CompileCode**: Multi-language compilation
- **RunTests**: Test suite execution
- **FormatCode**: Code formatting and style
- **LintCode**: Static analysis and linting
- **ExtractFunctions**: Code analysis and extraction
- **GenerateDocs**: Documentation generation
##### System Operations
- **Command**: Shell command execution
- **ProcessStart**: Process lifecycle management
- **ProcessKill**: Safe process termination
- **MonitorResources**: System monitoring
- **ServiceHealth**: Health check automation
##### AI/ML Operations
- **Llm**: Large Language Model interactions
- **GenerateEmbedding**: Vector embedding generation
- **SimilaritySearch**: Semantic similarity matching
- **ModelInference**: ML model predictions
- **Agent**: Autonomous agent execution
- **Chain**: Multi-step AI workflows
### 4. DagExecutor
The core execution engine implementing DAG-based workflow processing.
```rust
pub struct DagExecutor;
impl DagExecutor {
pub async fn execute_mission(mission: Mission) -> anyhow::Result<MissionResult>;
pub async fn execute_step(step: &MissionStep, context: &mut ExecutionContext) -> anyhow::Result<StepResult>;
pub fn topological_sort(steps: &[MissionStep]) -> anyhow::Result<Vec<String>>;
}
```
#### Core Methods
##### `execute_mission(mission: Mission) -> Result<MissionResult>`
Executes a complete mission with full dependency resolution and error handling.
**Execution Flow**:
1. **Validation** - Check mission structure and dependencies
2. **Topological Sort** - Resolve execution order
3. **Parallel Execution** - Execute independent steps concurrently
4. **Error Handling** - Process failures according to fail_fast settings
5. **Result Collection** - Aggregate all step results
**Features**:
- ✅ **Dependency Resolution**: Automatic topological sorting
- ✅ **Parallel Execution**: Concurrent execution of independent steps
- ✅ **Timeout Management**: Global and per-step timeouts
- ✅ **Error Strategies**: Fail-fast or continue-on-error modes
- ✅ **Progress Tracking**: Real-time execution monitoring
##### `execute_step(step: &MissionStep, context: &mut ExecutionContext) -> Result<StepResult>`
Executes a single step with comprehensive error handling and logging.
**Step Execution Process**:
1. **Parameter Validation** - Validate required parameters
2. **Context Preparation** - Set up execution environment
3. **Operation Execution** - Perform the specific step operation
4. **Result Processing** - Process outputs and handle errors
5. **Context Update** - Store results for subsequent steps
##### `topological_sort(steps: &[MissionStep]) -> Result<Vec<String>>`
Resolves step dependencies into execution order.
**Dependency Resolution**:
- Detects circular dependencies
- Ensures dependencies execute before dependents
- Optimizes for parallel execution opportunities
- Validates dependency references
### 5. ExecutionContext
Manages state and communication between steps during mission execution.
```rust
pub struct ExecutionContext {
variables: HashMap<String, String>,
}
impl ExecutionContext {
pub fn new() -> Self;
pub fn get_variable(&self, key: &str) -> Option<&String>;
pub fn set_variable(&mut self, key: String, value: String);
}
```
#### Features
- **Variable Storage**: Inter-step data sharing
- **State Management**: Execution state tracking
- **Resource Tracking**: Memory and resource monitoring
- **Communication**: Step-to-step message passing
### 6. MissionResult
Comprehensive execution result with detailed step-by-step reporting.
```rust
pub struct MissionResult {
pub status: MissionStatus,
pub step_results: HashMap<String, StepResult>,
pub total_duration_ms: u64,
pub timestamp: DateTime<Utc>,
}
pub enum MissionStatus {
Running,
Completed,
Failed,
Cancelled,
}
```
### 7. StepResult
Individual step execution result with detailed information.
```rust
pub struct StepResult {
pub step_id: String,
pub status: StepStatus,
pub output: serde_json::Value,
pub error: Option<String>,
pub duration_ms: u64,
}
pub enum StepStatus {
Success,
Failed,
Skipped,
}
```
## Mission Configuration
Global configuration options for mission execution.
```rust
pub struct MissionConfig {
pub timeout_seconds: Option<u64>,
pub fail_fast: Option<bool>,
pub max_concurrent_steps: Option<usize>,
pub retry_attempts: Option<u32>,
pub retry_delay_ms: Option<u64>,
}
```
### Configuration Options
- **`timeout_seconds`** - Global timeout for mission execution
- **`fail_fast`** - Stop execution on first step failure (default: true)
- **`max_concurrent_steps`** - Limit parallel step execution
- **`retry_attempts`** - Number of retry attempts for failed steps
- **`retry_delay_ms`** - Delay between retry attempts
## Execution Examples
### Basic Linear Workflow
```yaml
name: "File Processing Pipeline"
version: "1.0"
steps:
- id: "create_input"
name: "Create Input File"
step_type: "create_file"
parameters:
path: "input.txt"
content: "Hello, World!"
- id: "process_file"
name: "Process File Content"
step_type: "edit_file"
depends_on: ["create_input"]
parameters:
path: "input.txt"
content: "Processed: Hello, World!"
- id: "validate_output"
name: "Validate Processing"
step_type: "read_file"
depends_on: ["process_file"]
parameters:
path: "input.txt"
```
### Parallel Execution Workflow
```yaml
name: "Parallel Data Analysis"
version: "1.0"
config:
max_concurrent_steps: 3
steps:
- id: "data_source"
name: "Generate Source Data"
step_type: "create_file"
parameters:
path: "data.csv"
content: "name,value\ntest,100"
# These three steps execute in parallel
- id: "analysis_a"
name: "Statistical Analysis"
step_type: "csv_process"
depends_on: ["data_source"]
parameters:
input_file: "data.csv"
operation: "statistics"
- id: "analysis_b"
name: "Trend Analysis"
step_type: "llm"
depends_on: ["data_source"]
parameters:
provider: "openai"
prompt: "Analyze trends in the CSV data"
- id: "analysis_c"
name: "Data Validation"
step_type: "validate_schema"
depends_on: ["data_source"]
parameters:
file: "data.csv"
schema_type: "csv"
# Final step waits for all analysis to complete
- id: "combine_results"
name: "Combine Analysis Results"
step_type: "create_file"
depends_on: ["analysis_a", "analysis_b", "analysis_c"]
parameters:
path: "final_report.md"
content: "# Analysis Complete\nAll analysis steps finished."
```
### Error Handling Workflow
```yaml
name: "Robust Error Handling"
version: "1.0"
config:
fail_fast: false
retry_attempts: 3
retry_delay_ms: 1000
steps:
- id: "risky_operation"
name: "Operation That Might Fail"
step_type: "command"
continue_on_error: true
parameters:
command: "test"
args: ["-f", "nonexistent_file.txt"]
- id: "fallback_operation"
name: "Fallback When Primary Fails"
step_type: "create_file"
depends_on: ["risky_operation"]
parameters:
path: "fallback.txt"
content: "Fallback executed"
- id: "always_execute"
name: "Critical Final Step"
step_type: "create_file"
depends_on: ["fallback_operation"]
parameters:
path: "completion.log"
content: "Mission attempted completion"
```
### AI/ML Workflow
```yaml
name: "AI Document Processing"
version: "1.0"
steps:
- id: "load_document"
name: "Load Source Document"
step_type: "read_file"
parameters:
path: "source_document.txt"
- id: "generate_summary"
name: "AI Summarization"
step_type: "llm"
depends_on: ["load_document"]
parameters:
provider: "openai"
model: "gpt-4"
prompt: "Summarize the document content"
max_tokens: 500
- id: "extract_keywords"
name: "Keyword Extraction"
step_type: "llm"
depends_on: ["load_document"]
parameters:
provider: "openai"
model: "gpt-3.5-turbo"
prompt: "Extract key terms and topics"
- id: "generate_embeddings"
name: "Vector Embeddings"
step_type: "generate_embedding"
depends_on: ["load_document"]
parameters:
text_source: "file"
file_path: "source_document.txt"
model: "text-embedding-ada-002"
- id: "store_in_rag"
name: "Store in RAG System"
step_type: "rag_add"
depends_on: ["generate_embeddings", "generate_summary"]
parameters:
id: "doc_001"
content_file: "source_document.txt"
metadata: {
"summary": "{{summary_result}}",
"keywords": "{{keywords_result}}"
}
```
## Step Parameter Examples
### File Operations
```yaml
# Create File
step_type: "create_file"
parameters:
path: "output.txt"
content: "File content here"
mode: "644" # Optional file permissions
# Edit File
step_type: "edit_file"
parameters:
path: "existing.txt"
content: "New content"
append: false # Replace vs append
backup: true # Create backup before editing
```
### Database Operations
```yaml
# SQL Query
step_type: "sql_query"
parameters:
connection: "postgresql://user:pass@host:5432/db"
query: "SELECT * FROM users WHERE created_at >= $1"
parameters: ["2024-01-01"]
output_format: "json"
# Redis Operations
step_type: "redis_set"
parameters:
connection: "redis://localhost:6379"
key: "session:user:123"
value: '{"user_id": 123, "active": true}'
ttl: 3600
```
### AI/ML Operations
```yaml
# LLM Interaction
step_type: "llm"
parameters:
provider: "openai"
model: "gpt-4"
prompt: "Analyze the provided data"
temperature: 0.7
max_tokens: 1000
system_message: "You are a data analyst"
# Agent Execution
step_type: "agent"
parameters:
name: "DataAnalyst"
objective: "Analyze quarterly sales trends"
max_iterations: 10
verbose: true
tools: ["calculator", "data_processor"]
```
### Network Operations
```yaml
# HTTP Request
step_type: "http"
parameters:
url: "https://api.example.com/data"
method: "POST"
headers:
"Content-Type": "application/json"
"Authorization": "Bearer {{api_token}}"
body: '{"query": "recent_sales"}'
timeout: 30
# SSH Execution
step_type: "ssh_execute"
parameters:
host: "remote.example.com"
username: "deploy"
command: "systemctl restart web-service"
timeout: 60
```
## Variable Substitution
The execution engine supports dynamic variable substitution using `{{variable_name}}` syntax.
### Variable Sources
1. **Environment Variables** - `{{env.API_KEY}}`
2. **Previous Step Results** - `{{step_id.output.field}}`
3. **Context Variables** - `{{context.variable_name}}`
4. **Configuration Values** - `{{config.setting_name}}`
### Example Usage
```yaml
steps:
- id: "get_user_data"
step_type: "http"
parameters:
url: "https://api.example.com/users/{{env.USER_ID}}"
headers:
"Authorization": "Bearer {{env.API_TOKEN}}"
- id: "process_data"
step_type: "llm"
depends_on: ["get_user_data"]
parameters:
prompt: "Analyze user data: {{get_user_data.output.response}}"
- id: "save_result"
step_type: "create_file"
depends_on: ["process_data"]
parameters:
path: "user_analysis_{{env.USER_ID}}.txt"
content: "{{process_data.output.analysis}}"
```
## Error Handling Strategies
### Fail-Fast Mode (Default)
```yaml
config:
fail_fast: true # Stop on first error
steps:
- id: "step1"
step_type: "command"
parameters:
command: "test_command"
- id: "step2" # Won't execute if step1 fails
depends_on: ["step1"]
step_type: "create_file"
```
### Continue-on-Error Mode
```yaml
config:
fail_fast: false # Continue despite errors
steps:
- id: "risky_step"
step_type: "command"
continue_on_error: true # Step-level override
parameters:
command: "might_fail"
- id: "cleanup_step"
depends_on: ["risky_step"]
step_type: "create_file" # Executes even if risky_step fails
```
### Retry Logic
```yaml
config:
retry_attempts: 3
retry_delay_ms: 2000
steps:
- id: "network_request"
step_type: "http"
parameters:
url: "https://unreliable-api.com/data"
# Will retry up to 3 times with 2-second delays
```
## Performance Characteristics
### Execution Metrics
- **Mission Load Time**: ~10ms (typical YAML parsing)
- **Dependency Resolution**: ~5ms (100 steps)
- **Step Execution**: Varies by step type
- File operations: 1-50ms
- Command execution: 10ms-30s
- LLM calls: 500ms-10s
- Database queries: 10ms-5s
- **Parallel Efficiency**: Near-linear scaling for independent steps
- **Memory Usage**: ~5MB base + step-specific requirements
### Scalability
- **Maximum Steps**: 10,000+ steps per mission
- **Dependency Depth**: Unlimited (limited by memory)
- **Parallel Execution**: Up to system CPU core count
- **Mission Size**: 100MB+ YAML files supported
- **Concurrent Missions**: Limited by system resources
## Safety and Security
### Built-in Safety Features
1. **Dependency Validation** - Prevents circular dependencies
2. **Timeout Protection** - Prevents runaway execution
3. **Resource Limits** - Memory and CPU protection
4. **Path Sanitization** - Prevents path traversal attacks
5. **Command Validation** - Safe command execution
6. **Error Containment** - Isolated step failures
### Security Considerations
1. **Input Validation** - All parameters validated
2. **Sandboxed Execution** - Optional isolation
3. **Audit Logging** - Complete execution trails
4. **Permission Checking** - File and system access controls
5. **Credential Management** - Secure secret handling
## Integration Points
### CLI Integration
```bash
# Direct mission execution
rustchain run mission.yaml
# Validation only
rustchain run mission.yaml --dry-run
# Safety analysis
rustchain safety validate mission.yaml
```
### Library Integration
```rust
use rustchain::engine::{DagExecutor, MissionLoader};
// Load and execute mission
let mission = MissionLoader::load_from_file("mission.yaml")?;
let result = DagExecutor::execute_mission(mission).await?;
println!("Mission completed: {:?}", result.status);
```
### API Integration
```rust
// RESTful API endpoint
#[post("/missions/execute")]
async fn execute_mission(mission: Json<Mission>) -> Json<MissionResult> {
let result = DagExecutor::execute_mission(mission.into_inner()).await?;
Json(result)
}
```
## Best Practices
### Mission Design
1. **Atomic Steps** - Keep steps focused and atomic
2. **Clear Dependencies** - Explicit dependency declarations
3. **Error Handling** - Plan for failure scenarios
4. **Resource Management** - Consider memory and time limits
5. **Documentation** - Clear step names and descriptions
### Performance Optimization
1. **Parallel Execution** - Minimize unnecessary dependencies
2. **Resource Allocation** - Appropriate timeouts and limits
3. **Step Granularity** - Balance between atomic and efficient
4. **Caching** - Reuse expensive computations
5. **Monitoring** - Track execution metrics
### Security Best Practices
1. **Input Validation** - Validate all external inputs
2. **Least Privilege** - Minimal required permissions
3. **Audit Trails** - Enable comprehensive logging
4. **Secret Management** - Use secure credential storage
5. **Network Security** - Validate external connections
---
*This specification follows the GitHub Spec Kit standards for execution engine documentation and is maintained alongside the RustChain codebase.*