1use crate::cli_types::{OrchestratorResource, ResourceKind, ResourceSpec, TriggerSpec};
2use crate::config::{
3 OrchestratorConfig, TriggerActionConfig, TriggerConfig, TriggerCronConfig, TriggerEventConfig,
4 TriggerEventFilterConfig, TriggerFilesystemConfig, TriggerHistoryLimitConfig, TriggerSecretRef,
5 TriggerThrottleConfig, TriggerWebhookConfig,
6};
7use anyhow::{Result, anyhow};
8
9use super::{ApplyResult, RegisteredResource, Resource, ResourceMetadata};
10
11#[derive(Debug, Clone)]
12pub struct TriggerResource {
14 pub metadata: ResourceMetadata,
16 pub spec: TriggerSpec,
18}
19
20impl Resource for TriggerResource {
21 fn kind(&self) -> ResourceKind {
22 ResourceKind::Trigger
23 }
24
25 fn name(&self) -> &str {
26 &self.metadata.name
27 }
28
29 fn validate(&self) -> Result<()> {
30 super::validate_resource_name(self.name())?;
31
32 match (&self.spec.cron, &self.spec.event) {
34 (Some(_), Some(_)) => {
35 return Err(anyhow!(
36 "trigger '{}': exactly one of 'cron' or 'event' must be set, not both",
37 self.name()
38 ));
39 }
40 (None, None) => {
41 return Err(anyhow!(
42 "trigger '{}': exactly one of 'cron' or 'event' must be set",
43 self.name()
44 ));
45 }
46 _ => {}
47 }
48
49 if let Some(ref cron) = self.spec.cron {
51 if cron.schedule.trim().is_empty() {
52 return Err(anyhow!(
53 "trigger '{}': cron.schedule cannot be empty",
54 self.name()
55 ));
56 }
57 }
58
59 if let Some(ref event) = self.spec.event {
61 let valid_sources = ["task_completed", "task_failed", "webhook", "filesystem"];
62 if !valid_sources.contains(&event.source.as_str()) {
63 return Err(anyhow!(
64 "trigger '{}': event.source must be one of {:?}, got '{}'",
65 self.name(),
66 valid_sources,
67 event.source,
68 ));
69 }
70
71 if event.source == "filesystem" {
73 let fs = event.filesystem.as_ref().ok_or_else(|| {
74 anyhow!(
75 "trigger '{}': source 'filesystem' requires a 'filesystem' configuration block",
76 self.name()
77 )
78 })?;
79 if fs.paths.is_empty() {
80 return Err(anyhow!(
81 "trigger '{}': filesystem.paths must not be empty",
82 self.name()
83 ));
84 }
85 let valid_events = ["create", "modify", "delete"];
86 for ev in &fs.events {
87 if !valid_events.contains(&ev.as_str()) {
88 return Err(anyhow!(
89 "trigger '{}': filesystem.events must be one of {:?}, got '{}'",
90 self.name(),
91 valid_events,
92 ev,
93 ));
94 }
95 }
96 if fs.debounce_ms > 60000 {
97 return Err(anyhow!(
98 "trigger '{}': filesystem.debounce_ms must be <= 60000, got {}",
99 self.name(),
100 fs.debounce_ms,
101 ));
102 }
103 }
104 }
105
106 if self.spec.action.workflow.trim().is_empty() {
108 return Err(anyhow!(
109 "trigger '{}': action.workflow cannot be empty",
110 self.name()
111 ));
112 }
113 if self.spec.action.workspace.trim().is_empty() {
114 return Err(anyhow!(
115 "trigger '{}': action.workspace cannot be empty",
116 self.name()
117 ));
118 }
119
120 Ok(())
121 }
122
123 fn apply(&self, config: &mut OrchestratorConfig) -> Result<ApplyResult> {
124 let incoming = to_config(&self.spec);
125 let project = config.ensure_project(self.metadata.project.as_deref());
126 Ok(super::helpers::apply_to_map(
127 &mut project.triggers,
128 self.name(),
129 incoming,
130 ))
131 }
132
133 fn to_yaml(&self) -> Result<String> {
134 super::manifest_yaml(
135 ResourceKind::Trigger,
136 &self.metadata,
137 ResourceSpec::Trigger(self.spec.clone()),
138 )
139 }
140
141 fn get_from_project(
142 config: &OrchestratorConfig,
143 name: &str,
144 project_id: Option<&str>,
145 ) -> Option<Self> {
146 config
147 .project(project_id)?
148 .triggers
149 .get(name)
150 .map(|cfg| Self {
151 metadata: super::metadata_with_name(name),
152 spec: from_config(cfg),
153 })
154 }
155
156 fn delete_from_project(
157 config: &mut OrchestratorConfig,
158 name: &str,
159 project_id: Option<&str>,
160 ) -> bool {
161 config
162 .project_mut(project_id)
163 .map(|project| project.triggers.remove(name).is_some())
164 .unwrap_or(false)
165 }
166}
167
168pub(super) fn build_trigger(resource: OrchestratorResource) -> Result<RegisteredResource> {
170 let OrchestratorResource {
171 kind,
172 metadata,
173 spec,
174 ..
175 } = resource;
176 if kind != ResourceKind::Trigger {
177 return Err(anyhow!("resource kind/spec mismatch for Trigger"));
178 }
179 match spec {
180 ResourceSpec::Trigger(spec) => Ok(RegisteredResource::Trigger(TriggerResource {
181 metadata,
182 spec,
183 })),
184 _ => Err(anyhow!("resource kind/spec mismatch for Trigger")),
185 }
186}
187
188fn to_config(spec: &TriggerSpec) -> TriggerConfig {
191 TriggerConfig {
192 cron: spec.cron.as_ref().map(|c| TriggerCronConfig {
193 schedule: c.schedule.clone(),
194 timezone: c.timezone.clone(),
195 }),
196 event: spec.event.as_ref().map(|e| TriggerEventConfig {
197 source: e.source.clone(),
198 filter: e.filter.as_ref().map(|f| TriggerEventFilterConfig {
199 workflow: f.workflow.clone(),
200 condition: f.condition.clone(),
201 }),
202 webhook: e.webhook.as_ref().map(|w| TriggerWebhookConfig {
203 secret: w.secret.as_ref().map(|s| TriggerSecretRef {
204 from_ref: s.from_ref.clone(),
205 }),
206 signature_header: w.signature_header.clone(),
207 crd_ref: w.crd_ref.clone(),
208 }),
209 filesystem: e.filesystem.as_ref().map(|fs| TriggerFilesystemConfig {
210 paths: fs.paths.clone(),
211 events: fs.events.clone(),
212 debounce_ms: fs.debounce_ms,
213 }),
214 }),
215 action: TriggerActionConfig {
216 workflow: spec.action.workflow.clone(),
217 workspace: spec.action.workspace.clone(),
218 args: spec.action.args.clone(),
219 start: spec.action.start,
220 },
221 concurrency_policy: spec.concurrency_policy,
222 suspend: spec.suspend,
223 history_limit: spec
224 .history_limit
225 .as_ref()
226 .map(|h| TriggerHistoryLimitConfig {
227 successful: h.successful,
228 failed: h.failed,
229 }),
230 throttle: spec.throttle.as_ref().map(|t| TriggerThrottleConfig {
231 min_interval: t.min_interval,
232 }),
233 }
234}
235
236fn from_config(cfg: &TriggerConfig) -> TriggerSpec {
237 use crate::cli_types::{
238 TriggerActionSpec, TriggerCronSpec, TriggerEventFilter, TriggerEventSpec,
239 TriggerFilesystemSpec, TriggerHistoryLimit, TriggerThrottleSpec, TriggerWebhookSpec,
240 WebhookSecretRef,
241 };
242
243 TriggerSpec {
244 cron: cfg.cron.as_ref().map(|c| TriggerCronSpec {
245 schedule: c.schedule.clone(),
246 timezone: c.timezone.clone(),
247 }),
248 event: cfg.event.as_ref().map(|e| TriggerEventSpec {
249 source: e.source.clone(),
250 filter: e.filter.as_ref().map(|f| TriggerEventFilter {
251 workflow: f.workflow.clone(),
252 condition: f.condition.clone(),
253 }),
254 webhook: e.webhook.as_ref().map(|w| TriggerWebhookSpec {
255 secret: w.secret.as_ref().map(|s| WebhookSecretRef {
256 from_ref: s.from_ref.clone(),
257 }),
258 signature_header: w.signature_header.clone(),
259 crd_ref: w.crd_ref.clone(),
260 }),
261 filesystem: e.filesystem.as_ref().map(|fs| TriggerFilesystemSpec {
262 paths: fs.paths.clone(),
263 events: fs.events.clone(),
264 debounce_ms: fs.debounce_ms,
265 }),
266 }),
267 action: TriggerActionSpec {
268 workflow: cfg.action.workflow.clone(),
269 workspace: cfg.action.workspace.clone(),
270 args: cfg.action.args.clone(),
271 start: cfg.action.start,
272 },
273 concurrency_policy: cfg.concurrency_policy,
274 suspend: cfg.suspend,
275 history_limit: cfg.history_limit.as_ref().map(|h| TriggerHistoryLimit {
276 successful: h.successful,
277 failed: h.failed,
278 }),
279 throttle: cfg.throttle.as_ref().map(|t| TriggerThrottleSpec {
280 min_interval: t.min_interval,
281 }),
282 }
283}
284
/// Unit tests for the `Trigger` resource: dispatch, validation, apply/get/delete,
/// YAML export, and manifest parsing.
///
/// The YAML fixtures rely on block indentation to express nesting
/// (`spec` -> `cron`/`event`/`action` -> fields), so that indentation is
/// significant and must be preserved.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli_types::OrchestratorResource;
    use crate::resource::dispatch_resource;

    use super::super::test_fixtures::make_config;

    /// Parses a minimal cron-driven trigger manifest with the given name and schedule.
    fn trigger_cron_manifest(name: &str, schedule: &str) -> OrchestratorResource {
        let yaml = format!(
            r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: {name}
spec:
  cron:
    schedule: "{schedule}"
  action:
    workflow: test-wf
    workspace: test-ws
"#,
        );
        serde_yaml::from_str(&yaml).expect("should parse trigger YAML")
    }

    /// Parses an event-driven trigger manifest with the given name and event source.
    fn trigger_event_manifest(name: &str, source: &str) -> OrchestratorResource {
        let yaml = format!(
            r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: {name}
spec:
  event:
    source: {source}
    filter:
      workflow: my-wf
      condition: "status == 'completed'"
  action:
    workflow: deploy
    workspace: main
  concurrencyPolicy: Replace
"#,
        );
        serde_yaml::from_str(&yaml).expect("should parse trigger event YAML")
    }

    #[test]
    fn trigger_dispatch_and_kind() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert_eq!(resource.kind(), ResourceKind::Trigger);
        assert_eq!(resource.name(), "nightly");
    }

    #[test]
    fn trigger_validate_accepts_valid_cron() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_ok());
    }

    #[test]
    fn trigger_validate_accepts_valid_event() {
        let resource = dispatch_resource(trigger_event_manifest("on-complete", "task_completed"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_ok());
    }

    #[test]
    fn trigger_validate_rejects_empty_name() {
        let resource = dispatch_resource(trigger_cron_manifest("", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_err());
    }

    /// A schedule made only of whitespace is treated as empty by `validate`.
    #[test]
    fn trigger_validate_rejects_blank_cron_schedule() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "   "))
            .expect("dispatch should succeed");
        let err = resource
            .validate()
            .expect_err("should reject blank schedule");
        assert!(err.to_string().contains("cron.schedule cannot be empty"));
    }

    #[test]
    fn trigger_validate_rejects_both_cron_and_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad
