use crate::{McpServerConfig, agent::ToolSpec};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level AgentFlow resource document.
///
/// Keys are (de)serialized in camelCase, so `api_version` is read from and
/// written to `apiVersion`. `apiVersion` and `kind` fall back to defaults when
/// they are missing from the input; `metadata` and `spec` are required.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentFlow {
    /// API version string; `"aof.dev/v1"` when omitted.
    #[serde(default = "default_api_version")]
    pub api_version: String,

    /// Resource kind; `"AgentFlow"` when omitted.
    #[serde(default = "default_agentflow_kind")]
    pub kind: String,

    /// Identifying metadata (name, namespace, labels, annotations).
    pub metadata: AgentFlowMetadata,

    /// The flow definition: nodes, connections, and optional settings.
    pub spec: AgentFlowSpec,
}
/// Serde default for [`AgentFlow::api_version`].
fn default_api_version() -> String {
    String::from("aof.dev/v1")
}
/// Serde default for [`AgentFlow::kind`].
fn default_agentflow_kind() -> String {
    String::from("AgentFlow")
}
/// Metadata attached to an [`AgentFlow`].
///
/// Only `name` is required. Empty maps and a missing namespace are left out
/// of the serialized output.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AgentFlowMetadata {
    /// Flow name. Validation rejects an empty string.
    pub name: String,

    /// Optional namespace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,

    /// Free-form key/value labels; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub labels: HashMap<String, String>,

    /// Free-form key/value annotations; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub annotations: HashMap<String, String>,
}
/// Body of an [`AgentFlow`]: the graph plus optional context and settings.
///
/// Keys are (de)serialized in camelCase. `nodes` is the only required key.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentFlowSpec {
    /// Optional human-readable description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Nodes of the flow graph. Validation requires at least one.
    pub nodes: Vec<FlowNode>,

    /// Directed edges between nodes; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub connections: Vec<FlowConnection>,

    /// Optional execution context.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option<FlowContext>,

    /// Optional flow-wide settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config: Option<FlowConfig>,
}
/// Optional execution context for a flow.
///
/// Keys are (de)serialized in camelCase (`working_dir` <-> `workingDir`).
/// Any key that does not match a named field is collected into `extra`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowContext {
    /// Kubeconfig setting (presumably a file path; confirm against the consumer).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub kubeconfig: Option<String>,

    /// Optional namespace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,

    /// Optional cluster identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cluster: Option<String>,

    /// Environment variables as name/value pairs; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub env: HashMap<String, String>,

    /// Optional working directory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_dir: Option<String>,

    /// Catch-all for unrecognized keys, flattened into this object.
    #[serde(skip_serializing_if = "HashMap::is_empty", flatten)]
    pub extra: HashMap<String, serde_json::Value>,
}
/// One node in the flow graph.
///
/// `id` and `type` are required; `config` and `conditions` take their default
/// (empty) values when omitted.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowNode {
    /// Identifier used by connections. Validation requires IDs to be unique.
    pub id: String,

    /// Kind of node; stored under the `type` key.
    #[serde(rename = "type")]
    pub node_type: NodeType,

    /// Node settings. Which fields matter depends on `node_type`.
    #[serde(default)]
    pub config: NodeConfig,

    /// Conditions attached to this node; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub conditions: Vec<NodeCondition>,
}
/// Kind of a [`FlowNode`].
///
/// There is no serde rename on this enum, so each variant is (de)serialized
/// under its exact Rust name (for example `Agent`, `HTTP`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum NodeType {
    /// No validation requirements.
    Transform,
    /// Validation requires `agent` or `inline` in the node config.
    Agent,
    /// Validation requires a script config with `command` or `tool`.
    Script,
    /// Validation requires `fleet` in the node config.
    Fleet,
    /// Validation requires `condition` in the node config.
    Conditional,
    /// No validation requirements.
    Slack,
    /// No validation requirements.
    Discord,
    /// No validation requirements.
    HTTP,
    /// No validation requirements.
    Wait,
    /// No validation requirements.
    Parallel,
    /// No validation requirements.
    Join,
    /// No validation requirements.
    Approval,
    /// No validation requirements.
    End,
}
/// Agent definition embedded directly in a node instead of referenced by name.
///
/// Keys are (de)serialized in camelCase (`mcp_servers` <-> `mcpServers`,
/// `max_tokens` <-> `maxTokens`). `name` and `model` are required.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InlineAgentConfig {
    /// Agent name.
    pub name: String,

    /// Model identifier.
    pub model: String,

    /// Optional instructions for the agent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instructions: Option<String>,

    /// Tools available to the agent; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub tools: Vec<ToolSpec>,

    /// MCP server configurations; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub mcp_servers: Vec<McpServerConfig>,

    /// Optional sampling temperature.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,

    /// Optional token limit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<usize>,
}
/// Settings for a Script node.
///
/// Unlike most types in this file, keys here are snake_case
/// (`working_dir`, `timeout_seconds`, `fail_on_error`). Validation requires
/// at least one of `command` or `tool`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ScriptConfig {
    /// Command to run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<String>,

    /// Tool to invoke, as an alternative to `command`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,

    /// Optional action name (presumably for the tool; confirm against the executor).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action: Option<String>,

    /// Named arguments with arbitrary JSON values; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub args: HashMap<String, serde_json::Value>,

    /// Optional working directory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_dir: Option<String>,

    /// Environment variables as name/value pairs; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub env: HashMap<String, String>,

    /// Optional timeout in seconds. Deserializes to `None` when the key is absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<u32>,

    /// Optional output parsing mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parse: Option<ScriptOutputParse>,

    /// Optional pattern (presumably used with regex parsing; confirm against the executor).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pattern: Option<String>,

    /// Defaults to `true` when the key is absent.
    #[serde(default = "default_true")]
    pub fail_on_error: bool,
}
/// Serde default helper for boolean fields that should be on unless disabled.
fn default_true() -> bool {
    true
}
impl Default for ScriptConfig {
fn default() -> Self {
Self {
command: None,
tool: None,
action: None,
args: HashMap::new(),
working_dir: None,
env: HashMap::new(),
timeout_seconds: Some(60),
parse: None,
pattern: None,
fail_on_error: true,
}
}
}
/// How a script's output should be parsed.
///
/// Variants are (de)serialized in lowercase: `text`, `json`, `lines`, `regex`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ScriptOutputParse {
    /// Serialized as `text`.
    Text,
    /// Serialized as `json`.
    Json,
    /// Serialized as `lines`.
    Lines,
    /// Serialized as `regex`.
    Regex,
}
/// Settings for a [`FlowNode`], shared by every node type.
///
/// All fields are optional; which ones are meaningful depends on the node
/// type. Keys are (de)serialized in camelCase (for example `agent_config` <->
/// `agentConfig`, `script_config` <-> `scriptConfig`). Keys that match no
/// named field are collected into `extra`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeConfig {
    /// Optional script text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub script: Option<String>,

    /// Agent reference. Agent nodes need this or `inline`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent: Option<String>,

    /// Embedded agent definition. Agent nodes need this or `agent`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inline: Option<InlineAgentConfig>,

    /// Optional agent configuration string (meaning not visible here; confirm with the consumer).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_config: Option<String>,

    /// Optional node input.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input: Option<String>,

    /// String key/value context; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub context: HashMap<String, String>,

    /// Tools for this node; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub tools: Vec<ToolSpec>,

    /// MCP server configurations; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub mcp_servers: Vec<McpServerConfig>,

    /// Script settings. Required by validation for Script nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub script_config: Option<ScriptConfig>,

    /// Fleet reference. Required by validation for Fleet nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fleet: Option<String>,

    /// Condition expression. Required by validation for Conditional nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition: Option<String>,

    /// Optional messaging channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,

    /// Optional message text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,

    /// Optional thread timestamp.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thread_ts: Option<String>,

    /// `false` when omitted. Always serialized.
    #[serde(default)]
    pub wait_for_reaction: bool,

    /// Optional timeout in seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<u32>,

    /// Optional arbitrary JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub blocks: Option<serde_json::Value>,

    /// Optional URL.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub url: Option<String>,

    /// Optional method name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub method: Option<String>,

    /// Header name/value pairs; empty when omitted.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub headers: HashMap<String, String>,

    /// Optional arbitrary JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body: Option<serde_json::Value>,

    /// Optional duration string (format not visible here).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration: Option<String>,

    /// Branch names; empty when omitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub branches: Vec<String>,

    /// Optional join strategy.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub strategy: Option<JoinStrategy>,

    /// Catch-all for unrecognized keys, flattened into this object.
    #[serde(skip_serializing_if = "HashMap::is_empty", flatten)]
    pub extra: HashMap<String, serde_json::Value>,
}
/// Strategy selector for join behavior.
///
/// Variants are (de)serialized in lowercase: `all`, `any`, `majority`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum JoinStrategy {
    /// Serialized as `all`.
    All,
    /// Serialized as `any`.
    Any,
    /// Serialized as `majority`.
    Majority,
}
impl Default for JoinStrategy {
fn default() -> Self {
Self::All
}
}
/// A condition attached to a [`FlowNode`].
///
/// `from` is required; `value` and `reaction` are optional and omitted from
/// the serialized output when unset.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeCondition {
    /// Source the condition refers to (presumably a node ID; confirm with the executor).
    pub from: String,

    /// Optional JSON value for the condition.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<serde_json::Value>,

    /// Optional reaction name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reaction: Option<String>,
}
/// A directed edge in the flow graph.
///
/// Validation requires `to` to name an existing node, and `from` to be either
/// an existing node or the literal `"start"`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FlowConnection {
    /// Source node ID, or `"start"` to mark the target as an entry node.
    pub from: String,

    /// Target node ID.
    pub to: String,

    /// Optional expression attached to the edge.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub when: Option<String>,
}
/// Flow-wide settings.
///
/// Keys are (de)serialized in camelCase (`default_timeout_seconds` <->
/// `defaultTimeoutSeconds`, `error_handler` <-> `errorHandler`).
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowConfig {
    /// Optional default timeout in seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_timeout_seconds: Option<u32>,

    /// Optional retry settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<FlowRetryConfig>,

    /// Optional error handler reference.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_handler: Option<String>,

    /// `false` when omitted. Always serialized.
    #[serde(default)]
    pub verbose: bool,
}
/// Retry settings for a flow.
///
/// Keys are (de)serialized in camelCase. Every field has a serde default, so
/// an empty mapping deserializes to 3 attempts, a `"1s"` initial delay, and a
/// multiplier of 2.0.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowRetryConfig {
    /// Maximum number of attempts; 3 when omitted.
    #[serde(default = "default_max_retries")]
    pub max_attempts: u32,

    /// Initial delay as a string; `"1s"` when omitted.
    #[serde(default = "default_retry_delay")]
    pub initial_delay: String,

    /// Backoff multiplier; 2.0 when omitted.
    #[serde(default = "default_backoff_multiplier")]
    pub backoff_multiplier: f64,
}
/// Serde default for [`FlowRetryConfig::max_attempts`].
fn default_max_retries() -> u32 {
    3_u32
}
/// Serde default for [`FlowRetryConfig::initial_delay`].
fn default_retry_delay() -> String {
    String::from("1s")
}
/// Serde default for [`FlowRetryConfig::backoff_multiplier`].
fn default_backoff_multiplier() -> f64 {
    2.0_f64
}
/// Snapshot of one flow run.
///
/// There is no serde rename here, so keys keep their snake_case field names.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AgentFlowState {
    /// Identifier of this run.
    pub run_id: String,

    /// Name of the flow being run.
    pub flow_name: String,

    /// IDs of the nodes currently being processed.
    pub current_nodes: Vec<String>,

    /// Overall run status.
    pub status: FlowExecutionStatus,

    /// Per-node results (presumably keyed by node ID; confirm with the executor).
    pub node_results: HashMap<String, NodeResult>,

    /// Named JSON variables for the run.
    pub variables: HashMap<String, serde_json::Value>,

    /// Creation timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,

    /// Last-update timestamp (UTC).
    pub updated_at: chrono::DateTime<chrono::Utc>,

    /// Error details, if any; omitted from output when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<FlowError>,
}
/// Status of a whole flow run.
///
/// Variants are (de)serialized in lowercase (for example `running`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum FlowExecutionStatus {
    /// Serialized as `pending`.
    Pending,
    /// Serialized as `running`.
    Running,
    /// Serialized as `waiting`.
    Waiting,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `cancelled`.
    Cancelled,
}
/// Outcome of running a single node.
///
/// Keys keep their snake_case field names. Optional fields are omitted from
/// the serialized output when unset.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeResult {
    /// ID of the node this result belongs to.
    pub node_id: String,

    /// Node status.
    pub status: NodeExecutionStatus,

    /// Optional JSON output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,

    /// Start timestamp (UTC).
    pub started_at: chrono::DateTime<chrono::Utc>,

    /// End timestamp (UTC), if set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ended_at: Option<chrono::DateTime<chrono::Utc>>,

    /// Duration in milliseconds, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,

    /// Error message, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
