Skip to main content

distri_workflow/
trigger_registry.rs

1//! Routing index from declared workflow triggers back to the
2//! `(agent_id, entry_point_id)` that should be invoked.
3//!
4//! Built (or rebuilt) on orchestrator boot from every
5//! `WorkflowAgentDefinition` in the agent store, and updated when an
6//! agent upserts. The registry is what makes inbound stimuli —
7//! `/v1/workflows/webhook/{path}` requests, the scheduler tick,
8//! event-bus publishes, A2A `message/send` to a workflow-as-tool —
9//! resolve back to a concrete workflow run.
10//!
11//! Implementations: in-memory (tests + OSS server-cli); Redis-backed
12//! follows for the cloud (so multiple cloud processes see the same
13//! routing without each having to rebuild on boot).
14
15use crate::types::WorkflowDefinition;
16use distri_types::WorkflowTrigger;
17use std::collections::HashMap;
18
19/// What a registry hit resolves to.
20///
21/// `workspace_id` is the tenant the agent belongs to (cloud); `None`
22/// for OSS / single-tenant deployments. The webhook / scheduler /
23/// event dispatchers use it to set the task-local workspace context
24/// before spawning the run.
25#[derive(Debug, Clone, PartialEq)]
26pub struct TriggerBinding {
27    pub agent_id: String,
28    pub workspace_id: Option<String>,
29    pub entry_point_id: String,
30    pub trigger: WorkflowTrigger,
31}
32
33/// Persist and query the trigger -> (agent_id, entry_point_id) routing.
34#[async_trait::async_trait]
35pub trait WorkflowTriggerRegistry: Send + Sync {
36    /// Register all triggers from an agent's workflow definition.
37    /// Overwrites any previous bindings for this agent (call after
38    /// upsert). `workspace_id` is the tenant the agent belongs to
39    /// (cloud); `None` for OSS.
40    async fn register(
41        &self,
42        agent_id: &str,
43        workspace_id: Option<&str>,
44        def: &WorkflowDefinition,
45    ) -> anyhow::Result<()>;
46
47    /// Remove all bindings for an agent.
48    async fn unregister(&self, agent_id: &str) -> anyhow::Result<()>;
49
50    /// Resolve the binding for a `Webhook { path }` trigger. The
51    /// webhook HTTP route maps `/v1/workflows/webhook/{path}` here.
52    /// First-match wins when multiple tenants declare the same path.
53    async fn find_webhook(&self, path: &str) -> anyhow::Result<Option<TriggerBinding>>;
54
55    /// Resolve the binding for a `Tool { name }` trigger (workflow
56    /// exposed as an A2A skill).
57    async fn find_tool(&self, tool_name: &str) -> anyhow::Result<Option<TriggerBinding>>;
58
59    /// All bindings for an `Event { topic }` trigger. Returns every
60    /// subscriber so the event bus can fan-out.
61    async fn find_event(&self, topic: &str) -> anyhow::Result<Vec<TriggerBinding>>;
62
63    /// All bindings that are `Schedule { … }` triggers. The
64    /// scheduler tick walks this list each interval and fires due
65    /// runs.
66    async fn list_schedules(&self) -> anyhow::Result<Vec<TriggerBinding>>;
67}
68
69/// In-memory registry. One `HashMap<agent_id, Vec<TriggerBinding>>`
70/// plus secondary indices for the hot lookups. The Redis impl
71/// follows a similar layout but keys per trigger kind.
72#[derive(Default)]
73pub struct InMemoryWorkflowTriggerRegistry {
74    bindings: std::sync::Mutex<HashMap<String, Vec<TriggerBinding>>>,
75}
76
77impl InMemoryWorkflowTriggerRegistry {
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    fn collect_bindings(
83        agent_id: &str,
84        workspace_id: Option<&str>,
85        def: &WorkflowDefinition,
86    ) -> Vec<TriggerBinding> {
87        let mut out = Vec::new();
88        for ep in &def.entry_points {
89            for trigger in &ep.triggers {
90                out.push(TriggerBinding {
91                    agent_id: agent_id.to_string(),
92                    workspace_id: workspace_id.map(|s| s.to_string()),
93                    entry_point_id: ep.id.clone(),
94                    trigger: trigger.clone(),
95                });
96            }
97        }
98        out
99    }
100}
101
102#[async_trait::async_trait]
103impl WorkflowTriggerRegistry for InMemoryWorkflowTriggerRegistry {
104    async fn register(
105        &self,
106        agent_id: &str,
107        workspace_id: Option<&str>,
108        def: &WorkflowDefinition,
109    ) -> anyhow::Result<()> {
110        let mut guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
111        guard.insert(
112            agent_id.to_string(),
113            Self::collect_bindings(agent_id, workspace_id, def),
114        );
115        Ok(())
116    }
117
118    async fn unregister(&self, agent_id: &str) -> anyhow::Result<()> {
119        let mut guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
120        guard.remove(agent_id);
121        Ok(())
122    }
123
124    async fn find_webhook(&self, path: &str) -> anyhow::Result<Option<TriggerBinding>> {
125        let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
126        for entries in guard.values() {
127            for binding in entries {
128                if let WorkflowTrigger::Webhook { path: p, .. } = &binding.trigger {
129                    if p == path {
130                        return Ok(Some(binding.clone()));
131                    }
132                }
133            }
134        }
135        Ok(None)
136    }
137
138    async fn find_tool(&self, tool_name: &str) -> anyhow::Result<Option<TriggerBinding>> {
139        let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
140        for entries in guard.values() {
141            for binding in entries {
142                if let WorkflowTrigger::Tool { name, .. } = &binding.trigger {
143                    if name == tool_name {
144                        return Ok(Some(binding.clone()));
145                    }
146                }
147            }
148        }
149        Ok(None)
150    }
151
152    async fn find_event(&self, topic: &str) -> anyhow::Result<Vec<TriggerBinding>> {
153        let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
154        let mut out = Vec::new();
155        for entries in guard.values() {
156            for binding in entries {
157                if let WorkflowTrigger::Event { topic: t, .. } = &binding.trigger {
158                    if t == topic {
159                        out.push(binding.clone());
160                    }
161                }
162            }
163        }
164        Ok(out)
165    }
166
167    async fn list_schedules(&self) -> anyhow::Result<Vec<TriggerBinding>> {
168        let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
169        let mut out = Vec::new();
170        for entries in guard.values() {
171            for binding in entries {
172                if matches!(&binding.trigger, WorkflowTrigger::Schedule { .. }) {
173                    out.push(binding.clone());
174                }
175            }
176        }
177        Ok(out)
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use crate::types::{EntryPoint, WorkflowDefinition, WorkflowStep};
185    use distri_types::workflow_triggers::WebhookAuth;
186
187    fn def_with(triggers: Vec<WorkflowTrigger>) -> WorkflowDefinition {
188        WorkflowDefinition::new(vec![WorkflowStep::checkpoint("s", "S", "ok")])
189            .with_entry_points(vec![EntryPoint {
190                id: "main".into(),
191                label: "Main".into(),
192                description: None,
193                starts_at: "s".into(),
194                preset_results: Default::default(),
195                required_inputs: vec![],
196                triggers,
197            }])
198    }
199
200    #[tokio::test]
201    async fn register_then_find_webhook() {
202        let reg = InMemoryWorkflowTriggerRegistry::new();
203        let def = def_with(vec![WorkflowTrigger::Webhook {
204            path: "github".into(),
205            methods: vec!["POST".into()],
206            auth: WebhookAuth::None,
207            response: Default::default(),
208        }]);
209        reg.register("agent-1", None, &def).await.unwrap();
210
211        let hit = reg.find_webhook("github").await.unwrap().unwrap();
212        assert_eq!(hit.agent_id, "agent-1");
213        assert_eq!(hit.entry_point_id, "main");
214
215        assert!(reg.find_webhook("missing").await.unwrap().is_none());
216    }
217
218    #[tokio::test]
219    async fn register_then_find_tool() {
220        let reg = InMemoryWorkflowTriggerRegistry::new();
221        let def = def_with(vec![WorkflowTrigger::Tool {
222            name: "summarize".into(),
223            description: "summarize a document".into(),
224            input_schema: None,
225        }]);
226        reg.register("wf-summarize", None, &def).await.unwrap();
227
228        let hit = reg.find_tool("summarize").await.unwrap().unwrap();
229        assert_eq!(hit.agent_id, "wf-summarize");
230
231        assert!(reg.find_tool("nope").await.unwrap().is_none());
232    }
233
234    #[tokio::test]
235    async fn find_event_fans_out() {
236        let reg = InMemoryWorkflowTriggerRegistry::new();
237        let def_a = def_with(vec![WorkflowTrigger::Event {
238            topic: "user.signup".into(),
239            filter: None,
240        }]);
241        let def_b = def_with(vec![WorkflowTrigger::Event {
242            topic: "user.signup".into(),
243            filter: None,
244        }]);
245        reg.register("agent-a", None, &def_a).await.unwrap();
246        reg.register("agent-b", None, &def_b).await.unwrap();
247
248        let hits = reg.find_event("user.signup").await.unwrap();
249        assert_eq!(hits.len(), 2);
250    }
251
252    #[tokio::test]
253    async fn list_schedules_returns_only_schedule_triggers() {
254        let reg = InMemoryWorkflowTriggerRegistry::new();
255        let def = def_with(vec![
256            WorkflowTrigger::Schedule {
257                cron: "0 * * * *".into(),
258                timezone: None,
259                enabled: true,
260                input: None,
261            },
262            WorkflowTrigger::Manual,
263        ]);
264        reg.register("nightly", None, &def).await.unwrap();
265
266        let sched = reg.list_schedules().await.unwrap();
267        assert_eq!(sched.len(), 1);
268        assert!(matches!(sched[0].trigger, WorkflowTrigger::Schedule { .. }));
269    }
270
271    #[tokio::test]
272    async fn unregister_clears_bindings() {
273        let reg = InMemoryWorkflowTriggerRegistry::new();
274        let def = def_with(vec![WorkflowTrigger::Webhook {
275            path: "stripe".into(),
276            methods: vec![],
277            auth: WebhookAuth::None,
278            response: Default::default(),
279        }]);
280        reg.register("billing", None, &def).await.unwrap();
281        assert!(reg.find_webhook("stripe").await.unwrap().is_some());
282
283        reg.unregister("billing").await.unwrap();
284        assert!(reg.find_webhook("stripe").await.unwrap().is_none());
285    }
286
287    #[tokio::test]
288    async fn register_overwrites_previous_bindings_for_agent() {
289        let reg = InMemoryWorkflowTriggerRegistry::new();
290        let def_v1 = def_with(vec![WorkflowTrigger::Webhook {
291            path: "v1".into(),
292            methods: vec![],
293            auth: WebhookAuth::None,
294            response: Default::default(),
295        }]);
296        reg.register("api", None, &def_v1).await.unwrap();
297        assert!(reg.find_webhook("v1").await.unwrap().is_some());
298
299        // Re-register with a different path — v1 should disappear.
300        let def_v2 = def_with(vec![WorkflowTrigger::Webhook {
301            path: "v2".into(),
302            methods: vec![],
303            auth: WebhookAuth::None,
304            response: Default::default(),
305        }]);
306        reg.register("api", None, &def_v2).await.unwrap();
307        assert!(reg.find_webhook("v1").await.unwrap().is_none());
308        assert!(reg.find_webhook("v2").await.unwrap().is_some());
309    }
310}