spec:
  cron:
    schedule: "0 2 * * *"
  event:
    source: task_completed
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered.validate().expect_err("should reject both");
        assert!(err.to_string().contains("not both"));
    }

    #[test]
    fn trigger_validate_rejects_neither_cron_nor_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad
spec:
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered.validate().expect_err("should reject neither");
        assert!(err.to_string().contains("must be set"));
    }

    #[test]
    fn trigger_validate_rejects_invalid_event_source() {
        let resource = dispatch_resource(trigger_event_manifest("bad", "invalid_source"))
            .expect("dispatch should succeed");
        let err = resource.validate().expect_err("should reject");
        assert!(err.to_string().contains("event.source must be one of"));
    }

    #[test]
    fn trigger_apply_created_then_unchanged() {
        let mut config = make_config();
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert_eq!(
            resource.apply(&mut config).expect("apply"),
            ApplyResult::Created
        );
        // Re-applying the identical manifest must not report a change.
        assert_eq!(
            resource.apply(&mut config).expect("apply"),
            ApplyResult::Unchanged
        );
    }

    #[test]
    fn trigger_get_from_and_delete_from() {
        let mut config = make_config();
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        resource.apply(&mut config).expect("apply");

        let loaded = TriggerResource::get_from(&config, "nightly");
        assert!(loaded.is_some());

        assert!(TriggerResource::delete_from(&mut config, "nightly"));
        assert!(TriggerResource::get_from(&config, "nightly").is_none());
    }

    #[test]
    fn trigger_to_yaml() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        let yaml = resource.to_yaml().expect("should serialize");
        assert!(yaml.contains("kind: Trigger"));
        assert!(yaml.contains("nightly"));
    }

    #[test]
    fn trigger_yaml_roundtrip_cron() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: nightly-qa