/// Status of a single node within a run.
///
/// Variants are (de)serialized in lowercase (for example `skipped`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeExecutionStatus {
    /// Serialized as `pending`.
    Pending,
    /// Serialized as `running`.
    Running,
    /// Serialized as `waiting`.
    Waiting,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `skipped`.
    Skipped,
}
/// Error information recorded on a flow run.
///
/// Keys keep their snake_case field names. `node_id` and `details` are
/// omitted from the serialized output when unset.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FlowError {
    /// Error category as a free-form string.
    pub error_type: String,

    /// Error message.
    pub message: String,

    /// ID of the related node, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub node_id: Option<String>,

    /// Additional detail text, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<String>,
}
impl AgentFlow {
    /// Checks the flow for structural problems and reports the first one found.
    ///
    /// Checks run in this order:
    /// 1. the flow name is non-empty;
    /// 2. there is at least one node;
    /// 3. node IDs are unique;
    /// 4. every connection refers to known nodes (`from` may also be the
    ///    virtual source `"start"`);
    /// 5. each node carries the settings its type requires
    ///    (see [`Self::validate_node`]).
    ///
    /// # Errors
    ///
    /// Returns a human-readable message describing the first failed check.
    pub fn validate(&self) -> Result<(), String> {
        if self.metadata.name.is_empty() {
            return Err("Flow name is required".to_string());
        }

        let nodes = &self.spec.nodes;
        if nodes.is_empty() {
            return Err("At least one node is required".to_string());
        }

        // A set smaller than the list means at least one ID occurs twice.
        let node_ids: std::collections::HashSet<&str> =
            nodes.iter().map(|n| n.id.as_str()).collect();
        if node_ids.len() != nodes.len() {
            return Err("Duplicate node IDs found".to_string());
        }

        for conn in &self.spec.connections {
            // "start" is not a real node; it marks the target as an entry point.
            if conn.from != "start" && !node_ids.contains(conn.from.as_str()) {
                return Err(format!("Connection references unknown node: {}", conn.from));
            }
            if !node_ids.contains(conn.to.as_str()) {
                return Err(format!("Connection references unknown node: {}", conn.to));
            }
        }

        for node in nodes {
            Self::validate_node(node)?;
        }
        Ok(())
    }

