use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level workflow document made of `apiVersion`, `kind`, `metadata` and `spec`.
///
/// Keys are camelCase on the wire, so `api_version` is read and written as `apiVersion`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Workflow {
/// Falls back to `default_api_version()` ("aof.dev/v1") when the key is absent.
#[serde(default = "default_api_version")]
pub api_version: String,
/// Falls back to `default_workflow_kind()` ("Workflow") when the key is absent.
/// Any string is accepted; this type does not check the value.
#[serde(default = "default_workflow_kind")]
pub kind: String,
/// Required. Name plus optional namespace, labels and annotations.
pub metadata: WorkflowMetadata,
/// Required. Entrypoint, steps and optional execution settings.
pub spec: WorkflowSpec,
}
/// Serde default for `Workflow::api_version`; also used when a flat config is
/// converted into a `Workflow`.
fn default_api_version() -> String {
    String::from("aof.dev/v1")
}
/// Serde default for `Workflow::kind`; also used when a flat config is
/// converted into a `Workflow`.
fn default_workflow_kind() -> String {
    String::from("Workflow")
}
/// Identifying metadata carried by a `Workflow`.
///
/// No `rename_all` is applied; every field name here is a single lowercase word, so the
/// wire keys equal the Rust field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowMetadata {
/// Required workflow name.
pub name: String,
/// Optional namespace; left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
/// String key/value labels. Empty when absent on input; omitted from output when empty.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub labels: HashMap<String, String>,
/// String key/value annotations. Same defaulting and omission rules as `labels`.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub annotations: HashMap<String, String>,
}
/// Body of a `Workflow`: the steps plus an optional state schema and execution settings.
///
/// Keys are camelCase on the wire (`error_handler` is `errorHandler`). Every `Option`
/// field is omitted from serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowSpec {
/// Optional schema describing the workflow state.
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<StateSchema>,
/// Required. The tests in this file set it to the name of one of `steps`;
/// presumably the step where execution begins — TODO confirm against the executor.
pub entrypoint: String,
/// Required list of step definitions.
pub steps: Vec<WorkflowStep>,
/// Reducers keyed by string — presumably a state property name; verify against the
/// consumer. Empty when absent on input; omitted from output when empty.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub reducers: HashMap<String, StateReducer>,
/// Serialized as `errorHandler`. Presumably names a step that handles failures —
/// TODO confirm; nothing in this file resolves it.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_handler: Option<String>,
/// Optional retry settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<RetryConfig>,
/// Optional checkpoint settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub checkpointing: Option<CheckpointConfig>,
/// Optional recovery settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub recovery: Option<RecoveryConfig>,
/// Optional free-form string; its meaning is not visible in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub fleet: Option<String>,
}
/// Schema description consisting of a `type` tag, named properties and a list of
/// required names.
///
/// NOTE(review): the shape resembles a JSON-Schema object, but nothing in this file
/// validates data against it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSchema {
/// Serialized as `type`. Required free-form string.
#[serde(rename = "type")]
pub schema_type: String,
/// Property schemas keyed by name. Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, PropertySchema>,
/// List of names; presumably keys of `properties` that must be present — TODO confirm.
/// Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub required: Vec<String>,
}
/// Schema of a single property inside a `StateSchema`.
///
/// The type is recursive through `items`, which is why that field is boxed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropertySchema {
/// Serialized as `type`. Required free-form string.
#[serde(rename = "type")]
pub prop_type: String,
/// Serialized as `enum`; omitted from output when `None`.
#[serde(rename = "enum", skip_serializing_if = "Option::is_none")]
pub enum_values: Option<Vec<String>>,
/// Optional nested schema; presumably the element schema for list-like
/// properties — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub items: Option<Box<PropertySchema>>,
/// Optional default, stored as an arbitrary JSON value.
#[serde(skip_serializing_if = "Option::is_none")]
pub default: Option<serde_json::Value>,
}
/// Reducer configuration stored as a value of `WorkflowSpec::reducers`.
///
/// NOTE(review): the container-level `rename_all = "lowercase"` changes nothing today,
/// because the only field carries an explicit `rename`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub struct StateReducer {
/// Serialized as `type`. Required: there is no `default` attribute on this field, so a
/// reducer entry without `type` fails to deserialize even though `ReducerType`
/// implements `Default`.
#[serde(rename = "type")]
pub reducer_type: ReducerType,
}
/// Kind of reducer selected by `StateReducer::reducer_type`.
///
/// Variants are lowercase on the wire: `append`, `merge`, `sum`, `replace`.
/// `Default` yields [`ReducerType::Replace`]. It is derived with `#[default]`, the same
/// way `BackoffStrategy`, `CheckpointBackend` and `CheckpointFrequency` declare theirs,
/// instead of being hand-written.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReducerType {
    /// Serialized as `append`.
    Append,
    /// Serialized as `merge`.
    Merge,
    /// Serialized as `sum`.
    Sum,
    /// Serialized as `replace`. This is the `Default` value.
    #[default]
    Replace,
}
/// One step of a workflow.
///
/// Keys are camelCase on the wire (`on_error` is `onError`) and the step kind is stored
/// under `type`. This type does not enforce which optional fields go with which
/// `StepType`; any combination deserializes. Every `Option` field is omitted from
/// serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowStep {
/// Required step name; the tests in this file refer to steps by this name from
/// `entrypoint` and `next`.
pub name: String,
/// Serialized as `type`. Required.
#[serde(rename = "type")]
pub step_type: StepType,
/// Optional agent identifier (the tests set it on `agent` steps).
#[serde(skip_serializing_if = "Option::is_none")]
pub agent: Option<String>,
/// Optional step configuration (the tests set it on an `approval` step).
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<StepConfig>,
/// Validation rules. Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub validation: Vec<ValidationRule>,
/// Optional routing: either a single step name or a list of conditional targets.
#[serde(skip_serializing_if = "Option::is_none")]
pub next: Option<NextStep>,
/// `false` when absent. Always serialized, since no skip attribute is set.
#[serde(default)]
pub parallel: bool,
/// Optional named branches (the tests set it on a `parallel` step).
#[serde(skip_serializing_if = "Option::is_none")]
pub branches: Option<Vec<ParallelBranch>>,
/// Optional join settings (the tests set it next to `branches`).
#[serde(skip_serializing_if = "Option::is_none")]
pub join: Option<JoinConfig>,
/// Serialized as `onError`. Optional list of conditional targets; presumably consulted
/// when the step fails — TODO confirm against the executor.
#[serde(skip_serializing_if = "Option::is_none")]
pub on_error: Option<Vec<ConditionalNext>>,
/// Optional interrupt settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub interrupt: Option<InterruptConfig>,
/// Optional terminal status (the tests set it on `terminal` steps).
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<TerminalStatus>,
/// Optional timeout kept as a string; this file does not parse it.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<String>,
}
/// Kind of a `WorkflowStep`, stored under the step's `type` key.
///
/// Variants are lowercase on the wire.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum StepType {
/// Serialized as `agent`.
Agent,
/// Serialized as `approval`.
Approval,
/// Serialized as `validation`.
Validation,
/// Serialized as `parallel`.
Parallel,
/// Serialized as `join`.
Join,
/// Serialized as `terminal`.
Terminal,
}
/// Optional per-step configuration.
///
/// Keys are camelCase on the wire: `requiredApprovals`, `autoApprove`, `maxRetries`,
/// `onFailure`. Every field is optional or defaulted, so an empty mapping deserializes.
/// NOTE(review): the fields look like a mix of approval and validation settings; which
/// ones apply to which step type is not enforced here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StepConfig {
/// Approver entries. Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub approvers: Vec<Approver>,
/// Optional timeout kept as a string (the tests use `30m`); this file does not parse it.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<String>,
/// Serialized as `requiredApprovals`.
#[serde(skip_serializing_if = "Option::is_none")]
pub required_approvals: Option<u32>,
/// Serialized as `autoApprove`.
#[serde(skip_serializing_if = "Option::is_none")]
pub auto_approve: Option<AutoApproveConfig>,
/// Validator entries. Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub validators: Vec<Validator>,
/// Serialized as `maxRetries`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_retries: Option<u32>,
/// Serialized as `onFailure`. Free-form string; its interpretation is not visible here.
#[serde(skip_serializing_if = "Option::is_none")]
pub on_failure: Option<String>,
}
/// One approver entry, given as a role and/or a user.
///
/// Neither field is required by the type, so an entry with both missing still
/// deserializes. The tests use one entry with only `role` and one with only `user`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Approver {
/// Optional role name; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
/// Optional user identifier; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
}
/// Auto-approval settings, stored under `autoApprove` in `StepConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoApproveConfig {
/// Required condition text (the tests use `state.environment == 'dev'`). It is kept as
/// a plain string; nothing in this file evaluates it.
pub condition: String,
}
/// One validator entry of `StepConfig::validators`.
///
/// Only `type` is required. All other fields are optional and omitted from serialized
/// output when `None`. NOTE(review): `model`/`prompt` and `command` presumably belong to
/// the `llm` and `script` validator types respectively — not enforced here, confirm
/// against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Validator {
/// Serialized as `type`. Required.
#[serde(rename = "type")]
pub validator_type: ValidatorType,
/// Optional name.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Optional arguments as an arbitrary JSON value.
#[serde(skip_serializing_if = "Option::is_none")]
pub args: Option<serde_json::Value>,
/// Optional model identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
/// Optional prompt text.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt: Option<String>,
/// Optional command string.
#[serde(skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
/// Optional timeout kept as a string; this file does not parse it.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<String>,
}
/// Kind tag shared by `Validator::validator_type` and `ValidationRule::rule_type`.
///
/// Variants are lowercase on the wire.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ValidatorType {
/// Serialized as `function`.
Function,
/// Serialized as `llm`.
Llm,
/// Serialized as `script`.
Script,
}
/// One entry of `WorkflowStep::validation`.
///
/// Uses the same `ValidatorType` tag as `Validator`, but with a smaller field set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationRule {
/// Serialized as `type`. Required.
#[serde(rename = "type")]
pub rule_type: ValidatorType,
/// Optional script text; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub script: Option<String>,
/// Optional prompt text; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt: Option<String>,
}
/// Routing for `WorkflowStep::next`.
///
/// The enum is untagged: no variant name appears on the wire, and serde tries the
/// variants in declaration order when deserializing. A plain string becomes `Simple`; a
/// sequence of mappings becomes `Conditional`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NextStep {
/// A single string (the tests use a step name, e.g. `next: end`).
Simple(String),
/// A list of targets, each with an optional condition.
Conditional(Vec<ConditionalNext>),
}
/// One routing entry: an optional condition and a required target.
///
/// Used by `NextStep::Conditional` and `WorkflowStep::on_error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConditionalNext {
/// Optional condition text, kept as a plain string; nothing in this file evaluates it.
/// The tests end a list with an entry that has no condition — presumably a fallback,
/// TODO confirm evaluation order against the executor.
#[serde(skip_serializing_if = "Option::is_none")]
pub condition: Option<String>,
/// Required target (the tests use step names).
pub target: String,
}
/// One named branch of `WorkflowStep::branches`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelBranch {
/// Required branch name.
pub name: String,
/// Required list of branch steps. These are `BranchStep` values, not full
/// `WorkflowStep` definitions.
pub steps: Vec<BranchStep>,
}
/// Minimal step description used inside a `ParallelBranch`.
///
/// Both fields are optional, so an empty mapping deserializes. The tests supply only
/// `agent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchStep {
/// Optional agent identifier; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub agent: Option<String>,
/// Optional name; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
/// Join settings stored under a step's `join` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinConfig {
/// Required: there is no `default` attribute on this field, so a `join` mapping without
/// `strategy` fails to deserialize even though `JoinStrategy` implements `Default`.
pub strategy: JoinStrategy,
/// Optional timeout kept as a string (the tests use `10m`); this file does not parse it.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<String>,
}
/// Strategy tag used by `JoinConfig::strategy`.
///
/// Variants are lowercase on the wire: `all`, `any`, `majority`.
/// `Default` yields [`JoinStrategy::All`]. It is derived with `#[default]`, the same way
/// `BackoffStrategy`, `CheckpointBackend` and `CheckpointFrequency` declare theirs,
/// instead of being hand-written.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum JoinStrategy {
    /// Serialized as `all`. This is the `Default` value.
    #[default]
    All,
    /// Serialized as `any`.
    Any,
    /// Serialized as `majority`.
    Majority,
}
/// Interrupt settings stored under a step's `interrupt` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterruptConfig {
/// Serialized as `type`. Required.
#[serde(rename = "type")]
pub interrupt_type: InterruptType,
/// Optional prompt text; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt: Option<String>,
/// Optional schema; presumably describes the expected input — TODO confirm.
/// Omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<StateSchema>,
}
/// Kind tag used by `InterruptConfig::interrupt_type`.
///
/// Variants are lowercase on the wire.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum InterruptType {
/// Serialized as `input`.
Input,
/// Serialized as `confirm`.
Confirm,
}
/// Status value used by `WorkflowStep::status` (the tests set it on `terminal` steps).
///
/// Variants are lowercase on the wire.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TerminalStatus {
/// Serialized as `completed`.
Completed,
/// Serialized as `failed`.
Failed,
/// Serialized as `cancelled`.
Cancelled,
}
/// Retry settings stored under `retry`.
///
/// Keys are camelCase on the wire (`maxAttempts`, `initialDelay`, `maxDelay`). Every
/// field has a serde default, so an empty mapping deserializes to 3 attempts,
/// exponential backoff, "1s" initial delay and "30s" maximum delay.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RetryConfig {
/// Serialized as `maxAttempts`; 3 when absent.
#[serde(default = "default_max_attempts")]
pub max_attempts: u32,
/// `BackoffStrategy::default()` (exponential) when absent.
#[serde(default)]
pub backoff: BackoffStrategy,
/// Serialized as `initialDelay`; "1s" when absent. Kept as a string; this file does not
/// parse it.
#[serde(default = "default_initial_delay")]
pub initial_delay: String,
/// Serialized as `maxDelay`; "30s" when absent. Kept as a string; this file does not
/// parse it.
#[serde(default = "default_max_delay")]
pub max_delay: String,
}
/// Serde default for `RetryConfig::max_attempts`.
fn default_max_attempts() -> u32 {
    const MAX_ATTEMPTS_WHEN_OMITTED: u32 = 3;
    MAX_ATTEMPTS_WHEN_OMITTED
}
/// Serde default for `RetryConfig::initial_delay`.
fn default_initial_delay() -> String {
    String::from("1s")
}
/// Serde default for `RetryConfig::max_delay`.
fn default_max_delay() -> String {
    String::from("30s")
}
/// Backoff tag used by `RetryConfig::backoff`.
///
/// Variants are lowercase on the wire. `Default` is derived and yields `Exponential`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum BackoffStrategy {
/// Serialized as `fixed`.
Fixed,
/// Serialized as `linear`.
Linear,
/// Serialized as `exponential`. This is the `Default` value.
#[default]
Exponential,
}
/// Checkpoint settings stored under `checkpointing`.
///
/// No `rename_all` is applied; every field name is a single lowercase word. Every field
/// is optional or defaulted, so an empty mapping deserializes to: enabled, `file`
/// backend, `step` frequency, history of 10.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointConfig {
/// `true` when absent.
#[serde(default = "default_true")]
pub enabled: bool,
/// `CheckpointBackend::default()` (file) when absent.
#[serde(default)]
pub backend: CheckpointBackend,
/// Optional path; presumably used by the `file` backend — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// Optional URL; presumably used by the `redis`/`postgres` backends — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub url: Option<String>,
/// `CheckpointFrequency::default()` (step) when absent.
#[serde(default)]
pub frequency: CheckpointFrequency,
/// 10 when absent; presumably the number of checkpoints to keep — TODO confirm.
#[serde(default = "default_history")]
pub history: u32,
}
/// Serde default helper that yields `true`.
///
/// Referenced by `CheckpointConfig::enabled`, `RecoveryConfig::auto_resume` and
/// `RecoveryConfig::skip_completed`, so those flags are on when the key is absent.
fn default_true() -> bool {
    const ON_WHEN_OMITTED: bool = true;
    ON_WHEN_OMITTED
}
/// Serde default for `CheckpointConfig::history`.
fn default_history() -> u32 {
    const HISTORY_WHEN_OMITTED: u32 = 10;
    HISTORY_WHEN_OMITTED
}
/// Backend tag used by `CheckpointConfig::backend`.
///
/// Variants are lowercase on the wire. `Default` is derived and yields `File`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CheckpointBackend {
/// Serialized as `file`. This is the `Default` value.
#[default]
File,
/// Serialized as `redis`.
Redis,
/// Serialized as `postgres`.
Postgres,
}
/// Frequency tag used by `CheckpointConfig::frequency`.
///
/// Variants are lowercase on the wire. `Default` is derived and yields `Step`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CheckpointFrequency {
/// Serialized as `step`. This is the `Default` value.
#[default]
Step,
/// Serialized as `change`.
Change,
/// Serialized as `interval`. NOTE(review): no interval length is configured anywhere
/// in `CheckpointConfig` — confirm where it comes from.
Interval,
}
/// Recovery settings stored under `recovery`.
///
/// Keys are camelCase on the wire (`autoResume`, `skipCompleted`). Both flags are `true`
/// when absent, so an empty mapping enables both.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RecoveryConfig {
/// Serialized as `autoResume`; `true` when absent.
#[serde(default = "default_true")]
pub auto_resume: bool,
/// Serialized as `skipCompleted`; `true` when absent.
#[serde(default = "default_true")]
pub skip_completed: bool,
}
/// Serializable snapshot of a workflow run.
///
/// Unlike the configuration types in this file, no `rename_all` is applied, so keys stay
/// snake_case on the wire (`run_id`, `workflow_name`, ...). Every field except `error` is
/// required when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowState {
/// Run identifier.
pub run_id: String,
/// Workflow name.
pub workflow_name: String,
/// Current step, as a string (presumably a step name — TODO confirm).
pub current_step: String,
/// Overall run status.
pub status: WorkflowStatus,
/// Arbitrary JSON payload.
pub data: serde_json::Value,
/// Completed steps, as strings (presumably step names — TODO confirm).
pub completed_steps: Vec<String>,
/// Step results keyed by string (presumably the step name — TODO confirm).
pub step_results: HashMap<String, StepResult>,
/// Creation timestamp in UTC.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Last-update timestamp in UTC.
pub updated_at: chrono::DateTime<chrono::Utc>,
/// Optional error; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<WorkflowError>,
}
/// Run status used by `WorkflowState::status`.
///
/// `rename_all = "lowercase"` lowercases the whole variant name without inserting a
/// separator, so the multi-word variants serialize as `waitingapproval` and
/// `waitinginput`. NOTE(review): confirm this is the intended wire form before any data
/// is persisted with it; changing it later breaks stored state.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WorkflowStatus {
/// Serialized as `pending`.
Pending,
/// Serialized as `running`.
Running,
/// Serialized as `waitingapproval`.
WaitingApproval,
/// Serialized as `waitinginput`.
WaitingInput,
/// Serialized as `completed`.
Completed,
/// Serialized as `failed`.
Failed,
/// Serialized as `cancelled`.
Cancelled,
}
/// Result record for one step, stored in `WorkflowState::step_results`.
///
/// Keys stay snake_case on the wire. `step_name`, `status` and `started_at` are
/// required; the remaining fields are optional and omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
/// Step name.
pub step_name: String,
/// Step status.
pub status: StepStatus,
/// Optional output as an arbitrary JSON value.
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<serde_json::Value>,
/// Start timestamp in UTC.
pub started_at: chrono::DateTime<chrono::Utc>,
/// Optional end timestamp in UTC.
#[serde(skip_serializing_if = "Option::is_none")]
pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
/// Optional duration; the name suggests milliseconds. It is not computed in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub duration_ms: Option<u64>,
/// Optional error text.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Status used by `StepResult::status`.
///
/// Variants are lowercase on the wire.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum StepStatus {
/// Serialized as `pending`.
Pending,
/// Serialized as `running`.
Running,
/// Serialized as `completed`.
Completed,
/// Serialized as `failed`.
Failed,
/// Serialized as `skipped`.
Skipped,
}
/// Error record stored in `WorkflowState::error`.
///
/// Keys stay snake_case on the wire (`error_type`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowError {
/// Required free-form error category string.
pub error_type: String,
/// Required error message.
pub message: String,
/// Optional step, as a string (presumably the failing step's name — TODO confirm).
/// Omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub step: Option<String>,
/// Optional extra detail text; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<String>,
}
/// Flat input form: the workflow name and spec fields at the top level, without the
/// `apiVersion` / `kind` / `metadata` / `spec` envelope.
///
/// Keys are camelCase on the wire (`errorHandler`). There are no `recovery` or `fleet`
/// fields here; converting to `Workflow` sets both to `None`.
/// NOTE(review): serde ignores unknown keys by default, so a `recovery` or `fleet` key in
/// a flat document would be dropped silently — confirm that is acceptable.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlatWorkflowConfig {
/// Required. Becomes `metadata.name` after conversion.
pub name: String,
/// Required. Copied to `spec.entrypoint` after conversion.
pub entrypoint: String,
/// Required list of step definitions.
pub steps: Vec<WorkflowStep>,
/// Optional state schema; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<StateSchema>,
/// Reducers keyed by string. Empty when absent; omitted from output when empty.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub reducers: HashMap<String, StateReducer>,
/// Serialized as `errorHandler`; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_handler: Option<String>,
/// Optional retry settings; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<RetryConfig>,
/// Optional checkpoint settings; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub checkpointing: Option<CheckpointConfig>,
}
/// Either accepted input shape for a workflow definition.
///
/// The enum is untagged, so serde tries the variants in declaration order: the enveloped
/// `Workflow` form first, then the flat form. `Workflow` requires `metadata` and `spec`,
/// which a flat document does not have at the top level, so flat documents fall through
/// to `Flat`. Convert to a `Workflow` with `From`/`Into`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum WorkflowConfigInput {
/// Enveloped form with `metadata` and `spec`.
Kubernetes(Workflow),
/// Flat form with `name`, `entrypoint` and `steps` at the top level.
Flat(FlatWorkflowConfig),
}
impl From<WorkflowConfigInput> for Workflow {
fn from(input: WorkflowConfigInput) -> Self {
match input {
WorkflowConfigInput::Kubernetes(w) => w,
WorkflowConfigInput::Flat(flat) => Workflow {
api_version: default_api_version(),
kind: default_workflow_kind(),
metadata: WorkflowMetadata {
name: flat.name,
namespace: None,
labels: HashMap::new(),
annotations: HashMap::new(),
},
spec: WorkflowSpec {
state: flat.state,
entrypoint: flat.entrypoint,
steps: flat.steps,
reducers: flat.reducers,
error_handler: flat.error_handler,
retry: flat.retry,
checkpointing: flat.checkpointing,
recovery: None,
fleet: None,
},
},
}
}
}
#[cfg(test)]
mod tests {
    // Parsing and round-trip tests for the workflow configuration types.
    //
    // YAML expresses nesting through indentation. Each fixture below therefore indents
    // the children of `metadata`, `spec`, `steps`, `next`, `branches`, `join` and
    // `config`; with every key at column 0 those parents would have null values and the
    // documents would not deserialize into the types under test.
    use super::*;

