1use crate::types::WorkflowDefinition;
16use distri_types::WorkflowTrigger;
17use std::collections::HashMap;
18
/// One trigger declared on a workflow entry point, together with the identity
/// of the agent that owns it.
///
/// The in-memory registry creates one binding per `(entry point, trigger)`
/// pair found in a registered workflow definition.
#[derive(Debug, Clone, PartialEq)]
pub struct TriggerBinding {
    /// Identifier of the agent the workflow was registered under.
    pub agent_id: String,
    /// Workspace passed at registration time, if any.
    pub workspace_id: Option<String>,
    /// `id` of the entry point that declared this trigger.
    pub entry_point_id: String,
    /// The trigger itself, cloned from the workflow definition.
    pub trigger: WorkflowTrigger,
}
32
/// Index of workflow triggers, queried by trigger kind.
///
/// Implementations must be `Send + Sync`. Every method is fallible so that
/// implementations can surface their own storage or locking errors.
#[async_trait::async_trait]
pub trait WorkflowTriggerRegistry: Send + Sync {
    /// Registers the triggers declared by the entry points of `def` for
    /// `agent_id`, optionally scoped to `workspace_id`.
    ///
    /// The in-memory implementation in this file replaces whatever was
    /// previously stored for the same `agent_id`.
    async fn register(
        &self,
        agent_id: &str,
        workspace_id: Option<&str>,
        def: &WorkflowDefinition,
    ) -> anyhow::Result<()>;

    /// Removes the bindings registered for `agent_id`.
    async fn unregister(&self, agent_id: &str) -> anyhow::Result<()>;

    /// Looks up a webhook trigger by `path`; `Ok(None)` when nothing matches.
    async fn find_webhook(&self, path: &str) -> anyhow::Result<Option<TriggerBinding>>;

    /// Looks up a tool trigger by `tool_name`; `Ok(None)` when nothing matches.
    async fn find_tool(&self, tool_name: &str) -> anyhow::Result<Option<TriggerBinding>>;

    /// Returns every event trigger subscribed to `topic` (possibly none).
    async fn find_event(&self, topic: &str) -> anyhow::Result<Vec<TriggerBinding>>;

