// agent_orchestrator/resource/trigger.rs

1use crate::cli_types::{OrchestratorResource, ResourceKind, ResourceSpec, TriggerSpec};
2use crate::config::{
3    OrchestratorConfig, TriggerActionConfig, TriggerConfig, TriggerCronConfig, TriggerEventConfig,
4    TriggerEventFilterConfig, TriggerFilesystemConfig, TriggerHistoryLimitConfig, TriggerSecretRef,
5    TriggerThrottleConfig, TriggerWebhookConfig,
6};
7use anyhow::{Result, anyhow};
8
9use super::{ApplyResult, RegisteredResource, Resource, ResourceMetadata};
10
#[derive(Debug, Clone)]
/// Builtin manifest adapter for `Trigger` resources.
///
/// Pairs the manifest metadata with the trigger spec. The `Resource`
/// implementation in this file uses the pair to validate the manifest, store
/// it in an `OrchestratorConfig`, and render it back to YAML.
pub struct TriggerResource {
    /// Resource metadata from the manifest; supplies the trigger name and the
    /// optional project the trigger is applied to.
    pub metadata: ResourceMetadata,
    /// Manifest spec payload for the trigger (cron or event source, action,
    /// and the optional policy/limit settings).
    pub spec: TriggerSpec,
}
19
20impl Resource for TriggerResource {
21    fn kind(&self) -> ResourceKind {
22        ResourceKind::Trigger
23    }
24
25    fn name(&self) -> &str {
26        &self.metadata.name
27    }
28
29    fn validate(&self) -> Result<()> {
30        super::validate_resource_name(self.name())?;
31
32        // Exactly one of cron or event must be set.
33        match (&self.spec.cron, &self.spec.event) {
34            (Some(_), Some(_)) => {
35                return Err(anyhow!(
36                    "trigger '{}': exactly one of 'cron' or 'event' must be set, not both",
37                    self.name()
38                ));
39            }
40            (None, None) => {
41                return Err(anyhow!(
42                    "trigger '{}': exactly one of 'cron' or 'event' must be set",
43                    self.name()
44                ));
45            }
46            _ => {}
47        }
48
49        // Validate cron expression if present.
50        if let Some(ref cron) = self.spec.cron {
51            if cron.schedule.trim().is_empty() {
52                return Err(anyhow!(
53                    "trigger '{}': cron.schedule cannot be empty",
54                    self.name()
55                ));
56            }
57        }
58
59        // Validate event source if present.
60        if let Some(ref event) = self.spec.event {
61            let valid_sources = ["task_completed", "task_failed", "webhook", "filesystem"];
62            if !valid_sources.contains(&event.source.as_str()) {
63                return Err(anyhow!(
64                    "trigger '{}': event.source must be one of {:?}, got '{}'",
65                    self.name(),
66                    valid_sources,
67                    event.source,
68                ));
69            }
70
71            // Filesystem-specific validation.
72            if event.source == "filesystem" {
73                let fs = event.filesystem.as_ref().ok_or_else(|| {
74                    anyhow!(
75                        "trigger '{}': source 'filesystem' requires a 'filesystem' configuration block",
76                        self.name()
77                    )
78                })?;
79                if fs.paths.is_empty() {
80                    return Err(anyhow!(
81                        "trigger '{}': filesystem.paths must not be empty",
82                        self.name()
83                    ));
84                }
85                let valid_events = ["create", "modify", "delete"];
86                for ev in &fs.events {
87                    if !valid_events.contains(&ev.as_str()) {
88                        return Err(anyhow!(
89                            "trigger '{}': filesystem.events must be one of {:?}, got '{}'",
90                            self.name(),
91                            valid_events,
92                            ev,
93                        ));
94                    }
95                }
96                if fs.debounce_ms > 60000 {
97                    return Err(anyhow!(
98                        "trigger '{}': filesystem.debounce_ms must be <= 60000, got {}",
99                        self.name(),
100                        fs.debounce_ms,
101                    ));
102                }
103            }
104        }
105
106        // Action fields must be non-empty.
107        if self.spec.action.workflow.trim().is_empty() {
108            return Err(anyhow!(
109                "trigger '{}': action.workflow cannot be empty",
110                self.name()
111            ));
112        }
113        if self.spec.action.workspace.trim().is_empty() {
114            return Err(anyhow!(
115                "trigger '{}': action.workspace cannot be empty",
116                self.name()
117            ));
118        }
119
120        Ok(())
121    }
122
123    fn apply(&self, config: &mut OrchestratorConfig) -> Result<ApplyResult> {
124        let incoming = to_config(&self.spec);
125        let project = config.ensure_project(self.metadata.project.as_deref());
126        Ok(super::helpers::apply_to_map(
127            &mut project.triggers,
128            self.name(),
129            incoming,
130        ))
131    }
132
133    fn to_yaml(&self) -> Result<String> {
134        super::manifest_yaml(
135            ResourceKind::Trigger,
136            &self.metadata,
137            ResourceSpec::Trigger(self.spec.clone()),
138        )
139    }
140
141    fn get_from_project(
142        config: &OrchestratorConfig,
143        name: &str,
144        project_id: Option<&str>,
145    ) -> Option<Self> {
146        config
147            .project(project_id)?
148            .triggers
149            .get(name)
150            .map(|cfg| Self {
151                metadata: super::metadata_with_name(name),
152                spec: from_config(cfg),
153            })
154    }
155
156    fn delete_from_project(
157        config: &mut OrchestratorConfig,
158        name: &str,
159        project_id: Option<&str>,
160    ) -> bool {
161        config
162            .project_mut(project_id)
163            .map(|project| project.triggers.remove(name).is_some())
164            .unwrap_or(false)
165    }
166}
167
168/// Builds a typed `TriggerResource` from a generic manifest wrapper.
169pub(super) fn build_trigger(resource: OrchestratorResource) -> Result<RegisteredResource> {
170    let OrchestratorResource {
171        kind,
172        metadata,
173        spec,
174        ..
175    } = resource;
176    if kind != ResourceKind::Trigger {
177        return Err(anyhow!("resource kind/spec mismatch for Trigger"));
178    }
179    match spec {
180        ResourceSpec::Trigger(spec) => Ok(RegisteredResource::Trigger(TriggerResource {
181            metadata,
182            spec,
183        })),
184        _ => Err(anyhow!("resource kind/spec mismatch for Trigger")),
185    }
186}
187
188// ── Spec ↔ Config conversions ────────────────────────────────────────────────
189
190fn to_config(spec: &TriggerSpec) -> TriggerConfig {
191    TriggerConfig {
192        cron: spec.cron.as_ref().map(|c| TriggerCronConfig {
193            schedule: c.schedule.clone(),
194            timezone: c.timezone.clone(),
195        }),
196        event: spec.event.as_ref().map(|e| TriggerEventConfig {
197            source: e.source.clone(),
198            filter: e.filter.as_ref().map(|f| TriggerEventFilterConfig {
199                workflow: f.workflow.clone(),
200                condition: f.condition.clone(),
201            }),
202            webhook: e.webhook.as_ref().map(|w| TriggerWebhookConfig {
203                secret: w.secret.as_ref().map(|s| TriggerSecretRef {
204                    from_ref: s.from_ref.clone(),
205                }),
206                signature_header: w.signature_header.clone(),
207                crd_ref: w.crd_ref.clone(),
208            }),
209            filesystem: e.filesystem.as_ref().map(|fs| TriggerFilesystemConfig {
210                paths: fs.paths.clone(),
211                events: fs.events.clone(),
212                debounce_ms: fs.debounce_ms,
213            }),
214        }),
215        action: TriggerActionConfig {
216            workflow: spec.action.workflow.clone(),
217            workspace: spec.action.workspace.clone(),
218            args: spec.action.args.clone(),
219            start: spec.action.start,
220        },
221        concurrency_policy: spec.concurrency_policy,
222        suspend: spec.suspend,
223        history_limit: spec
224            .history_limit
225            .as_ref()
226            .map(|h| TriggerHistoryLimitConfig {
227                successful: h.successful,
228                failed: h.failed,
229            }),
230        throttle: spec.throttle.as_ref().map(|t| TriggerThrottleConfig {
231            min_interval: t.min_interval,
232        }),
233    }
234}
235
236fn from_config(cfg: &TriggerConfig) -> TriggerSpec {
237    use crate::cli_types::{
238        TriggerActionSpec, TriggerCronSpec, TriggerEventFilter, TriggerEventSpec,
239        TriggerFilesystemSpec, TriggerHistoryLimit, TriggerThrottleSpec, TriggerWebhookSpec,
240        WebhookSecretRef,
241    };
242
243    TriggerSpec {
244        cron: cfg.cron.as_ref().map(|c| TriggerCronSpec {
245            schedule: c.schedule.clone(),
246            timezone: c.timezone.clone(),
247        }),
248        event: cfg.event.as_ref().map(|e| TriggerEventSpec {
249            source: e.source.clone(),
250            filter: e.filter.as_ref().map(|f| TriggerEventFilter {
251                workflow: f.workflow.clone(),
252                condition: f.condition.clone(),
253            }),
254            webhook: e.webhook.as_ref().map(|w| TriggerWebhookSpec {
255                secret: w.secret.as_ref().map(|s| WebhookSecretRef {
256                    from_ref: s.from_ref.clone(),
257                }),
258                signature_header: w.signature_header.clone(),
259                crd_ref: w.crd_ref.clone(),
260            }),
261            filesystem: e.filesystem.as_ref().map(|fs| TriggerFilesystemSpec {
262                paths: fs.paths.clone(),
263                events: fs.events.clone(),
264                debounce_ms: fs.debounce_ms,
265            }),
266        }),
267        action: TriggerActionSpec {
268            workflow: cfg.action.workflow.clone(),
269            workspace: cfg.action.workspace.clone(),
270            args: cfg.action.args.clone(),
271            start: cfg.action.start,
272        },
273        concurrency_policy: cfg.concurrency_policy,
274        suspend: cfg.suspend,
275        history_limit: cfg.history_limit.as_ref().map(|h| TriggerHistoryLimit {
276            successful: h.successful,
277            failed: h.failed,
278        }),
279        throttle: cfg.throttle.as_ref().map(|t| TriggerThrottleSpec {
280            min_interval: t.min_interval,
281        }),
282    }
283}
284
#[cfg(test)]
mod tests {
    //! Unit tests for the `Trigger` resource: dispatch, validation rules,
    //! apply/get/delete against a config, and manifest YAML parsing.

