Skip to main content

worldinterface_http_trigger/
registry.rs

1//! In-memory webhook registry backed by ContextStore for durability.
2
3use std::collections::HashMap;
4
5use worldinterface_contextstore::ContextStore;
6
7use crate::error::WebhookError;
8use crate::registration::WebhookRegistration;
9use crate::types::WebhookId;
10
/// Global key in ContextStore for persisting webhook registrations.
///
/// The value stored under this key is the JSON-serialized list of every
/// registration: `persist` writes it with `upsert_global` and
/// `load_from_store` reads it back with `get_global`.
const WEBHOOKS_GLOBAL_KEY: &str = "webhook_registrations";
13
14/// In-memory registry of webhook registrations, backed by ContextStore for durability.
15pub struct WebhookRegistry {
16    /// Path -> Registration mapping for O(1) lookup on invocation.
17    by_path: HashMap<String, WebhookRegistration>,
18    /// ID -> Path mapping for O(1) lookup on deletion.
19    by_id: HashMap<WebhookId, String>,
20}
21
22impl WebhookRegistry {
23    /// Create a new empty registry.
24    pub fn new() -> Self {
25        Self { by_path: HashMap::new(), by_id: HashMap::new() }
26    }
27
28    /// Load all webhook registrations from ContextStore.
29    ///
30    /// Called at daemon startup to restore registrations from the previous session.
31    pub fn load_from_store(store: &dyn ContextStore) -> Result<Self, WebhookError> {
32        let mut registry = Self::new();
33        if let Some(value) = store.get_global(WEBHOOKS_GLOBAL_KEY)? {
34            let registrations: Vec<WebhookRegistration> = serde_json::from_value(value)?;
35            for reg in registrations {
36                registry.by_id.insert(reg.id, reg.path.clone());
37                registry.by_path.insert(reg.path.clone(), reg);
38            }
39        }
40        Ok(registry)
41    }
42
43    /// Persist all registrations to ContextStore.
44    fn persist(&self, store: &dyn ContextStore) -> Result<(), WebhookError> {
45        let registrations: Vec<&WebhookRegistration> = self.by_path.values().collect();
46        let value = serde_json::to_value(registrations)?;
47        store.upsert_global(WEBHOOKS_GLOBAL_KEY, &value)?;
48        Ok(())
49    }
50
51    /// Register a new webhook. Returns error if the path is already registered.
52    pub fn register(
53        &mut self,
54        registration: WebhookRegistration,
55        store: &dyn ContextStore,
56    ) -> Result<(), WebhookError> {
57        if self.by_path.contains_key(&registration.path) {
58            return Err(WebhookError::PathAlreadyRegistered(registration.path.clone()));
59        }
60        self.by_id.insert(registration.id, registration.path.clone());
61        self.by_path.insert(registration.path.clone(), registration);
62        self.persist(store)?;
63        Ok(())
64    }
65
66    /// Remove a webhook by ID. Returns the removed registration, or error if not found.
67    pub fn remove(
68        &mut self,
69        id: WebhookId,
70        store: &dyn ContextStore,
71    ) -> Result<WebhookRegistration, WebhookError> {
72        let path = self.by_id.remove(&id).ok_or(WebhookError::WebhookNotFound(id))?;
73        let registration = self.by_path.remove(&path).expect("inconsistent registry");
74        self.persist(store)?;
75        Ok(registration)
76    }
77
78    /// Look up a webhook by path. Used by the catch-all webhook handler.
79    pub fn get_by_path(&self, path: &str) -> Option<&WebhookRegistration> {
80        self.by_path.get(path)
81    }
82
83    /// Look up the path for a webhook ID.
84    pub fn by_id_to_path(&self, id: &WebhookId) -> Option<&str> {
85        self.by_id.get(id).map(|s| s.as_str())
86    }
87
88    /// List all registered webhooks, sorted by creation time.
89    pub fn list(&self) -> Vec<&WebhookRegistration> {
90        let mut registrations: Vec<_> = self.by_path.values().collect();
91        registrations.sort_by_key(|r| r.created_at);
92        registrations
93    }
94
95    /// Number of registered webhooks.
96    pub fn len(&self) -> usize {
97        self.by_path.len()
98    }
99
100    /// Whether the registry is empty.
101    pub fn is_empty(&self) -> bool {
102        self.by_path.is_empty()
103    }
104}
105
106impl Default for WebhookRegistry {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
#[cfg(test)]
mod tests {
    use serde_json::json;
    use worldinterface_contextstore::SqliteContextStore;
    use worldinterface_core::flowspec::*;
    use worldinterface_core::id::NodeId;

    use super::*;

    /// Store used by every test, created via `SqliteContextStore::in_memory()`.
    fn make_store() -> SqliteContextStore {
        SqliteContextStore::in_memory().unwrap()
    }

    /// Build a registration for `path` with a fresh ID and a one-node flow
    /// (a single "delay" connector, no edges).
    fn make_registration(path: &str, created_at: u64) -> WebhookRegistration {
        let node_id = NodeId::new();
        let id = WebhookId::new();

        let delay_node = Node {
            id: node_id,
            label: None,
            node_type: NodeType::Connector(ConnectorNode {
                connector: "delay".into(),
                params: json!({"duration_ms": 10}),
                idempotency_config: None,
            }),
        };
        let flow_spec = FlowSpec {
            id: None,
            name: None,
            nodes: vec![delay_node],
            edges: vec![],
            params: None,
        };

        WebhookRegistration {
            id,
            path: path.to_string(),
            flow_spec,
            description: None,
            created_at,
        }
    }

    #[test]
    fn register_and_lookup() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();

        let registration = make_registration("github/push", 1000);
        let expected_id = registration.id;
        registry.register(registration, &store).unwrap();

        let hit = registry.get_by_path("github/push").unwrap();
        assert_eq!(hit.id, expected_id);
        assert_eq!(hit.path, "github/push");
    }

    #[test]
    fn register_duplicate_path_rejected() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();
        registry.register(make_registration("github/push", 1000), &store).unwrap();

        // Same path, different ID and timestamp: must be refused.
        let outcome = registry.register(make_registration("github/push", 2000), &store);
        assert!(matches!(outcome, Err(WebhookError::PathAlreadyRegistered(_))));
    }

    #[test]
    fn remove_by_id() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();

        let registration = make_registration("github/push", 1000);
        let target = registration.id;
        registry.register(registration, &store).unwrap();

        let taken = registry.remove(target, &store).unwrap();
        assert_eq!(taken.path, "github/push");
        assert!(registry.get_by_path("github/push").is_none());
    }

    #[test]
    fn remove_nonexistent_returns_error() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();

        let outcome = registry.remove(WebhookId::new(), &store);
        assert!(matches!(outcome, Err(WebhookError::WebhookNotFound(_))));
    }

    #[test]
    fn list_returns_all_sorted_by_created_at() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();
        // Registered out of chronological order on purpose.
        for (path, created_at) in [("c-path", 3000), ("a-path", 1000), ("b-path", 2000)] {
            registry.register(make_registration(path, created_at), &store).unwrap();
        }

        let paths: Vec<&str> = registry.list().iter().map(|r| r.path.as_str()).collect();
        assert_eq!(paths, ["a-path", "b-path", "c-path"]);
    }

    #[test]
    fn len_tracks_count() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();

        let first = make_registration("path1", 1000);
        let second = make_registration("path2", 2000);
        let first_id = first.id;

        registry.register(first, &store).unwrap();
        registry.register(second, &store).unwrap();
        assert_eq!(registry.len(), 2);

        registry.remove(first_id, &store).unwrap();
        assert_eq!(registry.len(), 1);
    }

    // T-4: Persistence tests

    #[test]
    fn persist_and_load_roundtrip() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();
        registry.register(make_registration("path1", 1000), &store).unwrap();
        registry.register(make_registration("path2", 2000), &store).unwrap();

        // A second registry built from the same store must see both entries.
        let reloaded = WebhookRegistry::load_from_store(&store).unwrap();
        assert_eq!(reloaded.len(), 2);
        for path in ["path1", "path2"] {
            assert!(reloaded.get_by_path(path).is_some());
        }
    }

    #[test]
    fn load_from_empty_store() {
        let store = make_store();
        let reloaded = WebhookRegistry::load_from_store(&store).unwrap();
        assert!(reloaded.is_empty());
    }

    #[test]
    fn persist_after_remove() {
        let store = make_store();
        let mut registry = WebhookRegistry::new();

        let doomed = make_registration("path1", 1000);
        let doomed_id = doomed.id;
        registry.register(doomed, &store).unwrap();
        registry.register(make_registration("path2", 2000), &store).unwrap();

        registry.remove(doomed_id, &store).unwrap();

        // The removal must be visible to a registry loaded afterwards.
        let reloaded = WebhookRegistry::load_from_store(&store).unwrap();
        assert_eq!(reloaded.len(), 1);
        assert!(reloaded.get_by_path("path1").is_none());
        assert!(reloaded.get_by_path("path2").is_some());
    }
}