    /// The enveloped form parses, including nested metadata labels and two steps.
    #[test]
    fn test_parse_kubernetes_workflow() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: test-workflow
  labels:
    category: test
spec:
  entrypoint: start
  steps:
    - name: start
      type: agent
      agent: test-agent
      next: end
    - name: end
      type: terminal
      status: completed
"#;
        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(workflow.metadata.name, "test-workflow");
        assert_eq!(workflow.spec.entrypoint, "start");
        assert_eq!(workflow.spec.steps.len(), 2);
    }

    /// The flat form parses through the untagged input enum and converts to `Workflow`.
    #[test]
    fn test_parse_flat_workflow() {
        let yaml = r#"
name: simple-workflow
entrypoint: step1
steps:
  - name: step1
    type: agent
    agent: my-agent
    next: step2
  - name: step2
    type: terminal
    status: completed
"#;
        let input: WorkflowConfigInput = serde_yaml::from_str(yaml).unwrap();
        let workflow: Workflow = input.into();
        assert_eq!(workflow.metadata.name, "simple-workflow");
        assert_eq!(workflow.spec.steps.len(), 2);
    }

    /// A list under `next` parses as `NextStep::Conditional`; the last entry has no
    /// condition.
    #[test]
    fn test_conditional_routing() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: conditional-workflow
