1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11#[serde(rename_all = "camelCase")]
12pub struct Workflow {
13 #[serde(default = "default_api_version")]
15 pub api_version: String,
16
17 #[serde(default = "default_workflow_kind")]
19 pub kind: String,
20
21 pub metadata: WorkflowMetadata,
23
24 pub spec: WorkflowSpec,
26}
27
/// Fallback `apiVersion` value used when a workflow document omits the field.
fn default_api_version() -> String {
    String::from("aof.dev/v1")
}
31
/// Fallback `kind` value used when a workflow document omits the field.
fn default_workflow_kind() -> String {
    String::from("Workflow")
}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct WorkflowMetadata {
39 pub name: String,
41
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub namespace: Option<String>,
45
46 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48 pub labels: HashMap<String, String>,
49
50 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
52 pub annotations: HashMap<String, String>,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "camelCase")]
58pub struct WorkflowSpec {
59 #[serde(skip_serializing_if = "Option::is_none")]
61 pub state: Option<StateSchema>,
62
63 pub entrypoint: String,
65
66 pub steps: Vec<WorkflowStep>,
68
69 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
71 pub reducers: HashMap<String, StateReducer>,
72
73 #[serde(skip_serializing_if = "Option::is_none")]
75 pub error_handler: Option<String>,
76
77 #[serde(skip_serializing_if = "Option::is_none")]
79 pub retry: Option<RetryConfig>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 pub checkpointing: Option<CheckpointConfig>,
84
85 #[serde(skip_serializing_if = "Option::is_none")]
87 pub recovery: Option<RecoveryConfig>,
88
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub fleet: Option<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct StateSchema {
97 #[serde(rename = "type")]
99 pub schema_type: String,
100
101 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
103 pub properties: HashMap<String, PropertySchema>,
104
105 #[serde(default, skip_serializing_if = "Vec::is_empty")]
107 pub required: Vec<String>,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct PropertySchema {
113 #[serde(rename = "type")]
115 pub prop_type: String,
116
117 #[serde(rename = "enum", skip_serializing_if = "Option::is_none")]
119 pub enum_values: Option<Vec<String>>,
120
121 #[serde(skip_serializing_if = "Option::is_none")]
123 pub items: Option<Box<PropertySchema>>,
124
125 #[serde(skip_serializing_if = "Option::is_none")]
127 pub default: Option<serde_json::Value>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
132#[serde(rename_all = "lowercase")]
133pub struct StateReducer {
134 #[serde(rename = "type")]
136 pub reducer_type: ReducerType,
137}
138
139#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
141#[serde(rename_all = "lowercase")]
142pub enum ReducerType {
143 Append,
145 Merge,
147 Sum,
149 Replace,
151}
152
153impl Default for ReducerType {
154 fn default() -> Self {
155 Self::Replace
156 }
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
161#[serde(rename_all = "camelCase")]
162pub struct WorkflowStep {
163 pub name: String,
165
166 #[serde(rename = "type")]
168 pub step_type: StepType,
169
170 #[serde(skip_serializing_if = "Option::is_none")]
172 pub agent: Option<String>,
173
174 #[serde(skip_serializing_if = "Option::is_none")]
176 pub config: Option<StepConfig>,
177
178 #[serde(default, skip_serializing_if = "Vec::is_empty")]
180 pub validation: Vec<ValidationRule>,
181
182 #[serde(skip_serializing_if = "Option::is_none")]
184 pub next: Option<NextStep>,
185
186 #[serde(default)]
188 pub parallel: bool,
189
190 #[serde(skip_serializing_if = "Option::is_none")]
192 pub branches: Option<Vec<ParallelBranch>>,
193
194 #[serde(skip_serializing_if = "Option::is_none")]
196 pub join: Option<JoinConfig>,
197
198 #[serde(skip_serializing_if = "Option::is_none")]
200 pub on_error: Option<Vec<ConditionalNext>>,
201
202 #[serde(skip_serializing_if = "Option::is_none")]
204 pub interrupt: Option<InterruptConfig>,
205
206 #[serde(skip_serializing_if = "Option::is_none")]
208 pub status: Option<TerminalStatus>,
209
210 #[serde(skip_serializing_if = "Option::is_none")]
212 pub timeout: Option<String>,
213}
214
215#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
217#[serde(rename_all = "lowercase")]
218pub enum StepType {
219 Agent,
221 Approval,
223 Validation,
225 Parallel,
227 Join,
229 Terminal,
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize)]
235#[serde(rename_all = "camelCase")]
236pub struct StepConfig {
237 #[serde(default, skip_serializing_if = "Vec::is_empty")]
239 pub approvers: Vec<Approver>,
240
241 #[serde(skip_serializing_if = "Option::is_none")]
243 pub timeout: Option<String>,
244
245 #[serde(skip_serializing_if = "Option::is_none")]
247 pub required_approvals: Option<u32>,
248
249 #[serde(skip_serializing_if = "Option::is_none")]
251 pub auto_approve: Option<AutoApproveConfig>,
252
253 #[serde(default, skip_serializing_if = "Vec::is_empty")]
255 pub validators: Vec<Validator>,
256
257 #[serde(skip_serializing_if = "Option::is_none")]
259 pub max_retries: Option<u32>,
260
261 #[serde(skip_serializing_if = "Option::is_none")]
263 pub on_failure: Option<String>,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct Approver {
269 #[serde(skip_serializing_if = "Option::is_none")]
271 pub role: Option<String>,
272
273 #[serde(skip_serializing_if = "Option::is_none")]
275 pub user: Option<String>,
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct AutoApproveConfig {
281 pub condition: String,
283}
284
285#[derive(Debug, Clone, Serialize, Deserialize)]
287#[serde(rename_all = "camelCase")]
288pub struct Validator {
289 #[serde(rename = "type")]
291 pub validator_type: ValidatorType,
292
293 #[serde(skip_serializing_if = "Option::is_none")]
295 pub name: Option<String>,
296
297 #[serde(skip_serializing_if = "Option::is_none")]
299 pub args: Option<serde_json::Value>,
300
301 #[serde(skip_serializing_if = "Option::is_none")]
303 pub model: Option<String>,
304
305 #[serde(skip_serializing_if = "Option::is_none")]
307 pub prompt: Option<String>,
308
309 #[serde(skip_serializing_if = "Option::is_none")]
311 pub command: Option<String>,
312
313 #[serde(skip_serializing_if = "Option::is_none")]
315 pub timeout: Option<String>,
316}
317
318#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
320#[serde(rename_all = "lowercase")]
321pub enum ValidatorType {
322 Function,
324 Llm,
326 Script,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize)]
332pub struct ValidationRule {
333 #[serde(rename = "type")]
335 pub rule_type: ValidatorType,
336
337 #[serde(skip_serializing_if = "Option::is_none")]
339 pub script: Option<String>,
340
341 #[serde(skip_serializing_if = "Option::is_none")]
343 pub prompt: Option<String>,
344}
345
346#[derive(Debug, Clone, Serialize, Deserialize)]
348#[serde(untagged)]
349pub enum NextStep {
350 Simple(String),
352 Conditional(Vec<ConditionalNext>),
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
358pub struct ConditionalNext {
359 #[serde(skip_serializing_if = "Option::is_none")]
361 pub condition: Option<String>,
362
363 pub target: String,
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize)]
369pub struct ParallelBranch {
370 pub name: String,
372
373 pub steps: Vec<BranchStep>,
375}
376
377#[derive(Debug, Clone, Serialize, Deserialize)]
379pub struct BranchStep {
380 #[serde(skip_serializing_if = "Option::is_none")]
382 pub agent: Option<String>,
383
384 #[serde(skip_serializing_if = "Option::is_none")]
386 pub name: Option<String>,
387}
388
389#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct JoinConfig {
392 pub strategy: JoinStrategy,
394
395 #[serde(skip_serializing_if = "Option::is_none")]
397 pub timeout: Option<String>,
398}
399
400#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
402#[serde(rename_all = "lowercase")]
403pub enum JoinStrategy {
404 All,
406 Any,
408 Majority,
410}
411
412impl Default for JoinStrategy {
413 fn default() -> Self {
414 Self::All
415 }
416}
417
418#[derive(Debug, Clone, Serialize, Deserialize)]
420pub struct InterruptConfig {
421 #[serde(rename = "type")]
423 pub interrupt_type: InterruptType,
424
425 #[serde(skip_serializing_if = "Option::is_none")]
427 pub prompt: Option<String>,
428
429 #[serde(skip_serializing_if = "Option::is_none")]
431 pub schema: Option<StateSchema>,
432}
433
434#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
436#[serde(rename_all = "lowercase")]
437pub enum InterruptType {
438 Input,
440 Confirm,
442}
443
444#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
446#[serde(rename_all = "lowercase")]
447pub enum TerminalStatus {
448 Completed,
450 Failed,
452 Cancelled,
454}
455
456#[derive(Debug, Clone, Serialize, Deserialize)]
458#[serde(rename_all = "camelCase")]
459pub struct RetryConfig {
460 #[serde(default = "default_max_attempts")]
462 pub max_attempts: u32,
463
464 #[serde(default)]
466 pub backoff: BackoffStrategy,
467
468 #[serde(default = "default_initial_delay")]
470 pub initial_delay: String,
471
472 #[serde(default = "default_max_delay")]
474 pub max_delay: String,
475}
476
/// Fallback for `RetryConfig::max_attempts` when the field is absent.
fn default_max_attempts() -> u32 {
    const DEFAULT_MAX_ATTEMPTS: u32 = 3;
    DEFAULT_MAX_ATTEMPTS
}
480
/// Fallback for `RetryConfig::initial_delay` when the field is absent.
fn default_initial_delay() -> String {
    String::from("1s")
}
484
/// Fallback for `RetryConfig::max_delay` when the field is absent.
fn default_max_delay() -> String {
    String::from("30s")
}
488
489#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
491#[serde(rename_all = "lowercase")]
492pub enum BackoffStrategy {
493 Fixed,
495 Linear,
497 #[default]
499 Exponential,
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize)]
504pub struct CheckpointConfig {
505 #[serde(default = "default_true")]
507 pub enabled: bool,
508
509 #[serde(default)]
511 pub backend: CheckpointBackend,
512
513 #[serde(skip_serializing_if = "Option::is_none")]
515 pub path: Option<String>,
516
517 #[serde(skip_serializing_if = "Option::is_none")]
519 pub url: Option<String>,
520
521 #[serde(default)]
523 pub frequency: CheckpointFrequency,
524
525 #[serde(default = "default_history")]
527 pub history: u32,
528}
529
/// Serde default helper for boolean fields that should be `true` when absent.
fn default_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
533
/// Fallback for `CheckpointConfig::history` when the field is absent.
fn default_history() -> u32 {
    const DEFAULT_HISTORY: u32 = 10;
    DEFAULT_HISTORY
}
537
538#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
540#[serde(rename_all = "lowercase")]
541pub enum CheckpointBackend {
542 #[default]
544 File,
545 Redis,
547 Postgres,
549}
550
551#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
553#[serde(rename_all = "lowercase")]
554pub enum CheckpointFrequency {
555 #[default]
557 Step,
558 Change,
560 Interval,
562}
563
564#[derive(Debug, Clone, Serialize, Deserialize)]
566#[serde(rename_all = "camelCase")]
567pub struct RecoveryConfig {
568 #[serde(default = "default_true")]
570 pub auto_resume: bool,
571
572 #[serde(default = "default_true")]
574 pub skip_completed: bool,
575}
576
577#[derive(Debug, Clone, Serialize, Deserialize)]
583pub struct WorkflowState {
584 pub run_id: String,
586
587 pub workflow_name: String,
589
590 pub current_step: String,
592
593 pub status: WorkflowStatus,
595
596 pub data: serde_json::Value,
598
599 pub completed_steps: Vec<String>,
601
602 pub step_results: HashMap<String, StepResult>,
604
605 pub created_at: chrono::DateTime<chrono::Utc>,
607
608 pub updated_at: chrono::DateTime<chrono::Utc>,
610
611 #[serde(skip_serializing_if = "Option::is_none")]
613 pub error: Option<WorkflowError>,
614}
615
616#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
618#[serde(rename_all = "lowercase")]
619pub enum WorkflowStatus {
620 Pending,
622 Running,
624 WaitingApproval,
626 WaitingInput,
628 Completed,
630 Failed,
632 Cancelled,
634}
635
636#[derive(Debug, Clone, Serialize, Deserialize)]
638pub struct StepResult {
639 pub step_name: String,
641
642 pub status: StepStatus,
644
645 #[serde(skip_serializing_if = "Option::is_none")]
647 pub output: Option<serde_json::Value>,
648
649 pub started_at: chrono::DateTime<chrono::Utc>,
651
652 #[serde(skip_serializing_if = "Option::is_none")]
654 pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
655
656 #[serde(skip_serializing_if = "Option::is_none")]
658 pub duration_ms: Option<u64>,
659
660 #[serde(skip_serializing_if = "Option::is_none")]
662 pub error: Option<String>,
663}
664
665#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
667#[serde(rename_all = "lowercase")]
668pub enum StepStatus {
669 Pending,
671 Running,
673 Completed,
675 Failed,
677 Skipped,
679}
680
681#[derive(Debug, Clone, Serialize, Deserialize)]
683pub struct WorkflowError {
684 pub error_type: String,
686
687 pub message: String,
689
690 #[serde(skip_serializing_if = "Option::is_none")]
692 pub step: Option<String>,
693
694 #[serde(skip_serializing_if = "Option::is_none")]
696 pub details: Option<String>,
697}
698
699#[derive(Debug, Clone, Serialize, Deserialize)]
705#[serde(rename_all = "camelCase")]
706pub struct FlatWorkflowConfig {
707 pub name: String,
709
710 pub entrypoint: String,
712
713 pub steps: Vec<WorkflowStep>,
715
716 #[serde(skip_serializing_if = "Option::is_none")]
718 pub state: Option<StateSchema>,
719
720 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
722 pub reducers: HashMap<String, StateReducer>,
723
724 #[serde(skip_serializing_if = "Option::is_none")]
726 pub error_handler: Option<String>,
727
728 #[serde(skip_serializing_if = "Option::is_none")]
730 pub retry: Option<RetryConfig>,
731
732 #[serde(skip_serializing_if = "Option::is_none")]
734 pub checkpointing: Option<CheckpointConfig>,
735}
736
737#[derive(Debug, Clone, Serialize, Deserialize)]
739#[serde(untagged)]
740pub enum WorkflowConfigInput {
741 Kubernetes(Workflow),
743 Flat(FlatWorkflowConfig),
745}
746
747impl From<WorkflowConfigInput> for Workflow {
748 fn from(input: WorkflowConfigInput) -> Self {
749 match input {
750 WorkflowConfigInput::Kubernetes(w) => w,
751 WorkflowConfigInput::Flat(flat) => Workflow {
752 api_version: default_api_version(),
753 kind: default_workflow_kind(),
754 metadata: WorkflowMetadata {
755 name: flat.name,
756 namespace: None,
757 labels: HashMap::new(),
758 annotations: HashMap::new(),
759 },
760 spec: WorkflowSpec {
761 state: flat.state,
762 entrypoint: flat.entrypoint,
763 steps: flat.steps,
764 reducers: flat.reducers,
765 error_handler: flat.error_handler,
766 retry: flat.retry,
767 checkpointing: flat.checkpointing,
768 recovery: None,
769 fleet: None,
770 },
771 },
772 }
773 }
774}
775
/// Parsing and round-trip tests for the workflow data model.
///
/// The YAML fixtures are raw strings whose content starts at column 0; nesting
/// inside them uses two-space indentation, which YAML relies on to express
/// structure.
#[cfg(test)]
mod tests {
    use super::*;

    /// A Kubernetes-style document parses straight into `Workflow`.
    #[test]
    fn test_parse_kubernetes_workflow() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: test-workflow
  labels:
    category: test
spec:
  entrypoint: start
  steps:
    - name: start
      type: agent
      agent: test-agent
      next: end
    - name: end
      type: terminal
      status: completed
"#;

        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(workflow.metadata.name, "test-workflow");
        assert_eq!(workflow.spec.entrypoint, "start");
        assert_eq!(workflow.spec.steps.len(), 2);
    }

    /// A flat document parses via `WorkflowConfigInput` and converts into `Workflow`.
    #[test]
    fn test_parse_flat_workflow() {
        let yaml = r#"
name: simple-workflow
entrypoint: step1
steps:
  - name: step1
    type: agent
    agent: my-agent
    next: step2
  - name: step2
    type: terminal
    status: completed
"#;

        let input: WorkflowConfigInput = serde_yaml::from_str(yaml).unwrap();
        let workflow: Workflow = input.into();
        assert_eq!(workflow.metadata.name, "simple-workflow");
        assert_eq!(workflow.spec.steps.len(), 2);
    }

    /// A list under `next` parses as `NextStep::Conditional`, including a
    /// final entry that has no condition.
    #[test]
    fn test_conditional_routing() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: conditional-workflow
spec:
  entrypoint: check
  steps:
    - name: check
      type: agent
      agent: checker
      next:
        - condition: "state.score > 0.8"
          target: high
        - condition: "state.score > 0.5"
          target: medium
        - target: low
    - name: high
      type: terminal
      status: completed
    - name: medium
      type: terminal
      status: completed
    - name: low
      type: terminal
      status: completed
"#;

        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let check_step = &workflow.spec.steps[0];
        match &check_step.next {
            Some(NextStep::Conditional(conds)) => {
                assert_eq!(conds.len(), 3);
                assert_eq!(conds[0].condition.as_ref().unwrap(), "state.score > 0.8");
                assert_eq!(conds[0].target, "high");
                assert!(conds[2].condition.is_none());
            }
            _ => panic!("Expected conditional next"),
        }
    }

    /// A parallel step keeps its branches and join settings.
    #[test]
    fn test_parallel_execution() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: parallel-workflow
spec:
  entrypoint: analyze
  steps:
    - name: analyze
      type: parallel
      branches:
        - name: logs
          steps:
            - agent: log-analyzer
        - name: metrics
          steps:
            - agent: metric-analyzer
      join:
        strategy: all
        timeout: 10m
      next: aggregate
    - name: aggregate
      type: terminal
      status: completed
"#;

        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let parallel_step = &workflow.spec.steps[0];
        assert_eq!(parallel_step.step_type, StepType::Parallel);
        let branches = parallel_step.branches.as_ref().unwrap();
        assert_eq!(branches.len(), 2);
        let join = parallel_step.join.as_ref().unwrap();
        assert_eq!(join.strategy, JoinStrategy::All);
    }

    /// An approval step's `config` block parses, including camelCase keys.
    #[test]
    fn test_approval_step() {
        let yaml = r#"
apiVersion: aof.dev/v1
kind: Workflow
metadata:
  name: approval-workflow
spec:
  entrypoint: deploy-approval
  steps:
    - name: deploy-approval
      type: approval
      config:
        approvers:
          - role: sre-team
          - user: admin@example.com
        timeout: 30m
        requiredApprovals: 2
        autoApprove:
          condition: "state.environment == 'dev'"
      next:
        - condition: approved
          target: deploy
        - condition: rejected
          target: notify
        - condition: timeout
          target: escalate
    - name: deploy
      type: terminal
      status: completed
    - name: notify
      type: terminal
      status: completed
    - name: escalate
      type: terminal
      status: completed
"#;

        let workflow: Workflow = serde_yaml::from_str(yaml).unwrap();
        let approval_step = &workflow.spec.steps[0];
        assert_eq!(approval_step.step_type, StepType::Approval);
        let config = approval_step.config.as_ref().unwrap();
        assert_eq!(config.approvers.len(), 2);
        assert_eq!(config.required_approvals, Some(2));
    }

    /// `WorkflowState` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_workflow_state_serialization() {
        let state = WorkflowState {
            run_id: "run-123".to_string(),
            workflow_name: "test".to_string(),
            current_step: "step1".to_string(),
            status: WorkflowStatus::Running,
            data: serde_json::json!({"key": "value"}),
            completed_steps: vec!["step0".to_string()],
            step_results: HashMap::new(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            error: None,
        };

        let json = serde_json::to_string(&state).unwrap();
        let parsed: WorkflowState = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.run_id, "run-123");
        assert_eq!(parsed.status, WorkflowStatus::Running);
    }
}