1use crate::cli_types::{OrchestratorResource, ResourceKind, ResourceSpec, WorkflowSpec};
2use crate::config::{LoopMode, OrchestratorConfig};
3use anyhow::{Result, anyhow};
4
5use super::{ApplyResult, RegisteredResource, Resource, ResourceMetadata};
6
7mod workflow_convert;
8
9use workflow_convert::parse_loop_mode;
10pub(crate) use workflow_convert::workflow_config_to_spec;
11pub(crate) use workflow_convert::workflow_spec_to_config;
12
13#[derive(Debug, Clone)]
14pub struct WorkflowResource {
16 pub metadata: ResourceMetadata,
18 pub spec: WorkflowSpec,
20}
21
22impl Resource for WorkflowResource {
23 fn kind(&self) -> ResourceKind {
24 ResourceKind::Workflow
25 }
26
27 fn name(&self) -> &str {
28 &self.metadata.name
29 }
30
31 fn validate(&self) -> Result<()> {
32 super::validate_resource_name(self.name())?;
33 if self.spec.steps.is_empty() {
34 return Err(anyhow!("workflow.spec.steps cannot be empty"));
35 }
36 if self.spec.steps.iter().any(|step| step.id.trim().is_empty()) {
37 return Err(anyhow!("workflow.spec.steps[].id cannot be empty"));
38 }
39 if self
40 .spec
41 .steps
42 .iter()
43 .any(|step| step.step_type.trim().is_empty())
44 {
45 return Err(anyhow!("workflow.spec.steps[].type cannot be empty"));
46 }
47 for step in &self.spec.steps {
48 crate::config::validate_step_type(&step.step_type).map_err(|e| anyhow!(e))?;
49 }
50 let loop_mode = parse_loop_mode(&self.spec.loop_policy.mode)?;
53 if matches!(loop_mode, LoopMode::Fixed) {
54 match self.spec.loop_policy.max_cycles {
55 None | Some(0) => {
56 return Err(anyhow!("workflow loop.mode=fixed requires max_cycles > 0"));
57 }
58 _ => {}
59 }
60 }
61 Ok(())
62 }
63
64 fn apply(&self, config: &mut OrchestratorConfig) -> Result<ApplyResult> {
65 let mut metadata = self.metadata.clone();
66 metadata.project = Some(
67 config
68 .effective_project_id(metadata.project.as_deref())
69 .to_string(),
70 );
71 Ok(super::apply_to_store(
72 config,
73 "Workflow",
74 self.name(),
75 &metadata,
76 serde_json::to_value(&self.spec)?,
77 ))
78 }
79
80 fn to_yaml(&self) -> Result<String> {
81 super::manifest_yaml(
82 ResourceKind::Workflow,
83 &self.metadata,
84 ResourceSpec::Workflow(self.spec.clone()),
85 )
86 }
87
88 fn get_from_project(
89 config: &OrchestratorConfig,
90 name: &str,
91 project_id: Option<&str>,
92 ) -> Option<Self> {
93 config
94 .project(project_id)?
95 .workflows
96 .get(name)
97 .map(|workflow| Self {
98 metadata: super::metadata_from_store(config, "Workflow", name, project_id),
99 spec: workflow_config_to_spec(workflow),
100 })
101 }
102
103 fn delete_from_project(
104 config: &mut OrchestratorConfig,
105 name: &str,
106 project_id: Option<&str>,
107 ) -> bool {
108 super::helpers::delete_from_store_project(config, "Workflow", name, project_id)
109 }
110}
111
112impl WorkflowResource {
113 pub fn collect_warnings(&self) -> Vec<String> {
115 crate::config_load::collect_step_warnings(&self.spec.steps, &self.metadata.name)
116 }
117}
118
119pub(super) fn build_workflow(resource: OrchestratorResource) -> Result<RegisteredResource> {
121 let OrchestratorResource {
122 kind,
123 metadata,
124 spec,
125 ..
126 } = resource;
127 if kind != ResourceKind::Workflow {
128 return Err(anyhow!("resource kind/spec mismatch for Workflow"));
129 }
130 match spec {
131 ResourceSpec::Workflow(spec) => Ok(RegisteredResource::Workflow(WorkflowResource {
132 metadata,
133 spec,
134 })),
135 _ => Err(anyhow!("resource kind/spec mismatch for Workflow")),
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use crate::cli_types::{
143 ResourceMetadata, ResourceSpec, SafetySpec, WorkflowLoopSpec, WorkflowStepSpec,
144 };
145 use crate::config_load::read_active_config;
146 use crate::resource::{API_VERSION, dispatch_resource};
147 use crate::test_utils::TestState;
148
149 use super::super::test_fixtures::{make_config, workflow_manifest};
150
151 #[test]
152 fn workflow_resource_roundtrip() {
153 let mut fixture = TestState::new();
154 let state = fixture.build();
155 let mut config = {
156 let active = read_active_config(&state).expect("state should be readable");
157 active.config.clone()
158 };
159
160 let resource = dispatch_resource(workflow_manifest("wf-roundtrip"))
161 .expect("workflow dispatch should succeed");
162 assert_eq!(
163 resource.apply(&mut config).expect("apply"),
164 ApplyResult::Created
165 );
166
167 let loaded = WorkflowResource::get_from(&config, "wf-roundtrip")
168 .expect("workflow should be present in config");
169 assert!(!loaded.spec.steps.is_empty());
171 assert!(loaded.spec.steps.iter().any(|s| s.step_type == "qa"));
172 assert_eq!(loaded.spec.loop_policy.mode, "once");
173 assert_eq!(loaded.spec.loop_policy.max_cycles, Some(3));
174 }
175
176 #[test]
177 fn workflow_validate_rejects_empty_step_id() {
178 let wf = WorkflowResource {
179 metadata: super::super::metadata_with_name("wf-empty-id"),
180 spec: WorkflowSpec {
181 steps: vec![WorkflowStepSpec {
182 id: " ".to_string(),
183 step_type: "qa".to_string(),
184 required_capability: None,
185 template: None,
186 execution_profile: None,
187 builtin: None,
188 enabled: true,
189 repeatable: false,
190 is_guard: false,
191 cost_preference: None,
192 prehook: None,
193 tty: false,
194 command: None,
195 chain_steps: vec![],
196 scope: None,
197 max_parallel: None,
198 stagger_delay_ms: None,
199 timeout_secs: None,
200 stall_timeout_secs: None,
201 behavior: Default::default(),
202 item_select_config: None,
203 store_inputs: vec![],
204 store_outputs: vec![],
205 step_vars: None,
206 extra: Default::default(),
207 }],
208 loop_policy: WorkflowLoopSpec {
209 mode: "once".to_string(),
210 max_cycles: None,
211 enabled: true,
212 stop_when_no_unresolved: true,
213 agent_template: None,
214 convergence_expr: None,
215 },
216 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
217 dynamic_steps: vec![],
218 adaptive: None,
219 safety: SafetySpec::default(),
220 max_parallel: None,
221 stagger_delay_ms: None,
222 item_isolation: None,
223 },
224 };
225 let err = wf.validate().expect_err("operation should fail");
226 assert!(err.to_string().contains("id cannot be empty"));
227 }
228
229 #[test]
230 fn workflow_validate_rejects_empty_step_type() {
231 let wf = WorkflowResource {
232 metadata: super::super::metadata_with_name("wf-empty-type"),
233 spec: WorkflowSpec {
234 steps: vec![WorkflowStepSpec {
235 id: "step1".to_string(),
236 step_type: " ".to_string(),
237 required_capability: None,
238 template: None,
239 execution_profile: None,
240 builtin: None,
241 enabled: true,
242 repeatable: false,
243 is_guard: false,
244 cost_preference: None,
245 prehook: None,
246 tty: false,
247 command: None,
248 chain_steps: vec![],
249 scope: None,
250 max_parallel: None,
251 stagger_delay_ms: None,
252 timeout_secs: None,
253 stall_timeout_secs: None,
254 behavior: Default::default(),
255 item_select_config: None,
256 store_inputs: vec![],
257 store_outputs: vec![],
258 step_vars: None,
259 extra: Default::default(),
260 }],
261 loop_policy: WorkflowLoopSpec {
262 mode: "once".to_string(),
263 max_cycles: None,
264 enabled: true,
265 stop_when_no_unresolved: true,
266 agent_template: None,
267 convergence_expr: None,
268 },
269 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
270 dynamic_steps: vec![],
271 adaptive: None,
272 safety: SafetySpec::default(),
273 max_parallel: None,
274 stagger_delay_ms: None,
275 item_isolation: None,
276 },
277 };
278 let err = wf.validate().expect_err("operation should fail");
279 assert!(err.to_string().contains("type cannot be empty"));
280 }
281
282 #[test]
283 fn workflow_validate_rejects_fixed_without_max_cycles() {
284 let wf = WorkflowResource {
285 metadata: super::super::metadata_with_name("wf-fixed-no-max"),
286 spec: WorkflowSpec {
287 steps: vec![WorkflowStepSpec {
288 id: "qa".to_string(),
289 step_type: "qa".to_string(),
290 required_capability: None,
291 template: None,
292 execution_profile: None,
293 builtin: None,
294 enabled: true,
295 repeatable: false,
296 is_guard: false,
297 cost_preference: None,
298 prehook: None,
299 tty: false,
300 command: None,
301 chain_steps: vec![],
302 scope: None,
303 max_parallel: None,
304 stagger_delay_ms: None,
305 timeout_secs: None,
306 stall_timeout_secs: None,
307 behavior: Default::default(),
308 item_select_config: None,
309 store_inputs: vec![],
310 store_outputs: vec![],
311 step_vars: None,
312 extra: Default::default(),
313 }],
314 loop_policy: WorkflowLoopSpec {
315 mode: "fixed".to_string(),
316 max_cycles: None,
317 enabled: true,
318 stop_when_no_unresolved: true,
319 agent_template: None,
320 convergence_expr: None,
321 },
322 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
323 dynamic_steps: vec![],
324 adaptive: None,
325 safety: SafetySpec::default(),
326 max_parallel: None,
327 stagger_delay_ms: None,
328 item_isolation: None,
329 },
330 };
331 let err = wf.validate().expect_err("operation should fail");
332 assert!(err.to_string().contains("max_cycles > 0"));
333 }
334
335 #[test]
336 fn workflow_validate_rejects_fixed_with_zero_max_cycles() {
337 let wf = WorkflowResource {
338 metadata: super::super::metadata_with_name("wf-fixed-zero"),
339 spec: WorkflowSpec {
340 steps: vec![WorkflowStepSpec {
341 id: "qa".to_string(),
342 step_type: "qa".to_string(),
343 required_capability: None,
344 template: None,
345 execution_profile: None,
346 builtin: None,
347 enabled: true,
348 repeatable: false,
349 is_guard: false,
350 cost_preference: None,
351 prehook: None,
352 tty: false,
353 command: None,
354 chain_steps: vec![],
355 scope: None,
356 max_parallel: None,
357 stagger_delay_ms: None,
358 timeout_secs: None,
359 stall_timeout_secs: None,
360 behavior: Default::default(),
361 item_select_config: None,
362 store_inputs: vec![],
363 store_outputs: vec![],
364 step_vars: None,
365 extra: Default::default(),
366 }],
367 loop_policy: WorkflowLoopSpec {
368 mode: "fixed".to_string(),
369 max_cycles: Some(0),
370 enabled: true,
371 stop_when_no_unresolved: true,
372 agent_template: None,
373 convergence_expr: None,
374 },
375 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
376 dynamic_steps: vec![],
377 adaptive: None,
378 safety: SafetySpec::default(),
379 max_parallel: None,
380 stagger_delay_ms: None,
381 item_isolation: None,
382 },
383 };
384 let err = wf.validate().expect_err("operation should fail");
385 assert!(err.to_string().contains("max_cycles > 0"));
386 }
387
388 #[test]
389 fn workflow_validate_accepts_fixed_with_valid_max_cycles() {
390 let wf = WorkflowResource {
391 metadata: super::super::metadata_with_name("wf-fixed-ok"),
392 spec: WorkflowSpec {
393 steps: vec![WorkflowStepSpec {
394 id: "qa".to_string(),
395 step_type: "qa".to_string(),
396 required_capability: None,
397 template: None,
398 execution_profile: None,
399 builtin: None,
400 enabled: true,
401 repeatable: false,
402 is_guard: false,
403 cost_preference: None,
404 prehook: None,
405 tty: false,
406 command: None,
407 chain_steps: vec![],
408 scope: None,
409 max_parallel: None,
410 stagger_delay_ms: None,
411 timeout_secs: None,
412 stall_timeout_secs: None,
413 behavior: Default::default(),
414 item_select_config: None,
415 store_inputs: vec![],
416 store_outputs: vec![],
417 step_vars: None,
418 extra: Default::default(),
419 }],
420 loop_policy: WorkflowLoopSpec {
421 mode: "fixed".to_string(),
422 max_cycles: Some(3),
423 enabled: true,
424 stop_when_no_unresolved: true,
425 agent_template: None,
426 convergence_expr: None,
427 },
428 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
429 dynamic_steps: vec![],
430 adaptive: None,
431 safety: SafetySpec::default(),
432 max_parallel: None,
433 stagger_delay_ms: None,
434 item_isolation: None,
435 },
436 };
437 assert!(wf.validate().is_ok());
438 }
439
440 #[test]
441 fn workflow_validation_rejects_empty_steps() {
442 let workflow = WorkflowResource {
443 metadata: ResourceMetadata {
444 name: "test-workflow".to_string(),
445 project: None,
446 labels: None,
447 annotations: None,
448 },
449 spec: WorkflowSpec {
450 steps: vec![],
451 loop_policy: WorkflowLoopSpec {
452 mode: "once".to_string(),
453 max_cycles: None,
454 enabled: true,
455 stop_when_no_unresolved: true,
456 agent_template: None,
457 convergence_expr: None,
458 },
459 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
460 dynamic_steps: vec![],
461 adaptive: None,
462 safety: SafetySpec::default(),
463 max_parallel: None,
464 stagger_delay_ms: None,
465 item_isolation: None,
466 },
467 };
468 let result = workflow.validate();
469 assert!(result.is_err());
470 assert!(
471 result
472 .expect_err("operation should fail")
473 .to_string()
474 .contains("cannot be empty")
475 );
476 }
477
478 #[test]
479 fn workflow_get_from_returns_none_for_missing() {
480 let config = make_config();
481 assert!(WorkflowResource::get_from(&config, "nonexistent-wf").is_none());
482 }
483
484 #[test]
485 fn workflow_delete_cleans_up_metadata() {
486 let mut config = make_config();
487 let wf =
488 dispatch_resource(workflow_manifest("meta-wf")).expect("dispatch workflow resource");
489 wf.apply(&mut config).expect("apply");
490 assert!(
491 config
492 .resource_store
493 .get_namespaced("Workflow", crate::config::DEFAULT_PROJECT_ID, "meta-wf")
494 .is_some()
495 );
496
497 WorkflowResource::delete_from(&mut config, "meta-wf");
498 assert!(
499 config
500 .resource_store
501 .get_namespaced("Workflow", crate::config::DEFAULT_PROJECT_ID, "meta-wf")
502 .is_none()
503 );
504 }
505
506 #[test]
507 fn workflow_apply_stores_resource_metadata() {
508 let mut config = make_config();
509 let resource = OrchestratorResource {
510 api_version: API_VERSION.to_string(),
511 kind: ResourceKind::Workflow,
512 metadata: ResourceMetadata {
513 name: "store-meta-wf".to_string(),
514 project: None,
515 labels: Some([("version".to_string(), "v2".to_string())].into()),
516 annotations: None,
517 },
518 spec: ResourceSpec::Workflow(WorkflowSpec {
519 steps: vec![WorkflowStepSpec {
520 id: "qa".to_string(),
521 step_type: "qa".to_string(),
522 required_capability: None,
523 template: None,
524 execution_profile: None,
525 builtin: None,
526 enabled: true,
527 repeatable: false,
528 is_guard: false,
529 cost_preference: None,
530 prehook: None,
531 tty: false,
532 command: None,
533 chain_steps: vec![],
534 scope: None,
535 max_parallel: None,
536 stagger_delay_ms: None,
537 timeout_secs: None,
538 stall_timeout_secs: None,
539 behavior: Default::default(),
540 item_select_config: None,
541 store_inputs: vec![],
542 store_outputs: vec![],
543 step_vars: None,
544 extra: Default::default(),
545 }],
546 loop_policy: WorkflowLoopSpec {
547 mode: "once".to_string(),
548 max_cycles: None,
549 enabled: true,
550 stop_when_no_unresolved: true,
551 agent_template: None,
552 convergence_expr: None,
553 },
554 finalize: crate::cli_types::WorkflowFinalizeSpec { rules: vec![] },
555 dynamic_steps: vec![],
556 adaptive: None,
557 safety: SafetySpec::default(),
558 max_parallel: None,
559 stagger_delay_ms: None,
560 item_isolation: None,
561 }),
562 };
563 let rr = dispatch_resource(resource).expect("dispatch workflow resource");
564 rr.apply(&mut config).expect("apply");
565
566 let cr = config
567 .resource_store
568 .get_namespaced(
569 "Workflow",
570 crate::config::DEFAULT_PROJECT_ID,
571 "store-meta-wf",
572 )
573 .expect("stored workflow CR should exist");
574 assert_eq!(
575 cr.metadata
576 .labels
577 .as_ref()
578 .expect("labels should exist")
579 .get("version")
580 .expect("version label should exist"),
581 "v2"
582 );
583 }
584
585 #[test]
586 fn build_workflow_rejects_wrong_kind() {
587 use crate::cli_types::ProjectSpec;
588 let resource = OrchestratorResource {
589 api_version: API_VERSION.to_string(),
590 kind: ResourceKind::Workflow,
591 metadata: ResourceMetadata {
592 name: "bad".to_string(),
593 project: None,
594 labels: None,
595 annotations: None,
596 },
597 spec: ResourceSpec::Project(ProjectSpec { description: None }),
598 };
599 let err = dispatch_resource(resource).expect_err("operation should fail");
600 assert!(err.to_string().contains("mismatch"));
601 }
602}