spec:
  entrypoint: check
  steps:
    - name: check
      type: agent
      agent: checker
      next:
        - condition: "state.score > 0.8"
          target: high
        - condition: "state.score > 0.5"
          target: medium
        - target: low
    - name: high
      type: terminal
      status: completed
    - name: medium
      type: terminal
      status: completed
    - name: low
      type: terminal
      status: completed
"#;
        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let check_step = &workflow.spec.steps[0];
        match &check_step.next {
            Some(NextStep::Conditional(conds)) => {
                assert_eq!(conds.len(), 3);
                assert_eq!(conds[0].condition.as_ref().unwrap(), "state.score > 0.8");
                assert_eq!(conds[0].target, "high");
                assert!(conds[2].condition.is_none());
            }
            other => panic!("Expected conditional next, got {:?}", other),
        }
    }

    /// A `parallel` step carries its branches and join settings.
    #[test]
    fn test_parallel_execution() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: parallel-workflow
spec:
  entrypoint: analyze
  steps:
    - name: analyze
      type: parallel
      branches:
        - name: logs
          steps:
            - agent: log-analyzer
        - name: metrics
          steps:
            - agent: metric-analyzer
      join:
        strategy: all
        timeout: 10m
      next: aggregate
    - name: aggregate
      type: terminal
      status: completed