    use super::*;
    use crate::cli_types::OrchestratorResource;
    use crate::resource::dispatch_resource;

    use super::super::test_fixtures::make_config;

    /// Parses a minimal cron-based trigger manifest with the given name and
    /// schedule; the action always targets `test-wf` / `test-ws`.
    fn trigger_cron_manifest(name: &str, schedule: &str) -> OrchestratorResource {
        let yaml = format!(
            r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: {name}
spec:
  cron:
    schedule: "{schedule}"
  action:
    workflow: test-wf
    workspace: test-ws
"#,
        );
        serde_yaml::from_str(&yaml).expect("should parse trigger YAML")
    }

    /// Parses an event-based trigger manifest with the given name and event
    /// source; it carries a fixed filter and `concurrencyPolicy: Replace`.
    fn trigger_event_manifest(name: &str, source: &str) -> OrchestratorResource {
        let yaml = format!(
            r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: {name}
spec:
  event:
    source: {source}
    filter:
      workflow: my-wf
      condition: "status == 'completed'"
  action:
    workflow: deploy
    workspace: main
  concurrencyPolicy: Replace
"#,
        );
        serde_yaml::from_str(&yaml).expect("should parse trigger event YAML")
    }

    /// Dispatching a trigger manifest yields a resource reporting the
    /// `Trigger` kind and the manifest's name.
    #[test]
    fn trigger_dispatch_and_kind() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert_eq!(resource.kind(), ResourceKind::Trigger);
        assert_eq!(resource.name(), "nightly");
    }

    /// A cron trigger with a non-blank schedule passes validation.
    #[test]
    fn trigger_validate_accepts_valid_cron() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_ok());
    }

    /// An event trigger with a supported source passes validation.
    #[test]
    fn trigger_validate_accepts_valid_event() {
        let resource = dispatch_resource(trigger_event_manifest("on-complete", "task_completed"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_ok());
    }

    /// An empty metadata name must fail validation.
    #[test]
    fn trigger_validate_rejects_empty_name() {
        let resource = dispatch_resource(trigger_cron_manifest("", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert!(resource.validate().is_err());
    }

    /// Setting both `cron` and `event` is rejected with the "not both" message.
    #[test]
    fn trigger_validate_rejects_both_cron_and_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad
spec:
  cron:
    schedule: "0 2 * * *"
  event:
    source: task_completed
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered.validate().expect_err("should reject both");
        assert!(err.to_string().contains("not both"));
    }

    /// Omitting both `cron` and `event` is rejected with the "must be set" message.
    #[test]
    fn trigger_validate_rejects_neither_cron_nor_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad
spec:
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered.validate().expect_err("should reject neither");
        assert!(err.to_string().contains("must be set"));
    }

    /// An event source outside the supported list is rejected.
    #[test]
    fn trigger_validate_rejects_invalid_event_source() {
        let resource = dispatch_resource(trigger_event_manifest("bad", "invalid_source"))
            .expect("dispatch should succeed");
        let err = resource.validate().expect_err("should reject");
        assert!(err.to_string().contains("event.source must be one of"));
    }

    /// The first apply reports `Created`; re-applying the identical trigger
    /// reports `Unchanged`.
    #[test]
    fn trigger_apply_created_then_unchanged() {
        let mut config = make_config();
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        assert_eq!(
            resource.apply(&mut config).expect("apply"),
            ApplyResult::Created
        );
        assert_eq!(
            resource.apply(&mut config).expect("apply"),
            ApplyResult::Unchanged
        );
    }

    /// An applied trigger can be fetched by name and is gone after deletion.
    // NOTE(review): `get_from` / `delete_from` are not defined in this file;
    // presumably they are `Resource` trait helpers that target the default
    // project — confirm against the trait definition.
    #[test]
    fn trigger_get_from_and_delete_from() {
        let mut config = make_config();
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        resource.apply(&mut config).expect("apply");

        let loaded = TriggerResource::get_from(&config, "nightly");
        assert!(loaded.is_some());

        assert!(TriggerResource::delete_from(&mut config, "nightly"));
        assert!(TriggerResource::get_from(&config, "nightly").is_none());
    }

    /// The serialized manifest mentions the kind and the trigger name.
    #[test]
    fn trigger_to_yaml() {
        let resource = dispatch_resource(trigger_cron_manifest("nightly", "0 2 * * *"))
            .expect("dispatch should succeed");
        let yaml = resource.to_yaml().expect("should serialize");
        assert!(yaml.contains("kind: Trigger"));
        assert!(yaml.contains("nightly"));
    }

    /// A full cron manifest deserializes into the expected spec fields.
    /// Despite the name, only the parse direction is exercised here.
    #[test]
    fn trigger_yaml_roundtrip_cron() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: nightly-qa
spec:
  cron:
    schedule: "0 2 * * *"
    timezone: Asia/Shanghai
  action:
    workflow: full-qa
    workspace: main-workspace
  concurrencyPolicy: Forbid
  suspend: false
  historyLimit:
    successful: 3
    failed: 3
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        resource
            .validate_version()
            .expect("version should be valid");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            assert!(spec.cron.is_some());
            assert!(spec.event.is_none());
            assert_eq!(spec.cron.as_ref().unwrap().schedule, "0 2 * * *");
            assert_eq!(
                spec.cron.as_ref().unwrap().timezone.as_deref(),
                Some("Asia/Shanghai")
            );
            assert_eq!(spec.action.workflow, "full-qa");
            assert_eq!(spec.action.workspace, "main-workspace");
            assert!(spec.action.start); // default true
        } else {
            panic!("expected Trigger spec");
        }
    }

    /// A full event manifest deserializes into the expected source, filter,
    /// concurrency policy and throttle. Only the parse direction is exercised.
    #[test]
    fn trigger_yaml_roundtrip_event() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: auto-deploy
