Skip to main content

distri_types/
workflow_triggers.rs

//! Unified workflow trigger taxonomy.
//!
//! Replaces two earlier trigger types that disagreed about shape and
//! scope (`configuration::Trigger` and `channel_commands::ChannelTrigger`,
//! both now deleted). [`WorkflowTrigger`] folds channel triggers
//! (`Slash`/`Callback`/`Message`), agent-level triggers
//! (`Manual`/`Schedule`), and the spec's new variants (`Webhook`,
//! `Event`, and `Tool`, i.e. a workflow exposed as an A2A skill) into
//! one enum **attached to entry points**. Every trigger either starts a
//! new run or resumes a parked one (events are correlated by task,
//! handled through `WorkflowStore.wait_task_id`).
12
13use crate::channels::ChannelProvider;
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use utoipa::ToSchema;
17
/// Serde default helper for boolean fields that must be `true` when the
/// key is omitted (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
21
22/// How a workflow run is reached. Lives on an `EntryPoint` (see
23/// `distri-workflow`).
24#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema, JsonSchema)]
25#[serde(tag = "type", rename_all = "snake_case")]
26pub enum WorkflowTrigger {
27    /// Direct API / UI invocation. The implicit default when no
28    /// triggers are declared on an entry point.
29    Manual,
30
31    /// Cron-based scheduled execution.
32    Schedule {
33        /// Cron expression, e.g. "0 * * * *" (every hour).
34        cron: String,
35        /// IANA timezone, e.g. "America/Los_Angeles". Defaults to UTC.
36        #[serde(default, skip_serializing_if = "Option::is_none")]
37        timezone: Option<String>,
38        #[serde(default = "default_true")]
39        enabled: bool,
40        /// Default input passed to the workflow on each scheduled run.
41        #[serde(default, skip_serializing_if = "Option::is_none")]
42        input: Option<serde_json::Value>,
43    },
44
45    /// A channel slash command. `args` names positional params.
46    Slash {
47        name: String,
48        #[serde(default)]
49        aliases: Vec<String>,
50        /// Restrict to these providers; empty = all.
51        #[serde(default)]
52        channels: Vec<ChannelProvider>,
53        #[serde(default)]
54        args: Vec<String>,
55    },
56
57    /// A channel callback-button tap. `callback_data` is `wf:<id>`
58    /// or `wf:<id>:<value>`; `<value>` becomes `input[arg]`.
59    Callback {
60        id: String,
61        #[serde(default, skip_serializing_if = "Option::is_none")]
62        arg: Option<String>,
63    },
64
65    /// The catch-all for non-command free text on a channel.
66    Message {},
67
68    /// Generic inbound HTTP from a 3rd-party service (not a channel
69    /// platform's bot webhook). Verified via [`WebhookAuth`].
70    Webhook {
71        /// URL path suffix mounted at `/v1/workflows/webhook/{path}`.
72        path: String,
73        /// Methods accepted. Empty = `POST` only.
74        #[serde(default)]
75        methods: Vec<String>,
76        #[serde(default)]
77        auth: WebhookAuth,
78        #[serde(default)]
79        response: WebhookResponse,
80    },
81
82    /// Internal event-bus subscription. The workflow starts (or
83    /// resumes a parked task waiting on this topic) when a matching
84    /// event is published.
85    Event {
86        topic: String,
87        #[serde(default, skip_serializing_if = "Option::is_none")]
88        filter: Option<serde_json::Value>,
89    },
90
91    /// The workflow exposed as an A2A skill on the agent card. An
92    /// external agent invokes it with `message/send`; the workflow's
93    /// final result is the tool result.
94    Tool {
95        name: String,
96        description: String,
97        #[serde(default, skip_serializing_if = "Option::is_none")]
98        input_schema: Option<serde_json::Value>,
99    },
100}
101
102/// Verification scheme for a generic webhook trigger. Reuses the
103/// connection model — no separate credential store.
104#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema, JsonSchema)]
105#[serde(tag = "type", rename_all = "snake_case")]
106pub enum WebhookAuth {
107    /// No verification (use only when the source is the public
108    /// internet AND the workflow side-effects are safe to trigger).
109    #[default]
110    None,
111    /// HMAC signature in a named header. Secret material comes from
112    /// the referenced connection's auth field.
113    HmacHeader {
114        /// Header name carrying the signature, e.g. `X-Hub-Signature-256`.
115        header: String,
116        /// Connection whose secret material verifies the header.
117        connection_id: uuid::Uuid,
118    },
119    /// Bearer token in `Authorization`. Token comes from the
120    /// referenced connection.
121    BearerToken { connection_id: uuid::Uuid },
122}
123
124/// How the webhook HTTP response is shaped.
125#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema, JsonSchema)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum WebhookResponse {
128    /// 202 returned immediately; the workflow runs async.
129    Ack,
130    /// Wait for the workflow to run a `RespondToTrigger` step and
131    /// return its body; times out after `timeout_secs`.
132    Sync {
133        #[serde(default, skip_serializing_if = "Option::is_none")]
134        timeout_secs: Option<u64>,
135    },
136}
137
138impl Default for WebhookResponse {
139    fn default() -> Self {
140        Self::Ack
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `trigger` to JSON, parses it back, and asserts that the
    /// value survives unchanged. `#[track_caller]` keeps failure locations
    /// pointing at the calling test rather than at this helper.
    #[track_caller]
    fn assert_round_trips(trigger: WorkflowTrigger) {
        let json = serde_json::to_value(&trigger).unwrap();
        let back: WorkflowTrigger = serde_json::from_value(json).unwrap();
        assert_eq!(back, trigger);
    }

    // JSON -> value -> JSON: also pins the exact wire shape of a slash trigger.
    #[test]
    fn slash_trigger_round_trips() {
        let json = serde_json::json!({
            "type": "slash",
            "name": "/join",
            "aliases": ["/continue"],
            "channels": ["telegram"],
            "args": ["code"]
        });
        let t: WorkflowTrigger = serde_json::from_value(json.clone()).unwrap();
        assert!(matches!(t, WorkflowTrigger::Slash { .. }));
        assert_eq!(serde_json::to_value(&t).unwrap(), json);
    }

    // A unit variant serializes to just the tag.
    #[test]
    fn manual_trigger_round_trips() {
        let json = serde_json::json!({"type": "manual"});
        let t: WorkflowTrigger = serde_json::from_value(json.clone()).unwrap();
        assert!(matches!(t, WorkflowTrigger::Manual));
        assert_eq!(serde_json::to_value(&t).unwrap(), json);
    }

    #[test]
    fn webhook_trigger_round_trips() {
        assert_round_trips(WorkflowTrigger::Webhook {
            path: "github".into(),
            methods: vec!["POST".into()],
            auth: WebhookAuth::HmacHeader {
                header: "X-Hub-Signature-256".into(),
                connection_id: uuid::Uuid::new_v4(),
            },
            response: WebhookResponse::Ack,
        });
    }

    #[test]
    fn tool_trigger_round_trips() {
        assert_round_trips(WorkflowTrigger::Tool {
            name: "summarize".into(),
            description: "summarize a document".into(),
            input_schema: Some(serde_json::json!({"type": "object"})),
        });
    }

    #[test]
    fn event_trigger_round_trips() {
        assert_round_trips(WorkflowTrigger::Event {
            topic: "user.signup".into(),
            filter: Some(serde_json::json!({"plan": "pro"})),
        });
    }

    // Omitted optional keys must fall back to the declared serde defaults;
    // in particular `enabled` defaults to true, not `bool::default()`.
    #[test]
    fn schedule_trigger_applies_defaults() {
        let json = serde_json::json!({"type": "schedule", "cron": "0 * * * *"});
        let t: WorkflowTrigger = serde_json::from_value(json).unwrap();
        assert_eq!(
            t,
            WorkflowTrigger::Schedule {
                cron: "0 * * * *".into(),
                timezone: None,
                enabled: true,
                input: None,
            }
        );
    }

    // A webhook with only `path` gets no methods, no auth, and an ack response.
    #[test]
    fn webhook_trigger_applies_defaults() {
        let json = serde_json::json!({"type": "webhook", "path": "github"});
        let t: WorkflowTrigger = serde_json::from_value(json).unwrap();
        assert_eq!(
            t,
            WorkflowTrigger::Webhook {
                path: "github".into(),
                methods: Vec::new(),
                auth: WebhookAuth::None,
                response: WebhookResponse::Ack,
            }
        );
    }
}