1use crate::workflow::workflow_js_worker::WorkflowJsWorker;
4use crate::workflow::workflow_worker::WorkflowWorker;
5use concepts::ComponentId;
6use concepts::ComponentType;
7use concepts::FunctionFqn;
8use concepts::FunctionMetadata;
9use concepts::FunctionRegistry;
10use concepts::IfcFqnName;
11use concepts::PackageIfcFns;
12use concepts::StrVariant;
13use concepts::component_id::ComponentDigest;
14use hashbrown::HashMap;
15use indexmap::IndexMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::Arc;
19use tracing::error;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display)]
23pub enum WitOrigin {
24 #[display("wasm")]
25 Wasm,
26 #[display("synthesized")]
27 Synthesized,
28}
29
30#[derive(Debug, Clone)]
32pub struct ComponentConfig {
33 pub component_id: ComponentId,
34 pub imports: Vec<FunctionMetadata>,
35 pub workflow_or_activity_config: Option<ComponentConfigImportable>,
36 pub wit: String,
37 pub wit_origin: WitOrigin,
39}
40
41#[derive(Clone)]
44pub enum ReplayWorker {
45 Wasm(Arc<WorkflowWorker>),
46 Js(Arc<WorkflowJsWorker>),
47}
48
49impl Debug for ReplayWorker {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 ReplayWorker::Wasm(_) => f.write_str("ReplayWorker::Wasm(..)"),
53 ReplayWorker::Js(_) => f.write_str("ReplayWorker::Js(..)"),
54 }
55 }
56}
57
58#[derive(Default, Debug, Clone)]
60pub struct ReplayWorkerRegistry {
61 workers: HashMap<ComponentDigest, (ComponentId, ReplayWorker)>,
62}
63
64impl ReplayWorkerRegistry {
65 pub fn insert(&mut self, component_id: ComponentId, worker: ReplayWorker) {
66 let digest = component_id.component_digest.clone();
67 let old = self.workers.insert(digest, (component_id, worker));
68 assert!(
69 old.is_none(),
70 "replay worker already registered for this digest"
71 );
72 }
73
74 #[must_use]
75 pub fn get(&self, digest: &ComponentDigest) -> Option<(&ComponentId, &ReplayWorker)> {
76 self.workers.get(digest).map(|(id, w)| (id, w))
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct ComponentConfigImportable {
83 pub exports_ext: Vec<FunctionMetadata>,
84 pub exports_hierarchy_ext: Vec<PackageIfcFns>,
85}
86
87#[derive(Default, Debug)]
88pub struct ComponentConfigRegistry {
89 inner: ComponentConfigRegistryInner,
90}
91
92#[derive(Default, Debug)]
93struct ComponentConfigRegistryInner {
94 exported_ffqns_ext: IndexMap<FunctionFqn, (ComponentId, FunctionMetadata)>,
95 export_hierarchy: Vec<PackageIfcFns>,
96 export_hierarchy_origin: HashMap<IfcFqnName, WitOrigin>,
100 names_to_components: IndexMap<StrVariant, ComponentConfig>,
102 digests_to_wit: IndexMap<ComponentDigest, String>,
104}
105
106#[derive(Debug, Clone, thiserror::Error)]
107#[error("registering component failed: {0}")]
108pub struct ComponentInsertionError(StrVariant);
109
110impl ComponentConfigRegistry {
111 pub fn insert(&mut self, component: ComponentConfig) -> Result<(), ComponentInsertionError> {
112 let name = &component.component_id.name;
113 if self.inner.names_to_components.contains_key(name) {
115 return Err(ComponentInsertionError(
116 format!("component with name `{name}` is already registered").into(),
117 ));
118 }
119
120 if let Some(workflow_or_activity_config) = &component.workflow_or_activity_config {
126 if self
127 .inner
128 .digests_to_wit
129 .contains_key(&component.component_id.component_digest)
130 {
131 return Err(ComponentInsertionError(
132 format!(
133 "component {} is already inserted with the same digest",
134 component.component_id
135 )
136 .into(),
137 ));
138 }
139
140 for exported_ffqn in workflow_or_activity_config
141 .exports_ext
142 .iter()
143 .map(|f| &f.ffqn)
144 {
145 if let Some((conflicting_id, _)) = self.inner.exported_ffqns_ext.get(exported_ffqn)
146 {
147 return Err(ComponentInsertionError(
148 format!(
149 "function {exported_ffqn} is already exported by component {conflicting_id}, cannot insert {}",
150 component.component_id
151 ).into()));
152 }
153 }
154 for exported_fn_metadata in &workflow_or_activity_config.exports_ext {
156 let old = self.inner.exported_ffqns_ext.insert(
157 exported_fn_metadata.ffqn.clone(),
158 (component.component_id.clone(), exported_fn_metadata.clone()),
159 );
160 assert!(old.is_none());
161 }
162 for new_ifc_fns in &workflow_or_activity_config.exports_hierarchy_ext {
164 if !new_ifc_fns.extension {
165 if let Some(&existing_origin) =
166 self.inner.export_hierarchy_origin.get(&new_ifc_fns.ifc_fqn)
167 {
168 if existing_origin != component.wit_origin {
169 return Err(ComponentInsertionError(
170 format!(
171 "interface `{}` is already exported by a {} component, cannot insert {} which is a {} component",
172 new_ifc_fns.ifc_fqn,
173 existing_origin,
174 component.component_id,
175 component.wit_origin,
176 )
177 .into(),
178 ));
179 }
180 } else {
181 self.inner
182 .export_hierarchy_origin
183 .insert(new_ifc_fns.ifc_fqn.clone(), component.wit_origin);
184 }
185 }
186 if let Some(existing) = self.inner.export_hierarchy.iter_mut().find(|e| {
187 e.ifc_fqn == new_ifc_fns.ifc_fqn && e.extension == new_ifc_fns.extension
188 }) {
189 existing.fns.extend(new_ifc_fns.fns.clone());
190 } else {
191 self.inner.export_hierarchy.push(new_ifc_fns.clone());
192 }
193 }
194
195 let old = self.inner.digests_to_wit.insert(
197 component.component_id.component_digest.clone(),
198 component.wit.clone(),
199 );
200 assert!(old.is_none());
201 } else if component.component_id.component_type == ComponentType::WebhookEndpoint {
202 self.inner
204 .digests_to_wit
205 .entry(component.component_id.component_digest.clone())
206 .or_insert(component.wit.clone());
207 } let old = self
210 .inner
211 .names_to_components
212 .insert(name.clone(), component);
213 assert!(old.is_none());
214
215 Ok(())
216 }
217
218 pub fn verify_registry(
223 self,
224 ) -> (
225 ComponentConfigRegistryRO,
226 Option<String>, ) {
228 let mut errors = Vec::new();
229 for examined_component in self.inner.names_to_components.values() {
230 self.verify_imports_component(examined_component, &mut errors);
231 }
232 let errors = if !errors.is_empty() {
233 let errors = errors.join("\n");
234 tracing::warn!("component resolution error: \n{errors}");
235 Some(errors)
236 } else {
237 None
238 };
239 (
240 ComponentConfigRegistryRO {
241 inner: Arc::new(self.inner),
242 },
243 errors,
244 )
245 }
246
247 fn additional_import_allowlist(
248 import: &FunctionMetadata,
249 component_type: ComponentType,
250 ) -> bool {
251 match component_type {
252 ComponentType::Activity => {
253 match import.ffqn.ifc_fqn.namespace() {
255 "wasi" => true,
256 "obelisk" => import.ffqn.ifc_fqn.deref() == "obelisk:log/log@1.0.0",
257 _ => false,
258 }
259 }
260 ComponentType::Workflow => {
261 matches!(
263 import.ffqn.ifc_fqn.pkg_fqn_name().to_string().as_str(),
264 "obelisk:log@1.0.0"
265 | "obelisk:workflow@5.0.0"
266 | "obelisk:workflow@5.1.0"
267 | "obelisk:types@4.2.0"
268 )
269 }
270 ComponentType::WebhookEndpoint => {
271 match import.ffqn.ifc_fqn.namespace() {
273 "wasi" => true,
274 "obelisk" => matches!(
275 import.ffqn.ifc_fqn.pkg_fqn_name().to_string().as_str(),
276 "obelisk:webhook@5.2.0"
277 | "obelisk:webhook@5.1.0"
278 | "obelisk:webhook@5.0.0"
279 | "obelisk:log@1.0.0"
280 | "obelisk:types@4.0.0"
281 | "obelisk:types@4.1.0"
282 | "obelisk:types@4.2.0"
283 ),
284 _ => false,
285 }
286 }
287 ComponentType::ActivityStub | ComponentType::Cron => false,
288 }
289 }
290
291 fn verify_imports_component(&self, component: &ComponentConfig, errors: &mut Vec<String>) {
292 let component_id = &component.component_id;
293 for imported_fn_metadata in &component.imports {
294 if let Some((exported_component_id, exported_fn_metadata)) = self
295 .inner
296 .exported_ffqns_ext
297 .get(&imported_fn_metadata.ffqn)
298 {
299 if imported_fn_metadata.parameter_types != exported_fn_metadata.parameter_types {
301 error!(
302 "Parameter types do not match: {ffqn} imported by {component_id} , exported by {exported_component_id}",
303 ffqn = imported_fn_metadata.ffqn
304 );
305 error!(
306 "Import {import}",
307 import = serde_json::to_string(imported_fn_metadata).unwrap(), );
309 error!(
310 "Export {export}",
311 export = serde_json::to_string(exported_fn_metadata).unwrap(),
312 );
313 errors.push(format!("parameter types do not match: {component_id} imports {imported_fn_metadata} , {exported_component_id} exports {exported_fn_metadata}"));
314 }
315 if imported_fn_metadata.return_type != exported_fn_metadata.return_type {
316 error!(
317 "Return types do not match: {ffqn} imported by {component_id} , exported by {exported_component_id}",
318 ffqn = imported_fn_metadata.ffqn
319 );
320 error!(
321 "Import {import}",
322 import = serde_json::to_string(imported_fn_metadata).unwrap(), );
324 error!(
325 "Export {export}",
326 export = serde_json::to_string(exported_fn_metadata).unwrap(),
327 );
328 errors.push(format!("return types do not match: {component_id} imports {imported_fn_metadata} , {exported_component_id} exports {exported_fn_metadata}"));
329 }
330 } else if !Self::additional_import_allowlist(
331 imported_fn_metadata,
332 component_id.component_type,
333 ) {
334 errors.push(format!(
335 "function imported by {component_id} not found: {imported_fn_metadata}"
336 ));
337 }
338 }
339 }
340}
341
342#[derive(Debug, Clone)]
343pub struct ComponentConfigRegistryRO {
344 inner: Arc<ComponentConfigRegistryInner>,
345}
346
347impl ComponentConfigRegistryRO {
348 #[must_use]
350 pub fn get_wit(&self, input_digest: &ComponentDigest) -> Option<&str> {
351 self.inner
352 .digests_to_wit
353 .get(input_digest)
354 .map(std::string::String::as_str)
355 }
356
357 #[must_use]
358 pub fn find_by_exported_ffqn_submittable(
359 &self,
360 ffqn: &FunctionFqn,
361 ) -> Option<(&ComponentId, &FunctionMetadata)> {
362 self.inner
363 .exported_ffqns_ext
364 .get(ffqn)
365 .and_then(|(component_id, fn_metadata)| {
366 if fn_metadata.submittable {
367 Some((component_id, fn_metadata))
368 } else {
369 None
370 }
371 })
372 }
373
374 #[must_use]
375 pub fn find_by_exported_ffqn(
376 &self,
377 ffqn: &FunctionFqn,
378 ) -> Option<(&ComponentId, &FunctionMetadata)> {
379 self.inner
380 .exported_ffqns_ext
381 .get(ffqn)
382 .map(|t| (&t.0, &t.1))
383 }
384
385 #[must_use]
386 pub fn find_by_exported_ffqn_stub(
387 &self,
388 ffqn: &FunctionFqn,
389 ) -> Option<(&ComponentId, &FunctionMetadata)> {
390 self.inner
391 .exported_ffqns_ext
392 .get(ffqn)
393 .and_then(|(component_id, fn_metadata)| {
394 if component_id.component_type == ComponentType::ActivityStub {
395 assert!(!ffqn.ifc_fqn.is_extension());
396 Some((component_id, fn_metadata))
397 } else {
398 None
399 }
400 })
401 }
402
403 #[must_use]
405 pub fn list(&self, extensions: bool) -> Vec<ComponentConfig> {
406 self.inner
407 .names_to_components
408 .values()
409 .cloned()
410 .map(|mut component| {
411 if !extensions && let Some(importable) = &mut component.workflow_or_activity_config
413 {
414 importable
415 .exports_ext
416 .retain(|fn_metadata| !fn_metadata.ffqn.ifc_fqn.is_extension());
417 importable
418 .exports_hierarchy_ext
419 .retain(|ifc_fns| !ifc_fns.extension);
420 }
421 component
422 })
423 .collect()
424 }
425}
426
427impl FunctionRegistry for ComponentConfigRegistryRO {
428 fn get_by_exported_function(
429 &self,
430 ffqn: &FunctionFqn,
431 ) -> Option<(FunctionMetadata, ComponentId)> {
432 if ffqn.ifc_fqn.is_extension() {
433 None
434 } else {
435 self.inner
436 .exported_ffqns_ext
437 .get(ffqn)
438 .map(|(id, metadata)| (metadata.clone(), id.clone()))
439 }
440 }
441
442 fn all_exports(&self) -> &[PackageIfcFns] {
443 &self.inner.export_hierarchy
444 }
445}