"#;
        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let parallel_step = &workflow.spec.steps[0];
        assert_eq!(parallel_step.step_type, StepType::Parallel);
        let branches = parallel_step.branches.as_ref().unwrap();
        assert_eq!(branches.len(), 2);
        let join = parallel_step.join.as_ref().unwrap();
        assert_eq!(join.strategy, JoinStrategy::All);
    }

    /// An `approval` step reads its camelCase `config` keys.
    #[test]
    fn test_approval_step() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: approval-workflow
spec:
  entrypoint: deploy-approval
  steps:
    - name: deploy-approval
      type: approval
      config:
        approvers:
          - role: sre-team
          - user: admin@example.com
        timeout: 30m
        requiredApprovals: 2
        autoApprove:
          condition: "state.environment == 'dev'"
      next:
        - condition: approved
          target: deploy
        - condition: rejected
          target: notify
        - condition: timeout
          target: escalate
    - name: deploy
      type: terminal
      status: completed
    - name: notify
      type: terminal
      status: completed
    - name: escalate
      type: terminal
      status: completed
"#;
        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let approval_step = &workflow.spec.steps[0];
        assert_eq!(approval_step.step_type, StepType::Approval);
        let config = approval_step.config.as_ref().unwrap();
        assert_eq!(config.approvers.len(), 2);
        assert_eq!(config.required_approvals, Some(2));
    }

    /// `WorkflowState` survives a JSON round trip.
    #[test]
    fn test_workflow_state_serialization() {
        let state = WorkflowState {
            run_id: "run-123".to_string(),
            workflow_name: "test".to_string(),
            current_step: "step1".to_string(),
            status: WorkflowStatus::Running,
            data: serde_json::json!({"key": "value"}),
            completed_steps: vec!["step0".to_string()],
            step_results: HashMap::new(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            error: None,
        };
        let json = serde_json::to_string(&state).unwrap();
        let parsed: WorkflowState = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.run_id, "run-123");
        assert_eq!(parsed.status, WorkflowStatus::Running);
    }
}