    /// Checks that a single node has the settings required by its type.
    ///
    /// The match is deliberately exhaustive so that adding a `NodeType`
    /// variant forces a decision about its validation rules.
    fn validate_node(node: &FlowNode) -> Result<(), String> {
        let cfg = &node.config;
        match node.node_type {
            NodeType::Agent => {
                if cfg.agent.is_none() && cfg.inline.is_none() {
                    return Err(format!(
                        "Agent node '{}' requires either 'agent' (reference) or 'inline' (embedded config)",
                        node.id
                    ));
                }
            }
            // Messages name the key as it appears in the document: NodeConfig is
            // camelCase, so the field `script_config` is read from `scriptConfig`.
            NodeType::Script => match cfg.script_config.as_ref() {
                None => {
                    return Err(format!(
                        "Script node '{}' requires 'scriptConfig'",
                        node.id
                    ));
                }
                Some(script) if script.command.is_none() && script.tool.is_none() => {
                    return Err(format!(
                        "Script node '{}' requires either 'command' or 'tool' in scriptConfig",
                        node.id
                    ));
                }
                Some(_) => {}
            },
            NodeType::Fleet => {
                if cfg.fleet.is_none() {
                    return Err(format!(
                        "Fleet node '{}' requires 'fleet' config",
                        node.id
                    ));
                }
            }
            NodeType::Conditional => {
                if cfg.condition.is_none() {
                    return Err(format!(
                        "Conditional node '{}' requires 'condition' config",
                        node.id
                    ));
                }
            }
            // Slack/Discord nodes are accepted even without `channel` or
            // `message`; no requirement is enforced for the remaining types.
            NodeType::Slack
            | NodeType::Discord
            | NodeType::Transform
            | NodeType::HTTP
            | NodeType::Wait
            | NodeType::Parallel
            | NodeType::Join
            | NodeType::Approval
            | NodeType::End => {}
        }
        Ok(())
    }