spec:
  event:
    source: task_completed
    filter:
      workflow: full-qa
      condition: "status == 'completed' && unresolved_items == 0"
  action:
    workflow: deploy-staging
    workspace: main-workspace
  concurrencyPolicy: Replace
  throttle:
    minInterval: 300
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            assert!(spec.event.is_some());
            assert!(spec.cron.is_none());
            let event = spec.event.as_ref().unwrap();
            assert_eq!(event.source, "task_completed");
            assert_eq!(
                event.filter.as_ref().unwrap().workflow.as_deref(),
                Some("full-qa")
            );
            assert_eq!(
                spec.concurrency_policy,
                crate::cli_types::ConcurrencyPolicy::Replace
            );
            assert_eq!(spec.throttle.as_ref().unwrap().min_interval, 300);
        } else {
            panic!("expected Trigger spec");
        }
    }

    /// A well-formed filesystem trigger (paths, events, debounce) validates.
    #[test]
    fn trigger_validate_accepts_filesystem_source() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: fr-watch
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - docs/feature_request/
      events:
        - create
      debounce_ms: 500
    filter:
      condition: "payload_filename.matches('^FR-.*\\.md$')"
  action:
    workflow: fr-governance
    workspace: default
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        assert!(registered.validate().is_ok());
    }

    /// A filesystem trigger with an empty `paths` list is rejected.
    #[test]
    fn trigger_validate_filesystem_requires_paths() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
    filesystem:
      paths: []
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject empty paths");
        assert!(err.to_string().contains("paths must not be empty"));
    }

    /// `source: filesystem` without a `filesystem` block is rejected.
    #[test]
    fn trigger_validate_filesystem_requires_block() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject missing filesystem");
        assert!(
            err.to_string()
                .contains("requires a 'filesystem' configuration block")
        );
    }

    /// A filesystem event kind outside create/modify/delete is rejected.
    #[test]
    fn trigger_validate_filesystem_rejects_invalid_events() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: bad-fs
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - src/
      events:
        - invalid_event
  action:
    workflow: wf
    workspace: ws
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        let registered = dispatch_resource(resource).expect("dispatch");
        let err = registered
            .validate()
            .expect_err("should reject invalid events");
        assert!(err.to_string().contains("filesystem.events must be one of"));
    }

    /// A filesystem manifest deserializes into the expected paths, events and
    /// debounce value. Only the parse direction is exercised.
    #[test]
    fn trigger_yaml_roundtrip_filesystem() {
        let yaml = r#"
apiVersion: orchestrator.dev/v2
kind: Trigger
metadata:
  name: fr-watch
spec:
  event:
    source: filesystem
    filesystem:
      paths:
        - docs/feature_request/
      events:
        - create
      debounce_ms: 1000
  action:
    workflow: fr-governance
    workspace: default
  concurrencyPolicy: Forbid
"#;
        let resource: OrchestratorResource = serde_yaml::from_str(yaml).expect("should parse YAML");
        assert_eq!(resource.kind, ResourceKind::Trigger);
        if let ResourceSpec::Trigger(ref spec) = resource.spec {
            let event = spec.event.as_ref().unwrap();
            assert_eq!(event.source, "filesystem");
            let fs = event.filesystem.as_ref().unwrap();
            assert_eq!(fs.paths, vec!["docs/feature_request/"]);
            assert_eq!(fs.events, vec!["create"]);
            assert_eq!(fs.debounce_ms, 1000);
        } else {
            panic!("expected Trigger spec");
        }
    }
}