Skip to main content

agent_orchestrator/resource/
workflow.rs

1use crate::cli_types::{OrchestratorResource, ResourceKind, ResourceSpec, WorkflowSpec};
2use crate::config::{LoopMode, OrchestratorConfig};
3use anyhow::{Result, anyhow};
4
5use super::{ApplyResult, RegisteredResource, Resource, ResourceMetadata};
6
7mod workflow_convert;
8
9use workflow_convert::parse_loop_mode;
10pub(crate) use workflow_convert::workflow_config_to_spec;
11pub(crate) use workflow_convert::workflow_spec_to_config;
12
13#[derive(Debug, Clone)]
14/// Builtin manifest adapter for `Workflow` resources.
15pub struct WorkflowResource {
16    /// Resource metadata from the manifest.
17    pub metadata: ResourceMetadata,
18    /// Manifest spec payload for the workflow.
19    pub spec: WorkflowSpec,
20}
21
22impl Resource for WorkflowResource {
23    fn kind(&self) -> ResourceKind {
24        ResourceKind::Workflow
25    }
26
27    fn name(&self) -> &str {
28        &self.metadata.name
29    }
30
31    fn validate(&self) -> Result<()> {
32        super::validate_resource_name(self.name())?;
33        if self.spec.steps.is_empty() {
34            return Err(anyhow!("workflow.spec.steps cannot be empty"));
35        }
36        if self.spec.steps.iter().any(|step| step.id.trim().is_empty()) {
37            return Err(anyhow!("workflow.spec.steps[].id cannot be empty"));
38        }
39        if self
40            .spec
41            .steps
42            .iter()
43            .any(|step| step.step_type.trim().is_empty())
44        {
45            return Err(anyhow!("workflow.spec.steps[].type cannot be empty"));
46        }
47        for step in &self.spec.steps {
48            crate::config::validate_step_type(&step.step_type).map_err(|e| anyhow!(e))?;
49        }
50        // Note: validate_step_type now accepts any non-empty step ID.
51        // Custom step IDs resolve to Agent { capability = step_id } at runtime.
52        let loop_mode = parse_loop_mode(&self.spec.loop_policy.mode)?;
53        if matches!(loop_mode, LoopMode::Fixed) {
54            match self.spec.loop_policy.max_cycles {
55                None | Some(0) => {
56                    return Err(anyhow!("workflow loop.mode=fixed requires max_cycles > 0"));
57                }
58                _ => {}
59            }
60        }
61        Ok(())
62    }
63
64    fn apply(&self, config: &mut OrchestratorConfig) -> Result<ApplyResult> {
65        let mut metadata = self.metadata.clone();
66        metadata.project = Some(
67            config
68                .effective_project_id(metadata.project.as_deref())
69                .to_string(),
70        );
71        Ok(super::apply_to_store(
72            config,
73            "Workflow",
74            self.name(),
75            &metadata,
76            serde_json::to_value(&self.spec)?,
77        ))
78    }
79
80    fn to_yaml(&self) -> Result<String> {
81        super::manifest_yaml(
82            ResourceKind::Workflow,
83            &self.metadata,
84            ResourceSpec::Workflow(self.spec.clone()),
85        )
86    }
87
88    fn get_from_project(
89        config: &OrchestratorConfig,
90        name: &str,
91        project_id: Option<&str>,
92    ) -> Option<Self> {
93        config
94            .project(project_id)?
95            .workflows
96            .get(name)
97            .map(|workflow| Self {
98                metadata: super::metadata_from_store(config, "Workflow", name, project_id),
99                spec: workflow_config_to_spec(workflow),
100            })
101    }
102
103    fn delete_from_project(
104        config: &mut OrchestratorConfig,
105        name: &str,
106        project_id: Option<&str>,
107    ) -> bool {
108        super::helpers::delete_from_store_project(config, "Workflow", name, project_id)
109    }
110}
111
112impl WorkflowResource {
113    /// Collect apply-time warnings (unknown fields, uncaptured prehook vars).
114    pub fn collect_warnings(&self) -> Vec<String> {
115        crate::config_load::collect_step_warnings(&self.spec.steps, &self.metadata.name)
116    }
117}
118
119/// Builds a typed `WorkflowResource` from a generic manifest wrapper.
120pub(super) fn build_workflow(resource: OrchestratorResource) -> Result<RegisteredResource> {
121    let OrchestratorResource {
122        kind,
123        metadata,
124        spec,
125        ..
126    } = resource;
127    if kind != ResourceKind::Workflow {
128        return Err(anyhow!("resource kind/spec mismatch for Workflow"));
129    }
130    match spec {
131        ResourceSpec::Workflow(spec) => Ok(RegisteredResource::Workflow(WorkflowResource {
132            metadata,
133            spec,
134        })),
135        _ => Err(anyhow!("resource kind/spec mismatch for Workflow")),
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142    use crate::cli_types::{
143        ResourceMetadata, ResourceSpec, SafetySpec, WorkflowLoopSpec, WorkflowStepSpec,
144    };
145    use crate::config_load::read_active_config;
146    use crate::resource::{API_VERSION, dispatch_resource};
147    use crate::test_utils::TestState;
148
149    use super::super::test_fixtures::{make_config, workflow_manifest};
150
151    #[test]
152    fn workflow_resource_roundtrip() {
153        let mut fixture = TestState::new();
154        let state = fixture.build();
155        let mut config = {
156            let active = read_active_config(&state).expect("state should be readable");
157            active.config.clone()
158        };
159
160        let resource = dispatch_resource(workflow_manifest("wf-roundtrip"))
161            .expect("workflow dispatch should succeed");
162        assert_eq!(
163            resource.apply(&mut config).expect("apply"),
164            ApplyResult::Created
165        );
166
167        let loaded = WorkflowResource::get_from(&config, "wf-roundtrip")
168            .expect("workflow should be present in config");
169        // After normalization, missing standard steps are added as disabled placeholders
170        assert!(!loaded.spec.steps.is_empty());
171        assert!(loaded.spec.steps.iter().any(|s| s.step_type == "qa"));
172        assert_eq!(loaded.spec.loop_policy.mode, "once");
173        assert_eq!(loaded.spec.loop_policy.max_cycles, Some(3));
174    }
175
176    #[test]
177    fn workflow_validate_rejects_empty_step_id() {
178        let wf = WorkflowResource {
179            metadata: super::super::metadata_with_name("wf-empty-id"),
180            spec: WorkflowSpec {
181                steps: vec![WorkflowStepSpec {
182                    id: "  ".to_string(),
183                    step_type: "qa".to_string(),
184                    required_capability: None,
185                    template: None,
186                    execution_profile: None,
187                    builtin: None,
188                    enabled: true,
189                    repeatable: false,
190                    is_guard: false,
191                    cost_preference: None,
192                    prehook: None,
193                    tty: false,
194                    command: None,
195                    chain_steps: vec![],
196                    scope: None,
197                    max_parallel: None,
198                    stagger_delay_ms: None,
199                    timeout_secs: None,
200                    stall_timeout_secs: None,
201                    behavior: Default::default(),
202                    item_select_config: None,
203                    store_inputs: vec![],
204                    store_outputs: vec![],
205                    step_vars: None,
206                    extra: Default::default(),
207                }],
208                loop_policy: WorkflowLoopSpec {
209                    mode: "once".to_string(),
210                    max_cycles: None,
211                    enabled: true,
212                    stop_when_no_unresolved: true,
213                    agent_template: None,
214                    convergence_expr: None,
215                },
216                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
217                dynamic_steps: vec![],
218                adaptive: None,
219                safety: SafetySpec::default(),
220                max_parallel: None,
221                stagger_delay_ms: None,
222                item_isolation: None,
223            },
224        };
225        let err = wf.validate().expect_err("operation should fail");
226        assert!(err.to_string().contains("id cannot be empty"));
227    }
228
229    #[test]
230    fn workflow_validate_rejects_empty_step_type() {
231        let wf = WorkflowResource {
232            metadata: super::super::metadata_with_name("wf-empty-type"),
233            spec: WorkflowSpec {
234                steps: vec![WorkflowStepSpec {
235                    id: "step1".to_string(),
236                    step_type: "  ".to_string(),
237                    required_capability: None,
238                    template: None,
239                    execution_profile: None,
240                    builtin: None,
241                    enabled: true,
242                    repeatable: false,
243                    is_guard: false,
244                    cost_preference: None,
245                    prehook: None,
246                    tty: false,
247                    command: None,
248                    chain_steps: vec![],
249                    scope: None,
250                    max_parallel: None,
251                    stagger_delay_ms: None,
252                    timeout_secs: None,
253                    stall_timeout_secs: None,
254                    behavior: Default::default(),
255                    item_select_config: None,
256                    store_inputs: vec![],
257                    store_outputs: vec![],
258                    step_vars: None,
259                    extra: Default::default(),
260                }],
261                loop_policy: WorkflowLoopSpec {
262                    mode: "once".to_string(),
263                    max_cycles: None,
264                    enabled: true,
265                    stop_when_no_unresolved: true,
266                    agent_template: None,
267                    convergence_expr: None,
268                },
269                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
270                dynamic_steps: vec![],
271                adaptive: None,
272                safety: SafetySpec::default(),
273                max_parallel: None,
274                stagger_delay_ms: None,
275                item_isolation: None,
276            },
277        };
278        let err = wf.validate().expect_err("operation should fail");
279        assert!(err.to_string().contains("type cannot be empty"));
280    }
281
282    #[test]
283    fn workflow_validate_rejects_fixed_without_max_cycles() {
284        let wf = WorkflowResource {
285            metadata: super::super::metadata_with_name("wf-fixed-no-max"),
286            spec: WorkflowSpec {
287                steps: vec![WorkflowStepSpec {
288                    id: "qa".to_string(),
289                    step_type: "qa".to_string(),
290                    required_capability: None,
291                    template: None,
292                    execution_profile: None,
293                    builtin: None,
294                    enabled: true,
295                    repeatable: false,
296                    is_guard: false,
297                    cost_preference: None,
298                    prehook: None,
299                    tty: false,
300                    command: None,
301                    chain_steps: vec![],
302                    scope: None,
303                    max_parallel: None,
304                    stagger_delay_ms: None,
305                    timeout_secs: None,
306                    stall_timeout_secs: None,
307                    behavior: Default::default(),
308                    item_select_config: None,
309                    store_inputs: vec![],
310                    store_outputs: vec![],
311                    step_vars: None,
312                    extra: Default::default(),
313                }],
314                loop_policy: WorkflowLoopSpec {
315                    mode: "fixed".to_string(),
316                    max_cycles: None,
317                    enabled: true,
318                    stop_when_no_unresolved: true,
319                    agent_template: None,
320                    convergence_expr: None,
321                },
322                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
323                dynamic_steps: vec![],
324                adaptive: None,
325                safety: SafetySpec::default(),
326                max_parallel: None,
327                stagger_delay_ms: None,
328                item_isolation: None,
329            },
330        };
331        let err = wf.validate().expect_err("operation should fail");
332        assert!(err.to_string().contains("max_cycles > 0"));
333    }
334
335    #[test]
336    fn workflow_validate_rejects_fixed_with_zero_max_cycles() {
337        let wf = WorkflowResource {
338            metadata: super::super::metadata_with_name("wf-fixed-zero"),
339            spec: WorkflowSpec {
340                steps: vec![WorkflowStepSpec {
341                    id: "qa".to_string(),
342                    step_type: "qa".to_string(),
343                    required_capability: None,
344                    template: None,
345                    execution_profile: None,
346                    builtin: None,
347                    enabled: true,
348                    repeatable: false,
349                    is_guard: false,
350                    cost_preference: None,
351                    prehook: None,
352                    tty: false,
353                    command: None,
354                    chain_steps: vec![],
355                    scope: None,
356                    max_parallel: None,
357                    stagger_delay_ms: None,
358                    timeout_secs: None,
359                    stall_timeout_secs: None,
360                    behavior: Default::default(),
361                    item_select_config: None,
362                    store_inputs: vec![],
363                    store_outputs: vec![],
364                    step_vars: None,
365                    extra: Default::default(),
366                }],
367                loop_policy: WorkflowLoopSpec {
368                    mode: "fixed".to_string(),
369                    max_cycles: Some(0),
370                    enabled: true,
371                    stop_when_no_unresolved: true,
372                    agent_template: None,
373                    convergence_expr: None,
374                },
375                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
376                dynamic_steps: vec![],
377                adaptive: None,
378                safety: SafetySpec::default(),
379                max_parallel: None,
380                stagger_delay_ms: None,
381                item_isolation: None,
382            },
383        };
384        let err = wf.validate().expect_err("operation should fail");
385        assert!(err.to_string().contains("max_cycles > 0"));
386    }
387
388    #[test]
389    fn workflow_validate_accepts_fixed_with_valid_max_cycles() {
390        let wf = WorkflowResource {
391            metadata: super::super::metadata_with_name("wf-fixed-ok"),
392            spec: WorkflowSpec {
393                steps: vec![WorkflowStepSpec {
394                    id: "qa".to_string(),
395                    step_type: "qa".to_string(),
396                    required_capability: None,
397                    template: None,
398                    execution_profile: None,
399                    builtin: None,
400                    enabled: true,
401                    repeatable: false,
402                    is_guard: false,
403                    cost_preference: None,
404                    prehook: None,
405                    tty: false,
406                    command: None,
407                    chain_steps: vec![],
408                    scope: None,
409                    max_parallel: None,
410                    stagger_delay_ms: None,
411                    timeout_secs: None,
412                    stall_timeout_secs: None,
413                    behavior: Default::default(),
414                    item_select_config: None,
415                    store_inputs: vec![],
416                    store_outputs: vec![],
417                    step_vars: None,
418                    extra: Default::default(),
419                }],
420                loop_policy: WorkflowLoopSpec {
421                    mode: "fixed".to_string(),
422                    max_cycles: Some(3),
423                    enabled: true,
424                    stop_when_no_unresolved: true,
425                    agent_template: None,
426                    convergence_expr: None,
427                },
428                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
429                dynamic_steps: vec![],
430                adaptive: None,
431                safety: SafetySpec::default(),
432                max_parallel: None,
433                stagger_delay_ms: None,
434                item_isolation: None,
435            },
436        };
437        assert!(wf.validate().is_ok());
438    }
439
440    #[test]
441    fn workflow_validation_rejects_empty_steps() {
442        let workflow = WorkflowResource {
443            metadata: ResourceMetadata {
444                name: "test-workflow".to_string(),
445                project: None,
446                labels: None,
447                annotations: None,
448            },
449            spec: WorkflowSpec {
450                steps: vec![],
451                loop_policy: WorkflowLoopSpec {
452                    mode: "once".to_string(),
453                    max_cycles: None,
454                    enabled: true,
455                    stop_when_no_unresolved: true,
456                    agent_template: None,
457                    convergence_expr: None,
458                },
459                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
460                dynamic_steps: vec![],
461                adaptive: None,
462                safety: SafetySpec::default(),
463                max_parallel: None,
464                stagger_delay_ms: None,
465                item_isolation: None,
466            },
467        };
468        let result = workflow.validate();
469        assert!(result.is_err());
470        assert!(
471            result
472                .expect_err("operation should fail")
473                .to_string()
474                .contains("cannot be empty")
475        );
476    }
477
478    #[test]
479    fn workflow_get_from_returns_none_for_missing() {
480        let config = make_config();
481        assert!(WorkflowResource::get_from(&config, "nonexistent-wf").is_none());
482    }
483
484    #[test]
485    fn workflow_delete_cleans_up_metadata() {
486        let mut config = make_config();
487        let wf =
488            dispatch_resource(workflow_manifest("meta-wf")).expect("dispatch workflow resource");
489        wf.apply(&mut config).expect("apply");
490        assert!(
491            config
492                .resource_store
493                .get_namespaced("Workflow", crate::config::DEFAULT_PROJECT_ID, "meta-wf")
494                .is_some()
495        );
496
497        WorkflowResource::delete_from(&mut config, "meta-wf");
498        assert!(
499            config
500                .resource_store
501                .get_namespaced("Workflow", crate::config::DEFAULT_PROJECT_ID, "meta-wf")
502                .is_none()
503        );
504    }
505
506    #[test]
507    fn workflow_apply_stores_resource_metadata() {
508        let mut config = make_config();
509        let resource = OrchestratorResource {
510            api_version: API_VERSION.to_string(),
511            kind: ResourceKind::Workflow,
512            metadata: ResourceMetadata {
513                name: "store-meta-wf".to_string(),
514                project: None,
515                labels: Some([("version".to_string(), "v2".to_string())].into()),
516                annotations: None,
517            },
518            spec: ResourceSpec::Workflow(WorkflowSpec {
519                steps: vec![WorkflowStepSpec {
520                    id: "qa".to_string(),
521                    step_type: "qa".to_string(),
522                    required_capability: None,
523                    template: None,
524                    execution_profile: None,
525                    builtin: None,
526                    enabled: true,
527                    repeatable: false,
528                    is_guard: false,
529                    cost_preference: None,
530                    prehook: None,
531                    tty: false,
532                    command: None,
533                    chain_steps: vec![],
534                    scope: None,
535                    max_parallel: None,
536                    stagger_delay_ms: None,
537                    timeout_secs: None,
538                    stall_timeout_secs: None,
539                    behavior: Default::default(),
540                    item_select_config: None,
541                    store_inputs: vec![],
542                    store_outputs: vec![],
543                    step_vars: None,
544                    extra: Default::default(),
545                }],
546                loop_policy: WorkflowLoopSpec {
547                    mode: "once".to_string(),
548                    max_cycles: None,
549                    enabled: true,
550                    stop_when_no_unresolved: true,
551                    agent_template: None,
552                    convergence_expr: None,
553                },
554                finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
555                dynamic_steps: vec![],
556                adaptive: None,
557                safety: SafetySpec::default(),
558                max_parallel: None,
559                stagger_delay_ms: None,
560                item_isolation: None,
561            }),
562        };
563        let rr = dispatch_resource(resource).expect("dispatch workflow resource");
564        rr.apply(&mut config).expect("apply");
565
566        let cr = config
567            .resource_store
568            .get_namespaced(
569                "Workflow",
570                crate::config::DEFAULT_PROJECT_ID,
571                "store-meta-wf",
572            )
573            .expect("stored workflow CR should exist");
574        assert_eq!(
575            cr.metadata
576                .labels
577                .as_ref()
578                .expect("labels should exist")
579                .get("version")
580                .expect("version label should exist"),
581            "v2"
582        );
583    }
584
585    #[test]
586    fn build_workflow_rejects_wrong_kind() {
587        use crate::cli_types::ProjectSpec;
588        let resource = OrchestratorResource {
589            api_version: API_VERSION.to_string(),
590            kind: ResourceKind::Workflow,
591            metadata: ResourceMetadata {
592                name: "bad".to_string(),
593                project: None,
594                labels: None,
595                annotations: None,
596            },
597            spec: ResourceSpec::Project(ProjectSpec { description: None }),
598        };
599        let err = dispatch_resource(resource).expect_err("operation should fail");
600        assert!(err.to_string().contains("mismatch"));
601    }
602}