1use crate::{McpServerConfig, agent::ToolSpec};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18
/// Top-level AgentFlow resource: API version, kind, metadata, and the flow spec.
///
/// Field names are (de)serialized in camelCase (e.g. `apiVersion`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentFlow {
    /// API version string; filled by `default_api_version()` when absent from the input.
    #[serde(default = "default_api_version")]
    pub api_version: String,

    /// Resource kind; filled by `default_agentflow_kind()` when absent from the input.
    #[serde(default = "default_agentflow_kind")]
    pub kind: String,

    /// Identifying metadata (required).
    pub metadata: AgentFlowMetadata,

    /// Flow definition: nodes, connections, and optional context/config (required).
    pub spec: AgentFlowSpec,
}
67
/// Serde default for `AgentFlow::api_version`, used when the key is missing from the input.
fn default_api_version() -> String {
    String::from("aof.dev/v1")
}
71
/// Serde default for `AgentFlow::kind`, used when the key is missing from the input.
fn default_agentflow_kind() -> String {
    String::from("AgentFlow")
}
75
/// Identifying metadata for an [`AgentFlow`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentFlowMetadata {
    /// Flow name (required). `AgentFlow::validate` rejects an empty name.
    pub name: String,

    /// Optional namespace; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,

    /// Key/value labels; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub labels: HashMap<String, String>,

    /// Key/value annotations; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub annotations: HashMap<String, String>,
}
94
/// Body of an [`AgentFlow`]: the node list, the edges between nodes, and optional settings.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentFlowSpec {
    /// Optional free-form description; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Nodes of the flow (required key). `AgentFlow::validate` requires at least one node
    /// and unique node IDs.
    pub nodes: Vec<FlowNode>,

    /// Directed edges between nodes; defaults to empty and is omitted from output when empty.
    /// A `from` value of `"start"` marks the target as an entry node (see
    /// `AgentFlow::entry_nodes`).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub connections: Vec<FlowConnection>,

    /// Optional execution context; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option<FlowContext>,

    /// Optional flow-wide settings; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config: Option<FlowConfig>,
}
118
/// Optional execution context attached to a flow (`spec.context`).
///
/// Field names are (de)serialized in camelCase (e.g. `workingDir`). Keys that match no named
/// field are captured in `extra` instead of being rejected.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowContext {
    /// NOTE(review): presumably a kubeconfig path or reference — confirm against the executor.
    /// Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub kubeconfig: Option<String>,

    /// NOTE(review): presumably the default namespace for nodes in this flow — confirm.
    /// Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,

    /// NOTE(review): presumably a cluster name/identifier — confirm.
    /// Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cluster: Option<String>,

    /// Environment variables as key/value pairs; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub env: HashMap<String, String>,

    /// Optional working directory (serialized as `workingDir`); omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_dir: Option<String>,

    /// Catch-all for any additional keys: flattened into the parent map on (de)serialization,
    /// and omitted from output when empty.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub extra: HashMap<String, serde_json::Value>,
}
147
/// A single node (step) in an [`AgentFlow`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowNode {
    /// Node identifier; must be unique within the flow (enforced by `AgentFlow::validate`)
    /// and is what connections refer to in `from` / `to`.
    pub id: String,

    /// Kind of node; (de)serialized under the key `type`.
    #[serde(rename = "type")]
    pub node_type: NodeType,

    /// Node settings; `NodeConfig::default()` is used when the key is absent.
    /// Which fields are required depends on `node_type` (see `AgentFlow::validate`).
    #[serde(default)]
    pub config: NodeConfig,

    /// Conditions attached to this node; defaults to empty, omitted from output when empty.
    /// Not inspected by `AgentFlow::validate`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub conditions: Vec<NodeCondition>,
}
167
/// Kind of a [`FlowNode`].
///
/// No `rename_all` is applied, so variants are (de)serialized by their exact Rust names
/// (e.g. `Agent`, `HTTP`). `AgentFlow::validate` enforces required config only for
/// `Agent`, `Script`, `Fleet`, and `Conditional`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeType {
    /// No required config is enforced by validation.
    Transform,
    /// Requires `config.agent` or `config.inline`.
    Agent,
    /// Requires `config.script_config` with a `command` or a `tool`.
    Script,
    /// Requires `config.fleet`.
    Fleet,
    /// Requires `config.condition`.
    Conditional,
    /// No required config is enforced by validation.
    Slack,
    /// No required config is enforced by validation.
    Discord,
    /// No required config is enforced by validation.
    HTTP,
    /// No required config is enforced by validation.
    Wait,
    /// No required config is enforced by validation.
    Parallel,
    /// No required config is enforced by validation.
    Join,
    /// No required config is enforced by validation.
    Approval,
    /// No required config is enforced by validation.
    End,
}
198
/// Agent definition embedded directly in a node (`config.inline`) instead of referenced by name.
///
/// Field names are (de)serialized in camelCase (e.g. `mcpServers`, `maxTokens`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InlineAgentConfig {
    /// Agent name (required).
    pub name: String,

    /// Model identifier (required).
    pub model: String,

    /// Optional instructions text; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instructions: Option<String>,

    /// Tool specifications; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tools: Vec<ToolSpec>,

    /// MCP server configurations; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub mcp_servers: Vec<McpServerConfig>,

    /// Optional sampling temperature; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,

    /// Optional token limit; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<usize>,
}
230
/// Settings for a `Script` node.
///
/// Unlike most structs in this file, keys here are (de)serialized in snake_case
/// (`working_dir`, `timeout_seconds`, `fail_on_error`). `AgentFlow::validate` requires at
/// least one of `command` or `tool` to be set.
///
/// Note that deserializing an input without `timeout_seconds` yields `None`, whereas the
/// `Default` impl for this type sets `Some(60)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ScriptConfig {
    /// Optional command; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<String>,

    /// Optional tool name; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,

    /// Optional action; omitted from output when `None`.
    /// NOTE(review): presumably the action to invoke on `tool` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action: Option<String>,

    /// Arbitrary JSON arguments keyed by name; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub args: HashMap<String, serde_json::Value>,

    /// Optional working directory; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_dir: Option<String>,

    /// Environment variables; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub env: HashMap<String, String>,

    /// Optional timeout in seconds; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<u32>,

    /// Optional output parsing mode; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parse: Option<ScriptOutputParse>,

    /// Optional pattern string; omitted from output when `None`.
    /// NOTE(review): presumably the expression used with `ScriptOutputParse::Regex` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pattern: Option<String>,

    /// Defaults to `true` (via `default_true`) when absent from the input.
    #[serde(default = "default_true")]
    pub fail_on_error: bool,
}
278
/// Serde default helper that yields `true`; used for `ScriptConfig::fail_on_error`.
fn default_true() -> bool {
    true
}
282
283impl Default for ScriptConfig {
284 fn default() -> Self {
285 Self {
286 command: None,
287 tool: None,
288 action: None,
289 args: HashMap::new(),
290 working_dir: None,
291 env: HashMap::new(),
292 timeout_seconds: Some(60),
293 parse: None,
294 pattern: None,
295 fail_on_error: true,
296 }
297 }
298}
299
/// Output parsing mode for a script node (`ScriptConfig::parse`).
///
/// Variants are (de)serialized in lowercase: `text`, `json`, `lines`, `regex`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ScriptOutputParse {
    /// Serialized as `text`.
    Text,
    /// Serialized as `json`.
    Json,
    /// Serialized as `lines`.
    Lines,
    /// Serialized as `regex`.
    Regex,
}
313
/// Per-node settings shared by every [`NodeType`]; each node type reads its own subset.
///
/// Field names are (de)serialized in camelCase (e.g. `agentConfig`, `mcpServers`,
/// `scriptConfig`, `threadTs`, `waitForReaction`, `timeoutSeconds`). Keys that match no
/// named field are captured in `extra`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeConfig {
    /// Optional script text; omitted from output when `None`.
    /// NOTE(review): not checked by `AgentFlow::validate`; presumably used by Transform
    /// nodes — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub script: Option<String>,

    /// Agent reference. An `Agent` node must set this or `inline`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent: Option<String>,

    /// Embedded agent definition. An `Agent` node must set this or `agent`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inline: Option<InlineAgentConfig>,

    /// Optional string, serialized as `agentConfig`; omitted from output when `None`.
    /// NOTE(review): meaning not visible in this file — confirm against the executor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_config: Option<String>,

    /// Optional input string; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input: Option<String>,

    /// String key/value context; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub context: HashMap<String, String>,

    /// Tool specifications; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tools: Vec<ToolSpec>,

    /// MCP server configurations; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub mcp_servers: Vec<McpServerConfig>,

    /// Script settings, serialized as `scriptConfig`. Required for `Script` nodes, and must
    /// then contain a `command` or a `tool`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub script_config: Option<ScriptConfig>,

    /// Fleet reference. Required for `Fleet` nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fleet: Option<String>,

    /// Condition expression. Required for `Conditional` nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition: Option<String>,

    /// Optional channel; omitted from output when `None`.
    /// NOTE(review): presumably for Slack/Discord nodes; validation does not require it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,

    /// Optional message text; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,

    /// Optional thread timestamp, serialized as `threadTs`; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thread_ts: Option<String>,

    /// Defaults to `false` when absent; always written on serialization.
    #[serde(default)]
    pub wait_for_reaction: bool,

    /// Optional timeout in seconds; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<u32>,

    /// Optional arbitrary JSON value; omitted from output when `None`.
    /// NOTE(review): presumably message layout blocks — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub blocks: Option<serde_json::Value>,

    /// Optional URL; omitted from output when `None`.
    /// NOTE(review): `url`, `method`, `headers`, and `body` are presumably for HTTP nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub url: Option<String>,

    /// Optional method string; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub method: Option<String>,

    /// Header key/value pairs; defaults to empty, omitted from output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub headers: HashMap<String, String>,

    /// Optional arbitrary JSON body; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body: Option<serde_json::Value>,

    /// Optional duration string; omitted from output when `None`.
    /// NOTE(review): presumably for Wait nodes; the accepted format is not visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration: Option<String>,

    /// List of branch names; defaults to empty, omitted from output when empty.
    /// NOTE(review): presumably node IDs for Parallel nodes; not checked by validation.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub branches: Vec<String>,

    /// Optional join strategy; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub strategy: Option<JoinStrategy>,

    /// Catch-all for any additional keys: flattened into the parent map on (de)serialization,
    /// and omitted from output when empty.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub extra: HashMap<String, serde_json::Value>,
}
432
/// Strategy value for `NodeConfig::strategy`.
///
/// Variants are (de)serialized in lowercase: `all`, `any`, `majority`. The `Default` impl
/// yields `All`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum JoinStrategy {
    /// Serialized as `all`; the default.
    All,
    /// Serialized as `any`.
    Any,
    /// Serialized as `majority`.
    Majority,
}
444
445impl Default for JoinStrategy {
446 fn default() -> Self {
447 Self::All
448 }
449}
450
/// A condition attached to a node via `FlowNode::conditions`.
///
/// NOTE(review): evaluation semantics are not visible in this file; `AgentFlow::validate`
/// does not check that `from` names an existing node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCondition {
    /// Source identifier the condition refers to (required).
    pub from: String,

    /// Optional JSON value to compare against; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<serde_json::Value>,

    /// Optional reaction string; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reaction: Option<String>,
}
465
/// A directed edge between two nodes of a flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowConnection {
    /// Source node ID, or the reserved value `"start"` to mark `to` as an entry node.
    pub from: String,

    /// Target node ID; must name a node defined in the flow (enforced by `AgentFlow::validate`).
    pub to: String,

    /// Optional condition expression for taking this edge; omitted from output when `None`.
    /// Returned alongside the target node by `AgentFlow::successors`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub when: Option<String>,
}
479
/// Flow-wide settings (`spec.config`).
///
/// Field names are (de)serialized in camelCase (e.g. `defaultTimeoutSeconds`, `errorHandler`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowConfig {
    /// Optional default timeout in seconds; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_timeout_seconds: Option<u32>,

    /// Optional retry settings; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<FlowRetryConfig>,

    /// Optional error handler name; omitted from output when `None`.
    /// NOTE(review): presumably a node ID — confirm; not checked by `AgentFlow::validate`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_handler: Option<String>,

    /// Defaults to `false` when absent; always written on serialization.
    #[serde(default)]
    pub verbose: bool,
}
500
/// Retry settings for a flow (`FlowConfig::retry`).
///
/// Field names are (de)serialized in camelCase (`maxAttempts`, `initialDelay`,
/// `backoffMultiplier`); every field has a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowRetryConfig {
    /// Defaults to `3` (via `default_max_retries`) when absent.
    #[serde(default = "default_max_retries")]
    pub max_attempts: u32,

    /// Delay as a string; defaults to `"1s"` (via `default_retry_delay`) when absent.
    #[serde(default = "default_retry_delay")]
    pub initial_delay: String,

    /// Defaults to `2.0` (via `default_backoff_multiplier`) when absent.
    #[serde(default = "default_backoff_multiplier")]
    pub backoff_multiplier: f64,
}
517
/// Serde default for `FlowRetryConfig::max_attempts`.
fn default_max_retries() -> u32 {
    const DEFAULT_MAX_ATTEMPTS: u32 = 3;
    DEFAULT_MAX_ATTEMPTS
}
521
/// Serde default for `FlowRetryConfig::initial_delay`.
fn default_retry_delay() -> String {
    String::from("1s")
}
525
/// Serde default for `FlowRetryConfig::backoff_multiplier`.
fn default_backoff_multiplier() -> f64 {
    const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0;
    DEFAULT_BACKOFF_MULTIPLIER
}
529
/// Snapshot of one execution (run) of a flow.
///
/// No `rename_all` is applied, so keys are (de)serialized with their snake_case field names
/// (e.g. `run_id`, `node_results`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentFlowState {
    /// Identifier of this run.
    pub run_id: String,

    /// Name of the flow being executed.
    pub flow_name: String,

    /// IDs of the nodes currently in progress.
    /// NOTE(review): exact meaning (executing vs. queued) is not visible here — confirm.
    pub current_nodes: Vec<String>,

    /// Overall status of the run.
    pub status: FlowExecutionStatus,

    /// Results recorded so far, keyed by string.
    /// NOTE(review): presumably keyed by node ID — confirm against the executor.
    pub node_results: HashMap<String, NodeResult>,

    /// Arbitrary JSON variables keyed by name.
    pub variables: HashMap<String, serde_json::Value>,

    /// UTC timestamp for when the state was created.
    pub created_at: chrono::DateTime<chrono::Utc>,

    /// UTC timestamp for the last update.
    pub updated_at: chrono::DateTime<chrono::Utc>,

    /// Optional error information; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<FlowError>,
}
565
/// Overall status of a flow run (`AgentFlowState::status`).
///
/// Variants are (de)serialized in lowercase (e.g. `pending`, `cancelled`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FlowExecutionStatus {
    /// Serialized as `pending`.
    Pending,
    /// Serialized as `running`.
    Running,
    /// Serialized as `waiting`.
    Waiting,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `cancelled`.
    Cancelled,
}
583
/// Outcome of executing a single node.
///
/// No `rename_all` is applied, so keys use the snake_case field names (e.g. `node_id`,
/// `duration_ms`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResult {
    /// ID of the node this result belongs to.
    pub node_id: String,

    /// Execution status of the node.
    pub status: NodeExecutionStatus,

    /// Optional JSON output; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<serde_json::Value>,

    /// UTC timestamp for when the node started.
    pub started_at: chrono::DateTime<chrono::Utc>,

    /// Optional UTC timestamp for when the node ended; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ended_at: Option<chrono::DateTime<chrono::Utc>>,

    /// Optional duration in milliseconds; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,

    /// Optional error message; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