spec:
  cron:
    schedule: "0 2 * * *"
    timezone: Asia/Shanghai
  action:
    workflow: full-qa
    workspace: main-workspace
  concurrencyPolicy: Forbid
  suspend: false
  historyLimit:
    successful: 3
    failed: 3
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        resource
            .validate_version()
            .expect("version should be valid");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            assert!(spec.cron.is_some());
            assert!(spec.event.is_none());
            assert_eq!(spec.cron.as_ref().unwrap().schedule, "0 2 * * *");
            assert_eq!(
                spec.cron.as_ref().unwrap().timezone.as_deref(),
                Some("Asia/Shanghai")
            );
            assert_eq!(spec.action.workflow, "full-qa");
            assert_eq!(spec.action.workspace, "main-workspace");
            // The manifest omits `action.start`, so this pins the deserialized default.
            assert!(spec.action.start);
        } else {
            panic!("expected Trigger spec");
        }
    }

    #[test]
    fn trigger_yaml_roundtrip_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: auto-deploy
spec:
  event:
    source: task_completed
    filter:
      workflow: full-qa
      condition: "status == 'completed' && unresolved_items == 0"
  action:
    workflow: deploy-staging
    workspace: main-workspace
  concurrencyPolicy: Replace
  throttle:
    minInterval: 300
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            assert!(spec.event.is_some());
            assert!(spec.cron.is_none());
            let event = spec.event.as_ref().unwrap();
            assert_eq!(event.source, "task_completed");
            assert_eq!(
                event.filter.as_ref().unwrap().workflow.as_deref(),
                Some("full-qa")
            );
            assert_eq!(
                spec.concurrency_policy,
                crate::cli_types::ConcurrencyPolicy::Replace
            );
            assert_eq!(spec.throttle.as_ref().unwrap().min_interval, 300);
        } else {
            panic!("expected Trigger spec");
        }
    }

    #[test]
    fn trigger_validate_accepts_filesystem_source() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: fr-watch
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - docs/feature_request/
      events:
        - create
      debounce_ms: 500
    filter:
      condition: "payload_filename.matches('^FR-.*\\.md$')"
  action:
    workflow: fr-governance
    workspace: default
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        assert!(registered.validate().is_ok());
    }

    #[test]
    fn trigger_validate_filesystem_requires_paths() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
    filesystem:
      paths: []
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject empty paths");
        assert!(err.to_string().contains("paths must not be empty"));
    }

    #[test]
    fn trigger_validate_filesystem_requires_block() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject missing filesystem");
        assert!(
            err.to_string()
                .contains("requires a 'filesystem' configuration block")
        );
    }

    #[test]
    fn trigger_validate_filesystem_rejects_invalid_events() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - src/
      events:
        - invalid_event
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject invalid events");
        assert!(err.to_string().contains("filesystem.events must be one of"));
    }

    /// `validate` caps `filesystem.debounce_ms` at 60000; one past the cap must fail.
    #[test]
    fn trigger_validate_filesystem_rejects_excessive_debounce() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - src/
      events:
        - modify
      debounce_ms: 60001
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject debounce above the cap");
        assert!(
            err.to_string()
                .contains("filesystem.debounce_ms must be <= 60000")
        );
    }

    #[test]
    fn trigger_yaml_roundtrip_filesystem() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: fr-watch
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - docs/feature_request/
      events:
        - create
      debounce_ms: 1000
  action:
    workflow: fr-governance
    workspace: default
  concurrencyPolicy: Forbid
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            let event = spec.event.as_ref().unwrap();
            assert_eq!(event.source, "filesystem");
            let fs = event.filesystem.as_ref().unwrap();
            assert_eq!(fs.paths, vec!["docs/feature_request/"]);
            assert_eq!(fs.events, vec!["create"]);
            assert_eq!(fs.debounce_ms, 1000);
        } else {
            panic!("expected Trigger spec");
        }
    }
}