Skip to main content

obeli_sk_wasm_workers/
registry.rs

1//! Component registry for a single deployment.
2//!
3use crate::workflow::workflow_js_worker::WorkflowJsWorker;
4use crate::workflow::workflow_worker::WorkflowWorker;
5use concepts::ComponentId;
6use concepts::ComponentType;
7use concepts::FunctionFqn;
8use concepts::FunctionMetadata;
9use concepts::FunctionRegistry;
10use concepts::IfcFqnName;
11use concepts::PackageIfcFns;
12use concepts::StrVariant;
13use concepts::component_id::ComponentDigest;
14use hashbrown::HashMap;
15use indexmap::IndexMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::Arc;
19use tracing::error;
20
/// Origin of a component's WIT: parsed from a real WASM binary, or synthesized from `TypeWrapper`s.
///
/// The registry records one origin per exported non-extension interface and rejects a
/// component whose origin differs from the one already recorded for that interface.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display)]
pub enum WitOrigin {
    /// The WIT was parsed from a real WASM binary. Displayed as `wasm`.
    #[display("wasm")]
    Wasm,
    /// The WIT was synthesized from `TypeWrapper`s. Displayed as `synthesized`.
    #[display("synthesized")]
    Synthesized,
}
29
/// Holds information about components, used for gRPC services like `ListComponents`
#[derive(Debug, Clone)]
pub struct ComponentConfig {
    /// Identity of the component; the registry reads its name, digest and component type.
    pub component_id: ComponentId,
    /// Functions imported by the component. `ComponentConfigRegistry::verify_registry`
    /// matches them against the exports of all registered components.
    pub imports: Vec<FunctionMetadata>,
    /// Export information of workflows and activities.
    /// The registry treats `None` as "not importable", which it associates with webhook
    /// endpoints (and, presumably, cron components - confirm against the callers).
    pub workflow_or_activity_config: Option<ComponentConfigImportable>,
    /// WIT text of the component; the registry indexes it by component digest.
    pub wit: String,
    /// Origin of this component's WIT (parsed from WASM vs synthesized from `TypeWrapper`s).
    pub wit_origin: WitOrigin,
}
40
/// A replay-purposed worker held by the server so gRPC and REST handlers can replay/advance
/// executions without re-compiling the WASM component on every request.
///
/// Both variants wrap the worker in an `Arc`, so cloning a `ReplayWorker` only bumps a
/// reference count.
#[derive(Clone)]
pub enum ReplayWorker {
    /// Replay worker backed by a [`WorkflowWorker`].
    Wasm(Arc<WorkflowWorker>),
    /// Replay worker backed by a [`WorkflowJsWorker`].
    Js(Arc<WorkflowJsWorker>),
}
48
49impl Debug for ReplayWorker {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            ReplayWorker::Wasm(_) => f.write_str("ReplayWorker::Wasm(..)"),
53            ReplayWorker::Js(_) => f.write_str("ReplayWorker::Js(..)"),
54        }
55    }
56}
57
/// Per-deployment registry of replay-purposed workflow workers, indexed by component digest.
#[derive(Default, Debug, Clone)]
pub struct ReplayWorkerRegistry {
    /// Component digest → (full component id, replay worker). At most one worker per digest.
    workers: HashMap<ComponentDigest, (ComponentId, ReplayWorker)>,
}
63
64impl ReplayWorkerRegistry {
65    pub fn insert(&mut self, component_id: ComponentId, worker: ReplayWorker) {
66        let digest = component_id.component_digest.clone();
67        let old = self.workers.insert(digest, (component_id, worker));
68        assert!(
69            old.is_none(),
70            "replay worker already registered for this digest"
71        );
72    }
73
74    #[must_use]
75    pub fn get(&self, digest: &ComponentDigest) -> Option<(&ComponentId, &ReplayWorker)> {
76        self.workers.get(digest).map(|(id, w)| (id, w))
77    }
78}
79
/// Export information of an importable component:
/// Workflows or Activities (WASM, stub, external), but not Webhooks
#[derive(Debug, Clone)]
pub struct ComponentConfigImportable {
    /// Flat list of exported functions. May contain extension functions; those are stripped
    /// by `ComponentConfigRegistryRO::list` when extensions are not requested.
    pub exports_ext: Vec<FunctionMetadata>,
    /// The exports grouped per interface; entries flagged `extension` are stripped by
    /// `ComponentConfigRegistryRO::list` when extensions are not requested.
    pub exports_hierarchy_ext: Vec<PackageIfcFns>,
}
86
/// Mutable registry of component configurations.
///
/// Components are added with `insert`; `verify_registry` consumes the registry and turns it
/// into the shareable, read-only [`ComponentConfigRegistryRO`].
#[derive(Default, Debug)]
pub struct ComponentConfigRegistry {
    /// All indexes live in the inner struct so they can be moved into an `Arc` as a whole.
    inner: ComponentConfigRegistryInner,
}
91
/// Indexes shared by the mutable registry and its read-only view.
#[derive(Default, Debug)]
struct ComponentConfigRegistryInner {
    /// Exported function → (exporting component, function metadata).
    /// Each function is exported by at most one component. The map may contain extension
    /// functions; `FunctionRegistry::get_by_exported_function` filters them out.
    exported_ffqns_ext: IndexMap<FunctionFqn, (ComponentId, FunctionMetadata)>,
    /// Exports grouped per interface; entries sharing `ifc_fqn` and `extension` are merged.
    export_hierarchy: Vec<PackageIfcFns>,
    /// Tracks the origin (synthesized-WIT vs real WASM) of each exported non-extension
    /// interface so that JS / inline-stub interfaces cannot be merged into WASM-owned
    /// interfaces and vice versa.
    export_hierarchy_origin: HashMap<IfcFqnName, WitOrigin>,
    /// Primary index: component name → component config. Names are unique across all component types.
    names_to_components: IndexMap<StrVariant, ComponentConfig>,
    /// Digest-keyed secondary index: component digest → WIT text.
    /// Webhook endpoints may share a digest; the first registered WIT is kept.
    digests_to_wit: IndexMap<ComponentDigest, String>,
}
105
/// Error returned by `ComponentConfigRegistry::insert` when a component is rejected.
/// The wrapped string is the human-readable reason.
#[derive(Debug, Clone, thiserror::Error)]
#[error("registering component failed: {0}")]
pub struct ComponentInsertionError(StrVariant);
109
110impl ComponentConfigRegistry {
111    pub fn insert(&mut self, component: ComponentConfig) -> Result<(), ComponentInsertionError> {
112        let name = &component.component_id.name;
113        // verify that the component is not already present by name
114        if self.inner.names_to_components.contains_key(name) {
115            return Err(ComponentInsertionError(
116                format!("component with name `{name}` is already registered").into(),
117            ));
118        }
119
120        // component.workflow_or_activity_config == None implies webhook.
121        // Webhooks do not have to have a unique component digest - all share the same ffqn.
122        // The same webhook source can be configured differently.
123        // Webhooks need just to provide source and WIT, so duplication is OK.
124
125        if let Some(workflow_or_activity_config) = &component.workflow_or_activity_config {
126            if self
127                .inner
128                .digests_to_wit
129                .contains_key(&component.component_id.component_digest)
130            {
131                return Err(ComponentInsertionError(
132                    format!(
133                        "component {} is already inserted with the same digest",
134                        component.component_id
135                    )
136                    .into(),
137                ));
138            }
139
140            for exported_ffqn in workflow_or_activity_config
141                .exports_ext
142                .iter()
143                .map(|f| &f.ffqn)
144            {
145                if let Some((conflicting_id, _)) = self.inner.exported_ffqns_ext.get(exported_ffqn)
146                {
147                    return Err(ComponentInsertionError(
148                        format!(
149                        "function {exported_ffqn} is already exported by component {conflicting_id}, cannot insert {}",
150                        component.component_id
151                    ).into()));
152                }
153            }
154            // insert to `exported_ffqns_ext`
155            for exported_fn_metadata in &workflow_or_activity_config.exports_ext {
156                let old = self.inner.exported_ffqns_ext.insert(
157                    exported_fn_metadata.ffqn.clone(),
158                    (component.component_id.clone(), exported_fn_metadata.clone()),
159                );
160                assert!(old.is_none());
161            }
162            // Insert into `export_hierarchy`, merging entries that share the same ifc_fqn.
163            for new_ifc_fns in &workflow_or_activity_config.exports_hierarchy_ext {
164                if !new_ifc_fns.extension {
165                    if let Some(&existing_origin) =
166                        self.inner.export_hierarchy_origin.get(&new_ifc_fns.ifc_fqn)
167                    {
168                        if existing_origin != component.wit_origin {
169                            return Err(ComponentInsertionError(
170                                format!(
171                                    "interface `{}` is already exported by a {} component, cannot insert {} which is a {} component",
172                                    new_ifc_fns.ifc_fqn,
173                                    existing_origin,
174                                    component.component_id,
175                                    component.wit_origin,
176                                )
177                                .into(),
178                            ));
179                        }
180                    } else {
181                        self.inner
182                            .export_hierarchy_origin
183                            .insert(new_ifc_fns.ifc_fqn.clone(), component.wit_origin);
184                    }
185                }
186                if let Some(existing) = self.inner.export_hierarchy.iter_mut().find(|e| {
187                    e.ifc_fqn == new_ifc_fns.ifc_fqn && e.extension == new_ifc_fns.extension
188                }) {
189                    existing.fns.extend(new_ifc_fns.fns.clone());
190                } else {
191                    self.inner.export_hierarchy.push(new_ifc_fns.clone());
192                }
193            }
194
195            // Insert into `digests_to_wit`
196            let old = self.inner.digests_to_wit.insert(
197                component.component_id.component_digest.clone(),
198                component.wit.clone(),
199            );
200            assert!(old.is_none());
201        } else if component.component_id.component_type == ComponentType::WebhookEndpoint {
202            // first wins for digest-keyed maps (same code = same WIT)
203            self.inner
204                .digests_to_wit
205                .entry(component.component_id.component_digest.clone())
206                .or_insert(component.wit.clone());
207        } // Cron executions do not expose WIT
208
209        let old = self
210            .inner
211            .names_to_components
212            .insert(name.clone(), component);
213        assert!(old.is_none());
214
215        Ok(())
216    }
217
218    /// Verify that each imported function can be matched by looking at the available exports.
219    /// This is a best effort to give function-level error messages.
220    /// WASI imports and host functions are not validated at the moment, those errors
221    /// are caught by wasmtime while pre-instantiation with a message containing the missing interface.
222    pub fn verify_registry(
223        self,
224    ) -> (
225        ComponentConfigRegistryRO,
226        Option<String>, /* supressed_errors */
227    ) {
228        let mut errors = Vec::new();
229        for examined_component in self.inner.names_to_components.values() {
230            self.verify_imports_component(examined_component, &mut errors);
231        }
232        let errors = if !errors.is_empty() {
233            let errors = errors.join("\n");
234            tracing::warn!("component resolution error: \n{errors}");
235            Some(errors)
236        } else {
237            None
238        };
239        (
240            ComponentConfigRegistryRO {
241                inner: Arc::new(self.inner),
242            },
243            errors,
244        )
245    }
246
247    fn additional_import_allowlist(
248        import: &FunctionMetadata,
249        component_type: ComponentType,
250    ) -> bool {
251        match component_type {
252            ComponentType::Activity => {
253                // wasi + log
254                match import.ffqn.ifc_fqn.namespace() {
255                    "wasi" => true,
256                    "obelisk" => import.ffqn.ifc_fqn.deref() == "obelisk:log/log@1.0.0",
257                    _ => false,
258                }
259            }
260            ComponentType::Workflow => {
261                // log + workflow support + types
262                matches!(
263                    import.ffqn.ifc_fqn.pkg_fqn_name().to_string().as_str(),
264                    "obelisk:log@1.0.0"
265                        | "obelisk:workflow@5.0.0"
266                        | "obelisk:workflow@5.1.0"
267                        | "obelisk:types@4.2.0"
268                )
269            }
270            ComponentType::WebhookEndpoint => {
271                // webhook support + wasi + log + types (needed for scheduling)
272                match import.ffqn.ifc_fqn.namespace() {
273                    "wasi" => true,
274                    "obelisk" => matches!(
275                        import.ffqn.ifc_fqn.pkg_fqn_name().to_string().as_str(),
276                        "obelisk:webhook@5.2.0"
277                            | "obelisk:webhook@5.1.0"
278                            | "obelisk:webhook@5.0.0"
279                            | "obelisk:log@1.0.0"
280                            | "obelisk:types@4.0.0"
281                            | "obelisk:types@4.1.0"
282                            | "obelisk:types@4.2.0"
283                    ),
284                    _ => false,
285                }
286            }
287            ComponentType::ActivityStub | ComponentType::Cron => false,
288        }
289    }
290
291    fn verify_imports_component(&self, component: &ComponentConfig, errors: &mut Vec<String>) {
292        let component_id = &component.component_id;
293        for imported_fn_metadata in &component.imports {
294            if let Some((exported_component_id, exported_fn_metadata)) = self
295                .inner
296                .exported_ffqns_ext
297                .get(&imported_fn_metadata.ffqn)
298            {
299                // check parameters
300                if imported_fn_metadata.parameter_types != exported_fn_metadata.parameter_types {
301                    error!(
302                        "Parameter types do not match: {ffqn} imported by {component_id} , exported by {exported_component_id}",
303                        ffqn = imported_fn_metadata.ffqn
304                    );
305                    error!(
306                        "Import {import}",
307                        import = serde_json::to_string(imported_fn_metadata).unwrap(), // TODO: print in WIT format
308                    );
309                    error!(
310                        "Export {export}",
311                        export = serde_json::to_string(exported_fn_metadata).unwrap(),
312                    );
313                    errors.push(format!("parameter types do not match: {component_id} imports {imported_fn_metadata} , {exported_component_id} exports {exported_fn_metadata}"));
314                }
315                if imported_fn_metadata.return_type != exported_fn_metadata.return_type {
316                    error!(
317                        "Return types do not match: {ffqn} imported by {component_id} , exported by {exported_component_id}",
318                        ffqn = imported_fn_metadata.ffqn
319                    );
320                    error!(
321                        "Import {import}",
322                        import = serde_json::to_string(imported_fn_metadata).unwrap(), // TODO: print in WIT format
323                    );
324                    error!(
325                        "Export {export}",
326                        export = serde_json::to_string(exported_fn_metadata).unwrap(),
327                    );
328                    errors.push(format!("return types do not match: {component_id} imports {imported_fn_metadata} , {exported_component_id} exports {exported_fn_metadata}"));
329                }
330            } else if !Self::additional_import_allowlist(
331                imported_fn_metadata,
332                component_id.component_type,
333            ) {
334                errors.push(format!(
335                    "function imported by {component_id} not found: {imported_fn_metadata}"
336                ));
337            }
338        }
339    }
340}
341
/// Read-only view of the component registry, produced by
/// `ComponentConfigRegistry::verify_registry`.
///
/// The indexes are held behind an `Arc`, so clones share the same data.
#[derive(Debug, Clone)]
pub struct ComponentConfigRegistryRO {
    /// Shared, immutable registry indexes.
    inner: Arc<ComponentConfigRegistryInner>,
}
346
347impl ComponentConfigRegistryRO {
348    /// Look up WIT by content digest. Returns `None` if the digest is not found.
349    #[must_use]
350    pub fn get_wit(&self, input_digest: &ComponentDigest) -> Option<&str> {
351        self.inner
352            .digests_to_wit
353            .get(input_digest)
354            .map(std::string::String::as_str)
355    }
356
357    #[must_use]
358    pub fn find_by_exported_ffqn_submittable(
359        &self,
360        ffqn: &FunctionFqn,
361    ) -> Option<(&ComponentId, &FunctionMetadata)> {
362        self.inner
363            .exported_ffqns_ext
364            .get(ffqn)
365            .and_then(|(component_id, fn_metadata)| {
366                if fn_metadata.submittable {
367                    Some((component_id, fn_metadata))
368                } else {
369                    None
370                }
371            })
372    }
373
374    #[must_use]
375    pub fn find_by_exported_ffqn(
376        &self,
377        ffqn: &FunctionFqn,
378    ) -> Option<(&ComponentId, &FunctionMetadata)> {
379        self.inner
380            .exported_ffqns_ext
381            .get(ffqn)
382            .map(|t| (&t.0, &t.1))
383    }
384
385    #[must_use]
386    pub fn find_by_exported_ffqn_stub(
387        &self,
388        ffqn: &FunctionFqn,
389    ) -> Option<(&ComponentId, &FunctionMetadata)> {
390        self.inner
391            .exported_ffqns_ext
392            .get(ffqn)
393            .and_then(|(component_id, fn_metadata)| {
394                if component_id.component_type == ComponentType::ActivityStub {
395                    assert!(!ffqn.ifc_fqn.is_extension());
396                    Some((component_id, fn_metadata))
397                } else {
398                    None
399                }
400            })
401    }
402
403    /// List components. When `extensions` is set to false, extended functions are stripped from exports in each component.
404    #[must_use]
405    pub fn list(&self, extensions: bool) -> Vec<ComponentConfig> {
406        self.inner
407            .names_to_components
408            .values()
409            .cloned()
410            .map(|mut component| {
411                // If no extensions are requested, retain those that are !ext
412                if !extensions && let Some(importable) = &mut component.workflow_or_activity_config
413                {
414                    importable
415                        .exports_ext
416                        .retain(|fn_metadata| !fn_metadata.ffqn.ifc_fqn.is_extension());
417                    importable
418                        .exports_hierarchy_ext
419                        .retain(|ifc_fns| !ifc_fns.extension);
420                }
421                component
422            })
423            .collect()
424    }
425}
426
427impl FunctionRegistry for ComponentConfigRegistryRO {
428    fn get_by_exported_function(
429        &self,
430        ffqn: &FunctionFqn,
431    ) -> Option<(FunctionMetadata, ComponentId)> {
432        if ffqn.ifc_fqn.is_extension() {
433            None
434        } else {
435            self.inner
436                .exported_ffqns_ext
437                .get(ffqn)
438                .map(|(id, metadata)| (metadata.clone(), id.clone()))
439        }
440    }
441
442    fn all_exports(&self) -> &[PackageIfcFns] {
443        &self.inner.export_hierarchy
444    }
445}