612
/// Execution status of a single node (`NodeResult::status`).
///
/// Variants are (de)serialized in lowercase (e.g. `pending`, `skipped`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeExecutionStatus {
    /// Serialized as `pending`.
    Pending,
    /// Serialized as `running`.
    Running,
    /// Serialized as `waiting`.
    Waiting,
    /// Serialized as `completed`.
    Completed,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `skipped`.
    Skipped,
}
630
/// Error information recorded on a flow run (`AgentFlowState::error`).
///
/// No `rename_all` is applied, so keys use the snake_case field names (e.g. `error_type`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowError {
    /// Error category as a free-form string.
    pub error_type: String,

    /// Error message.
    pub message: String,

    /// Optional ID of the node associated with the error; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub node_id: Option<String>,

    /// Optional additional details; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<String>,
}
648
649impl AgentFlow {
654 pub fn validate(&self) -> Result<(), String> {
656 if self.metadata.name.is_empty() {
658 return Err("Flow name is required".to_string());
659 }
660
661 if self.spec.nodes.is_empty() {
663 return Err("At least one node is required".to_string());
664 }
665
666 let node_ids: std::collections::HashSet<&str> =
668 self.spec.nodes.iter().map(|n| n.id.as_str()).collect();
669
670 if node_ids.len() != self.spec.nodes.len() {
672 return Err("Duplicate node IDs found".to_string());
673 }
674
675 for conn in &self.spec.connections {
678 if conn.from != "start" && !node_ids.contains(conn.from.as_str()) {
679 return Err(format!("Connection references unknown node: {}", conn.from));
680 }
681 if !node_ids.contains(conn.to.as_str()) {
682 return Err(format!("Connection references unknown node: {}", conn.to));
683 }
684 }
685
686 for node in &self.spec.nodes {
688 match node.node_type {
689 NodeType::Agent => {
690 if node.config.agent.is_none() && node.config.inline.is_none() {
692 return Err(format!(
693 "Agent node '{}' requires either 'agent' (reference) or 'inline' (embedded config)",
694 node.id
695 ));
696 }
697 }
698 NodeType::Script => {
699 if let Some(ref cfg) = node.config.script_config {
701 if cfg.command.is_none() && cfg.tool.is_none() {
702 return Err(format!(
703 "Script node '{}' requires either 'command' or 'tool' in script_config",
704 node.id
705 ));
706 }
707 } else {
708 return Err(format!(
709 "Script node '{}' requires 'script_config'",
710 node.id
711 ));
712 }
713 }
714 NodeType::Fleet => {
715 if node.config.fleet.is_none() {
717 return Err(format!(
718 "Fleet node '{}' requires 'fleet' config",
719 node.id
720 ));
721 }
722 }
723 NodeType::Conditional => {
724 if node.config.condition.is_none() {
725 return Err(format!(
726 "Conditional node '{}' requires 'condition' config",
727 node.id
728 ));
729 }
730 }
731 NodeType::Slack | NodeType::Discord => {
732 if node.config.channel.is_none() && node.config.message.is_none() {
733 }
735 }
736 _ => {}
737 }
738 }
739
740 Ok(())
741 }
742
743 pub fn entry_nodes(&self) -> Vec<&FlowNode> {
745 let entry_ids: std::collections::HashSet<&str> = self
746 .spec
747 .connections
748 .iter()
749 .filter(|c| c.from == "start")
750 .map(|c| c.to.as_str())
751 .collect();
752
753 self.spec
754 .nodes
755 .iter()
756 .filter(|n| entry_ids.contains(n.id.as_str()))
757 .collect()
758 }
759
760 pub fn successors(&self, node_id: &str) -> Vec<(&FlowNode, Option<&str>)> {
762 let node_map: HashMap<&str, &FlowNode> =
763 self.spec.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
764
765 self.spec
766 .connections
767 .iter()
768 .filter(|c| c.from == node_id)
769 .filter_map(|c| {
770 node_map
771 .get(c.to.as_str())
772 .map(|n| (*n, c.when.as_deref()))
773 })
774 .collect()
775 }
776}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `yaml` into an [`AgentFlow`], panicking with the parser error on failure.
    fn parse_flow(yaml: &str) -> AgentFlow {
        serde_yaml::from_str(yaml).expect("test YAML should deserialize into an AgentFlow")
    }

    /// A well-formed two-node flow parses into the expected structure and validates.
    #[test]
    fn test_parse_agentflow() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: deploy-flow