    /// Returns the nodes that are targets of a connection from `"start"`.
    ///
    /// Nodes come back in their declaration order in `spec.nodes`, each at
    /// most once, regardless of how many `"start"` connections point at them.
    pub fn entry_nodes(&self) -> Vec<&FlowNode> {
        let entry_ids: std::collections::HashSet<&str> = self
            .spec
            .connections
            .iter()
            .filter(|conn| conn.from == "start")
            .map(|conn| conn.to.as_str())
            .collect();

        self.spec
            .nodes
            .iter()
            .filter(|node| entry_ids.contains(node.id.as_str()))
            .collect()
    }

    /// Returns the nodes reachable from `node_id` in one step.
    ///
    /// Each item pairs the target node with the connection's optional `when`
    /// expression, in connection order. Connections whose target is not a
    /// known node are skipped silently.
    pub fn successors(&self, node_id: &str) -> Vec<(&FlowNode, Option<&str>)> {
        let nodes_by_id: HashMap<&str, &FlowNode> = self
            .spec
            .nodes
            .iter()
            .map(|node| (node.id.as_str(), node))
            .collect();

        self.spec
            .connections
            .iter()
            .filter(|conn| conn.from == node_id)
            .filter_map(|conn| {
                nodes_by_id
                    .get(conn.to.as_str())
                    .map(|node| (*node, conn.when.as_deref()))
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed two-node flow parses and passes validation.
    #[test]
    fn test_parse_agentflow() {
        // YAML nesting is significant: `name` must sit under `metadata`,
        // and each node's keys must sit under its list item.
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: deploy-flow
spec:
  description: "Deployment workflow with approval"
  nodes:
    - id: validate
      type: Agent
      config:
        agent: validator
        input: ${input}
    - id: deploy
      type: Agent
      config:
        agent: deployer
  connections:
    - from: start
      to: validate
    - from: validate
      to: deploy
"#;
        let flow: AgentFlow = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(flow.metadata.name, "deploy-flow");
        assert_eq!(
            flow.spec.description,
            Some("Deployment workflow with approval".to_string())
        );
        assert_eq!(flow.spec.nodes.len(), 2);
        assert_eq!(flow.spec.connections.len(), 2);
        assert!(flow.validate().is_ok());
    }

    /// Both targets of `start` connections are reported as entry nodes.
    #[test]
    fn test_entry_nodes() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: test-flow
spec:
  nodes:
    - id: entry1
      type: Transform
    - id: entry2
      type: Agent
      config:
        agent: test
    - id: other
      type: End
  connections:
    - from: start
      to: entry1
    - from: start
      to: entry2
    - from: entry1
      to: other
    - from: entry2
      to: other
"#;
        let flow: AgentFlow = serde_yaml::from_str(yaml).unwrap();
        let entries = flow.entry_nodes();
        assert_eq!(entries.len(), 2);
    }

    /// An empty node list and an Agent node without agent settings both fail.
    #[test]
    fn test_validation_errors() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: bad-flow
spec:
  nodes: []
  connections: []
"#;
        let flow: AgentFlow = serde_yaml::from_str(yaml).unwrap();
        assert!(flow.validate().is_err());

        let yaml2 = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: bad-flow
spec:
  nodes:
    - id: agent
      type: Agent
  connections:
    - from: start
      to: agent
"#;
        let flow2: AgentFlow = serde_yaml::from_str(yaml2).unwrap();
        assert!(flow2.validate().is_err());
    }

    /// A conditional node with two outgoing edges yields two successors.
    #[test]
    fn test_conditional_flow() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: conditional-flow
spec:
  nodes:
    - id: check
      type: Conditional
      config:
        condition: ${requires_approval} == true
    - id: approve
      type: Approval
      config:
        message: "Approval needed"
    - id: execute
      type: Agent
      config:
        agent: executor
  connections:
    - from: start
      to: check
    - from: check
      to: approve
      when: requires_approval == true
    - from: check
      to: execute
      when: requires_approval == false
    - from: approve
      to: execute
"#;
        let flow: AgentFlow = serde_yaml::from_str(yaml).unwrap();
        assert!(flow.validate().is_ok());
        let successors = flow.successors("check");
        assert_eq!(successors.len(), 2);
    }
}