    /// Returns every schedule trigger known to the registry.
    async fn list_schedules(&self) -> anyhow::Result<Vec<TriggerBinding>>;
}
68
/// [`WorkflowTriggerRegistry`] that keeps all bindings in process memory.
///
/// `Default` yields an empty registry.
#[derive(Default)]
pub struct InMemoryWorkflowTriggerRegistry {
    // Bindings grouped by agent id. Guarded by a std (blocking) mutex.
    bindings: std::sync::Mutex<HashMap<String, Vec<TriggerBinding>>>,
}
76
77impl InMemoryWorkflowTriggerRegistry {
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 fn collect_bindings(
83 agent_id: &str,
84 workspace_id: Option<&str>,
85 def: &WorkflowDefinition,
86 ) -> Vec<TriggerBinding> {
87 let mut out = Vec::new();
88 for ep in &def.entry_points {
89 for trigger in &ep.triggers {
90 out.push(TriggerBinding {
91 agent_id: agent_id.to_string(),
92 workspace_id: workspace_id.map(|s| s.to_string()),
93 entry_point_id: ep.id.clone(),
94 trigger: trigger.clone(),
95 });
96 }
97 }
98 out
99 }
100}
101
102#[async_trait::async_trait]
103impl WorkflowTriggerRegistry for InMemoryWorkflowTriggerRegistry {
104 async fn register(
105 &self,
106 agent_id: &str,
107 workspace_id: Option<&str>,
108 def: &WorkflowDefinition,
109 ) -> anyhow::Result<()> {
110 let mut guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
111 guard.insert(
112 agent_id.to_string(),
113 Self::collect_bindings(agent_id, workspace_id, def),
114 );
115 Ok(())
116 }
117
118 async fn unregister(&self, agent_id: &str) -> anyhow::Result<()> {
119 let mut guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
120 guard.remove(agent_id);
121 Ok(())
122 }
123
124 async fn find_webhook(&self, path: &str) -> anyhow::Result<Option<TriggerBinding>> {
125 let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
126 for entries in guard.values() {
127 for binding in entries {
128 if let WorkflowTrigger::Webhook { path: p, .. } = &binding.trigger {
129 if p == path {
130 return Ok(Some(binding.clone()));
131 }
132 }
133 }
134 }
135 Ok(None)
136 }
137
138 async fn find_tool(&self, tool_name: &str) -> anyhow::Result<Option<TriggerBinding>> {
139 let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
140 for entries in guard.values() {
141 for binding in entries {
142 if let WorkflowTrigger::Tool { name, .. } = &binding.trigger {
143 if name == tool_name {
144 return Ok(Some(binding.clone()));
145 }
146 }
147 }
148 }
149 Ok(None)
150 }
151
152 async fn find_event(&self, topic: &str) -> anyhow::Result<Vec<TriggerBinding>> {
153 let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
154 let mut out = Vec::new();
155 for entries in guard.values() {
156 for binding in entries {
157 if let WorkflowTrigger::Event { topic: t, .. } = &binding.trigger {
158 if t == topic {
159 out.push(binding.clone());
160 }
161 }
162 }
163 }
164 Ok(out)
165 }
166
167 async fn list_schedules(&self) -> anyhow::Result<Vec<TriggerBinding>> {
168 let guard = self.bindings.lock().map_err(|e| anyhow::anyhow!(e.to_string()))?;
169 let mut out = Vec::new();
170 for entries in guard.values() {
171 for binding in entries {
172 if matches!(&binding.trigger, WorkflowTrigger::Schedule { .. }) {
173 out.push(binding.clone());
174 }
175 }
176 }
177 Ok(out)
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{EntryPoint, WorkflowDefinition, WorkflowStep};
    use distri_types::workflow_triggers::WebhookAuth;

    /// Builds a single-checkpoint workflow whose only entry point (`main`)
    /// declares the given `triggers`.
    fn def_with(triggers: Vec<WorkflowTrigger>) -> WorkflowDefinition {
        let main = EntryPoint {
            id: "main".into(),
            label: "Main".into(),
            description: None,
            starts_at: "s".into(),
            preset_results: Default::default(),
            required_inputs: vec![],
            triggers,
        };
        let steps = vec![WorkflowStep::checkpoint("s", "S", "ok")];
        WorkflowDefinition::new(steps).with_entry_points(vec![main])
    }

    /// Unauthenticated webhook trigger on `path` with an empty method list.
    fn webhook_on(path: &str) -> WorkflowTrigger {
        WorkflowTrigger::Webhook {
            path: path.into(),
            methods: vec![],
            auth: WebhookAuth::None,
            response: Default::default(),
        }
    }

    #[tokio::test]
    async fn register_then_find_webhook() {
        let registry = InMemoryWorkflowTriggerRegistry::new();
        let github = WorkflowTrigger::Webhook {
            path: "github".into(),
            methods: vec!["POST".into()],
            auth: WebhookAuth::None,
            response: Default::default(),
        };
        registry
            .register("agent-1", None, &def_with(vec![github]))
            .await
            .unwrap();

        let found = registry.find_webhook("github").await.unwrap().unwrap();
        assert_eq!(found.agent_id, "agent-1");
        assert_eq!(found.entry_point_id, "main");

        let absent = registry.find_webhook("missing").await.unwrap();
        assert!(absent.is_none());
    }

    #[tokio::test]
    async fn register_then_find_tool() {
        let registry = InMemoryWorkflowTriggerRegistry::new();
        let tool = WorkflowTrigger::Tool {
            name: "summarize".into(),
            description: "summarize a document".into(),
            input_schema: None,
        };
        registry
            .register("wf-summarize", None, &def_with(vec![tool]))
            .await
            .unwrap();

        let found = registry.find_tool("summarize").await.unwrap().unwrap();
        assert_eq!(found.agent_id, "wf-summarize");

        let absent = registry.find_tool("nope").await.unwrap();
        assert!(absent.is_none());
    }

    #[tokio::test]
    async fn find_event_fans_out() {
        let registry = InMemoryWorkflowTriggerRegistry::new();
        // Two different agents subscribe to the same topic.
        for agent in ["agent-a", "agent-b"] {
            let def = def_with(vec![WorkflowTrigger::Event {
                topic: "user.signup".into(),
                filter: None,
            }]);
            registry.register(agent, None, &def).await.unwrap();
        }

        let subscribers = registry.find_event("user.signup").await.unwrap();
        assert_eq!(subscribers.len(), 2);
    }

    #[tokio::test]
    async fn list_schedules_returns_only_schedule_triggers() {
        let registry = InMemoryWorkflowTriggerRegistry::new();
        let hourly = WorkflowTrigger::Schedule {
            cron: "0 * * * *".into(),
            timezone: None,
            enabled: true,
            input: None,
        };
        let def = def_with(vec![hourly, WorkflowTrigger::Manual]);
        registry.register("nightly", None, &def).await.unwrap();

        let schedules = registry.list_schedules().await.unwrap();
        assert_eq!(schedules.len(), 1);
        assert!(matches!(
            schedules[0].trigger,
            WorkflowTrigger::Schedule { .. }
        ));
    }

    #[tokio::test]
    async fn unregister_clears_bindings() {
        let registry = InMemoryWorkflowTriggerRegistry::new();
        let def = def_with(vec![webhook_on("stripe")]);
        registry.register("billing", None, &def).await.unwrap();
        assert!(registry.find_webhook("stripe").await.unwrap().is_some());

        registry.unregister("billing").await.unwrap();
        assert!(registry.find_webhook("stripe").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn register_overwrites_previous_bindings_for_agent() {
        let registry = InMemoryWorkflowTriggerRegistry::new();

        let first = def_with(vec![webhook_on("v1")]);
        registry.register("api", None, &first).await.unwrap();
        assert!(registry.find_webhook("v1").await.unwrap().is_some());

        // Re-registering the same agent must drop the old path entirely.
        let second = def_with(vec![webhook_on("v2")]);
        registry.register("api", None, &second).await.unwrap();
        assert!(registry.find_webhook("v1").await.unwrap().is_none());
        assert!(registry.find_webhook("v2").await.unwrap().is_some());
    }
}