spec:
  description: "Deployment workflow with approval"
  nodes:
    - id: validate
      type: Agent
      config:
        agent: validator
        input: ${input}
    - id: deploy
      type: Agent
      config:
        agent: deployer
  connections:
    - from: start
      to: validate
    - from: validate
      to: deploy
"#;

        let flow = parse_flow(yaml);
        assert_eq!(flow.api_version, "aof.dev/v1");
        assert_eq!(flow.kind, "AgentFlow");
        assert_eq!(flow.metadata.name, "deploy-flow");
        assert_eq!(
            flow.spec.description.as_deref(),
            Some("Deployment workflow with approval")
        );
        assert_eq!(flow.spec.nodes.len(), 2);
        assert_eq!(flow.spec.connections.len(), 2);

        let first = &flow.spec.nodes[0];
        assert_eq!(first.id, "validate");
        assert_eq!(first.node_type, NodeType::Agent);
        assert_eq!(first.config.agent.as_deref(), Some("validator"));
        assert_eq!(first.config.input.as_deref(), Some("${input}"));

        assert_eq!(flow.validate(), Ok(()));
    }

    /// `apiVersion`, `kind`, and `connections` fall back to their serde defaults when omitted.
    #[test]
    fn test_defaults_applied() {
        let yaml = r#"
metadata:
  name: minimal
spec:
  nodes:
    - id: only
      type: End
"#;

        let flow = parse_flow(yaml);
        assert_eq!(flow.api_version, "aof.dev/v1");
        assert_eq!(flow.kind, "AgentFlow");
        assert!(flow.spec.connections.is_empty());
        assert!(flow.entry_nodes().is_empty());
        assert_eq!(flow.validate(), Ok(()));
    }

    /// Entry nodes are exactly the targets of `start` connections, in node-definition order.
    #[test]
    fn test_entry_nodes() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: test-flow
spec:
  nodes:
    - id: entry1
      type: Transform
    - id: entry2
      type: Agent
      config:
        agent: test
    - id: other
      type: End
  connections:
    - from: start
      to: entry1
    - from: start
      to: entry2
    - from: entry1
      to: other
    - from: entry2
      to: other
"#;

        let flow = parse_flow(yaml);
        let entry_ids: Vec<&str> = flow.entry_nodes().iter().map(|n| n.id.as_str()).collect();
        assert_eq!(entry_ids, ["entry1", "entry2"]);
    }

    /// Validation fails for the expected reason on an empty flow and on an unconfigured agent.
    #[test]
    fn test_validation_errors() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: bad-flow
spec:
  nodes: []
  connections: []
"#;

        let flow = parse_flow(yaml);
        assert_eq!(
            flow.validate(),
            Err("At least one node is required".to_string())
        );

        let yaml2 = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: bad-flow
spec:
  nodes:
    - id: agent
      type: Agent
  connections:
    - from: start
      to: agent
"#;

        let flow2 = parse_flow(yaml2);
        let err = flow2.validate().unwrap_err();
        assert!(
            err.contains("Agent node 'agent'"),
            "unexpected validation error: {err}"
        );
    }

    /// Two nodes sharing an ID are rejected.
    #[test]
    fn test_duplicate_node_ids() {
        let yaml = r#"
metadata:
  name: dup-flow
spec:
  nodes:
    - id: same
      type: End
    - id: same
      type: End
"#;

        let flow = parse_flow(yaml);
        assert_eq!(flow.validate(), Err("Duplicate node IDs found".to_string()));
    }

    /// A connection pointing at an undefined node is rejected and the node is named.
    #[test]
    fn test_unknown_connection_target() {
        let yaml = r#"
metadata:
  name: dangling-flow
spec:
  nodes:
    - id: a
      type: End
  connections:
    - from: start
      to: missing
"#;

        let flow = parse_flow(yaml);
        assert_eq!(
            flow.validate(),
            Err("Connection references unknown node: missing".to_string())
        );
    }

    /// Successors carry the `when` expression of the edge that leads to them.
    #[test]
    fn test_conditional_flow() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: AgentFlow
metadata:
  name: conditional-flow
spec:
  nodes:
    - id: check
      type: Conditional
      config:
        condition: ${requires_approval} == true
    - id: approve
      type: Approval
      config:
        message: "Approval needed"
    - id: execute
      type: Agent
      config:
        agent: executor
  connections:
    - from: start
      to: check
    - from: check
      to: approve
      when: requires_approval == true
    - from: check
      to: execute
      when: requires_approval == false
    - from: approve
      to: execute
"#;

        let flow = parse_flow(yaml);
        assert_eq!(flow.validate(), Ok(()));

        let from_check: Vec<(&str, Option<&str>)> = flow
            .successors("check")
            .into_iter()
            .map(|(node, when)| (node.id.as_str(), when))
            .collect();
        assert_eq!(
            from_check,
            [
                ("approve", Some("requires_approval == true")),
                ("execute", Some("requires_approval == false")),
            ]
        );

        let from_approve: Vec<(&str, Option<&str>)> = flow
            .successors("approve")
            .into_iter()
            .map(|(node, when)| (node.id.as_str(), when))
            .collect();
        assert_eq!(from_approve, [("execute", None)]);
    }
}