1#[cfg(feature = "rusqlite")]
2mod rusqlite_ext;
3pub mod storage;
4pub mod time;
5
6use ::serde::{Deserialize, Serialize};
7use assert_matches::assert_matches;
8pub use indexmap;
9use indexmap::IndexMap;
10use opentelemetry::propagation::{Extractor, Injector};
11pub use prefixed_ulid::ExecutionId;
12use prefixed_ulid::ExecutionIdParseError;
13use serde_json::Value;
14use std::{
15 borrow::Borrow,
16 fmt::{Debug, Display},
17 hash::Hash,
18 marker::PhantomData,
19 ops::Deref,
20 str::FromStr,
21 sync::Arc,
22 time::Duration,
23};
24use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
25use tracing::Span;
26use val_json::{
27 type_wrapper::{TypeConversionError, TypeWrapper},
28 wast_val::{WastVal, WastValWithType},
29 wast_val_ser::params,
30};
31use wasmtime::component::{Type, Val};
32
33pub const NAMESPACE_OBELISK: &str = "obelisk";
34const NAMESPACE_WASI: &str = "wasi";
35pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
36pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
37pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
38
39#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
40pub enum FinishedExecutionError {
41 #[error("permanent timeout")]
43 PermanentTimeout,
44 #[error("permanent failure: {reason_full}")]
45 PermanentFailure {
46 reason_inner: String, reason_full: String,
50 kind: PermanentFailureKind,
51 detail: Option<String>,
52 },
53}
54impl FinishedExecutionError {
55 #[must_use]
56 pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57 match self {
58 FinishedExecutionError::PermanentTimeout => PendingStateFinishedError::Timeout,
59 FinishedExecutionError::PermanentFailure { .. } => {
60 PendingStateFinishedError::ExecutionFailure
61 }
62 }
63 }
64
65 #[must_use]
66 pub fn new_stubbed_error() -> Self {
67 let reason = "stubbed error".to_string();
68 Self::PermanentFailure {
69 reason_inner: reason.clone(),
70 reason_full: reason,
71 kind: PermanentFailureKind::StubbedError,
72 detail: None,
73 }
74 }
75}
76
77#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
78#[serde(rename_all = "snake_case")]
79pub enum PermanentFailureKind {
80 NondeterminismDetected,
82 ParamsParsingError,
84 CannotInstantiate,
86 ResultParsingError,
88 ImportedFunctionCallError,
90 ActivityTrap,
92 WorkflowTrap,
94 WebhookEndpointError,
96 StubbedError,
98 OutOfFuel,
100}
101
102#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
103#[serde(rename_all = "snake_case")]
104pub enum TrapKind {
105 #[display("trap")]
106 Trap,
107 #[display("post_return_trap")]
108 PostReturnTrap,
109 #[display("out of fuel")]
110 OutOfFuel,
111 #[display("host function error")]
112 HostFunctionError,
113}
114
115#[derive(Clone, Eq, derive_more::Display)]
116pub enum StrVariant {
117 Static(&'static str),
118 Arc(Arc<str>),
119}
120
121impl StrVariant {
122 #[must_use]
123 pub const fn empty() -> StrVariant {
124 StrVariant::Static("")
125 }
126}
127
128impl From<String> for StrVariant {
129 fn from(value: String) -> Self {
130 StrVariant::Arc(Arc::from(value))
131 }
132}
133
134impl From<&'static str> for StrVariant {
135 fn from(value: &'static str) -> Self {
136 StrVariant::Static(value)
137 }
138}
139
140impl PartialEq for StrVariant {
141 fn eq(&self, other: &Self) -> bool {
142 match (self, other) {
143 (Self::Static(left), Self::Static(right)) => left == right,
144 (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
145 (Self::Arc(left), Self::Arc(right)) => left == right,
146 (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
147 }
148 }
149}
150
151impl Hash for StrVariant {
152 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
153 match self {
154 StrVariant::Static(val) => val.hash(state),
155 StrVariant::Arc(val) => {
156 let str: &str = val.deref();
157 str.hash(state);
158 }
159 }
160 }
161}
162
163impl Debug for StrVariant {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 Display::fmt(self, f)
166 }
167}
168
169impl Deref for StrVariant {
170 type Target = str;
171 fn deref(&self) -> &Self::Target {
172 match self {
173 Self::Arc(v) => v,
174 Self::Static(v) => v,
175 }
176 }
177}
178
179impl AsRef<str> for StrVariant {
180 fn as_ref(&self) -> &str {
181 match self {
182 Self::Arc(v) => v,
183 Self::Static(v) => v,
184 }
185 }
186}
187
188mod serde_strvariant {
189 use crate::StrVariant;
190 use serde::{
191 Deserialize, Deserializer, Serialize, Serializer,
192 de::{self, Visitor},
193 };
194 use std::{ops::Deref, sync::Arc};
195
196 impl Serialize for StrVariant {
197 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198 where
199 S: Serializer,
200 {
201 serializer.serialize_str(self.deref())
202 }
203 }
204
205 impl<'de> Deserialize<'de> for StrVariant {
206 fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
207 where
208 D: Deserializer<'de>,
209 {
210 deserializer.deserialize_str(StrVariantVisitor)
211 }
212 }
213
214 struct StrVariantVisitor;
215
216 impl Visitor<'_> for StrVariantVisitor {
217 type Value = StrVariant;
218
219 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
220 formatter.write_str("a string")
221 }
222
223 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
224 where
225 E: de::Error,
226 {
227 Ok(StrVariant::Arc(Arc::from(v)))
228 }
229 }
230}
231
232#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
233#[display("{value}")]
234#[serde(transparent)]
235pub struct Name<T> {
236 pub value: StrVariant,
237 #[serde(skip)]
238 phantom_data: PhantomData<fn(T) -> T>,
239}
240
241impl<T> Name<T> {
242 #[must_use]
243 pub fn new_arc(value: Arc<str>) -> Self {
244 Self {
245 value: StrVariant::Arc(value),
246 phantom_data: PhantomData,
247 }
248 }
249
250 #[must_use]
251 pub const fn new_static(value: &'static str) -> Self {
252 Self {
253 value: StrVariant::Static(value),
254 phantom_data: PhantomData,
255 }
256 }
257}
258
259impl<T> Debug for Name<T> {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 Display::fmt(&self, f)
262 }
263}
264
265impl<T> Deref for Name<T> {
266 type Target = str;
267
268 fn deref(&self) -> &Self::Target {
269 self.value.deref()
270 }
271}
272
273impl<T> Borrow<str> for Name<T> {
274 fn borrow(&self) -> &str {
275 self.deref()
276 }
277}
278
279impl<T> From<String> for Name<T> {
280 fn from(value: String) -> Self {
281 Self::new_arc(Arc::from(value))
282 }
283}
284
285#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
286#[display("{}", self.suffix())]
287pub enum PackageExtension {
288 ObeliskExt,
289 ObeliskSchedule,
290 ObeliskStub,
291}
292impl PackageExtension {
293 fn suffix(&self) -> &'static str {
294 match self {
295 PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
296 PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
297 PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
298 }
299 }
300}
301
302#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
303#[cfg_attr(feature = "test", derive(Serialize))]
304pub struct PkgFqn {
305 pub namespace: String, pub package_name: String,
307 pub version: Option<String>,
308}
309impl PkgFqn {
310 #[must_use]
311 pub fn is_extension(&self) -> bool {
312 Self::is_package_name_ext(&self.package_name)
313 }
314
315 #[must_use]
316 pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
317 use strum::IntoEnumIterator;
318 for package_ext in PackageExtension::iter() {
319 if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
320 return Some((
321 PkgFqn {
322 namespace: self.namespace.clone(),
323 package_name: package_name.to_string(),
324 version: self.version.clone(),
325 },
326 package_ext,
327 ));
328 }
329 }
330 None
331 }
332
333 fn is_package_name_ext(package_name: &str) -> bool {
334 package_name.ends_with(SUFFIX_PKG_EXT)
335 || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
336 || package_name.ends_with(SUFFIX_PKG_STUB)
337 }
338}
339impl Display for PkgFqn {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 let PkgFqn {
342 namespace,
343 package_name,
344 version,
345 } = self;
346 if let Some(version) = version {
347 write!(f, "{namespace}:{package_name}@{version}")
348 } else {
349 write!(f, "{namespace}:{package_name}")
350 }
351 }
352}
353
354#[derive(Hash, Clone, PartialEq, Eq)]
355pub struct IfcFqnMarker;
356
357pub type IfcFqnName = Name<IfcFqnMarker>; impl IfcFqnName {
360 #[must_use]
361 pub fn namespace(&self) -> &str {
362 self.deref().split_once(':').unwrap().0
363 }
364
365 #[must_use]
366 pub fn package_name(&self) -> &str {
367 let after_colon = self.deref().split_once(':').unwrap().1;
368 after_colon.split_once('/').unwrap().0
369 }
370
371 #[must_use]
372 pub fn version(&self) -> Option<&str> {
373 self.deref().split_once('@').map(|(_, version)| version)
374 }
375
376 #[must_use]
377 pub fn pkg_fqn_name(&self) -> PkgFqn {
378 let (namespace, rest) = self.deref().split_once(':').unwrap();
379 let (package_name, rest) = rest.split_once('/').unwrap();
380 let version = rest.split_once('@').map(|(_, version)| version);
381 PkgFqn {
382 namespace: namespace.to_string(),
383 package_name: package_name.to_string(),
384 version: version.map(std::string::ToString::to_string),
385 }
386 }
387
388 #[must_use]
389 pub fn ifc_name(&self) -> &str {
390 let after_colon = self.deref().split_once(':').unwrap().1;
391 let after_slash = after_colon.split_once('/').unwrap().1;
392 after_slash
393 .split_once('@')
394 .map_or(after_slash, |(ifc, _)| ifc)
395 }
396
397 #[must_use]
398 pub fn from_parts(
399 namespace: &str,
400 package_name: &str,
401 ifc_name: &str,
402 version: Option<&str>,
403 ) -> Self {
404 let mut str = format!("{namespace}:{package_name}/{ifc_name}");
405 if let Some(version) = version {
406 str += "@";
407 str += version;
408 }
409 Self::new_arc(Arc::from(str))
410 }
411
412 #[must_use]
413 pub fn is_extension(&self) -> bool {
415 PkgFqn::is_package_name_ext(self.package_name())
416 }
417
418 #[must_use]
419 pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
420 self.package_name().strip_suffix(SUFFIX_PKG_EXT)
421 }
422
423 #[must_use]
424 pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
425 self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
426 }
427
428 #[must_use]
429 pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
430 self.package_name().strip_suffix(SUFFIX_PKG_STUB)
431 }
432
433 #[must_use]
434 pub fn is_namespace_obelisk(&self) -> bool {
435 self.namespace() == NAMESPACE_OBELISK
436 }
437
438 #[must_use]
439 pub fn is_namespace_wasi(&self) -> bool {
440 self.namespace() == NAMESPACE_WASI
441 }
442}
443
444#[derive(Hash, Clone, PartialEq, Eq)]
445pub struct FnMarker;
446
447pub type FnName = Name<FnMarker>;
448
449#[derive(Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
450pub struct FunctionFqn {
451 pub ifc_fqn: IfcFqnName,
452 pub function_name: FnName,
453}
454
455impl FunctionFqn {
456 #[must_use]
457 pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
458 Self {
459 ifc_fqn: Name::new_arc(ifc_fqn),
460 function_name: Name::new_arc(function_name),
461 }
462 }
463
464 #[must_use]
465 pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
466 Self {
467 ifc_fqn: Name::new_static(ifc_fqn),
468 function_name: Name::new_static(function_name),
469 }
470 }
471
472 #[must_use]
473 pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
474 Self::new_static(tuple.0, tuple.1)
475 }
476
477 pub fn try_from_tuple(
478 ifc_fqn: &str,
479 function_name: &str,
480 ) -> Result<Self, FunctionFqnParseError> {
481 if function_name.contains('.') {
482 Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
483 } else {
484 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
485 }
486 }
487}
488
489#[derive(Debug, thiserror::Error)]
490pub enum FunctionFqnParseError {
491 #[error("delimiter `.` not found")]
492 DelimiterNotFound,
493 #[error("delimiter `.` found in function name")]
494 DelimiterFoundInFunctionName,
495}
496
497impl FromStr for FunctionFqn {
498 type Err = FunctionFqnParseError;
499
500 fn from_str(s: &str) -> Result<Self, Self::Err> {
501 if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
502 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
503 } else {
504 Err(FunctionFqnParseError::DelimiterNotFound)
505 }
506 }
507}
508
509impl Display for FunctionFqn {
510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 write!(
512 f,
513 "{ifc_fqn}.{function_name}",
514 ifc_fqn = self.ifc_fqn,
515 function_name = self.function_name
516 )
517 }
518}
519
520impl Debug for FunctionFqn {
521 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522 Display::fmt(&self, f)
523 }
524}
525
526#[cfg(any(test, feature = "test"))]
527impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
528 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
529 let illegal = [':', '@', '.'];
530 let namespace = u.arbitrary::<String>()?.replace(illegal, "");
531 let pkg_name = u.arbitrary::<String>()?.replace(illegal, "");
532 let ifc_name = u.arbitrary::<String>()?.replace(illegal, "");
533 let fn_name = u.arbitrary::<String>()?.replace(illegal, "");
534
535 Ok(FunctionFqn::new_arc(
536 Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
537 Arc::from(fn_name),
538 ))
539 }
540}
541
542#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
543pub struct TypeWrapperTopLevel {
544 pub ok: Option<Box<TypeWrapper>>,
545 pub err: Option<Box<TypeWrapper>>,
546}
547impl From<TypeWrapperTopLevel> for TypeWrapper {
548 fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
549 TypeWrapper::Result {
550 ok: value.ok,
551 err: value.err,
552 }
553 }
554}
555
556#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
557pub enum SupportedFunctionReturnValue {
558 Ok {
559 #[debug(skip)]
560 ok: Option<WastValWithType>,
561 },
562 Err {
563 #[debug(skip)]
564 err: Option<WastValWithType>,
565 },
566 ExecutionError(FinishedExecutionError),
567}
568pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
569 SupportedFunctionReturnValue::Ok { ok: None };
570
571#[derive(Debug, thiserror::Error)]
572pub enum ResultParsingError {
573 #[error("return value must not be empty")]
574 NoValue,
575 #[error("return value cannot be parsed, multi-value results are not supported")]
576 MultiValue,
577 #[error("return value cannot be parsed, {0}")]
578 TypeConversionError(val_json::type_wrapper::TypeConversionError),
579 #[error(transparent)]
580 ResultParsingErrorFromVal(ResultParsingErrorFromVal),
581}
582
583#[derive(Debug, thiserror::Error)]
584pub enum ResultParsingErrorFromVal {
585 #[error("return value cannot be parsed, {0}")]
586 WastValConversionError(val_json::wast_val::WastValConversionError),
587 #[error("top level type must be a result")]
588 TopLevelTypeMustBeAResult,
589 #[error("value does not type check")]
590 TypeCheckError,
591}
592
593impl SupportedFunctionReturnValue {
594 pub fn new<
595 I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
596 >(
597 mut iter: I,
598 ) -> Result<Self, ResultParsingError> {
599 if iter.len() == 0 {
600 Err(ResultParsingError::NoValue)
601 } else if iter.len() == 1 {
602 let (val, r#type) = iter.next().unwrap();
603 let r#type =
604 TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
605 Self::from_val_and_type_wrapper(val, r#type)
606 .map_err(ResultParsingError::ResultParsingErrorFromVal)
607 } else {
608 Err(ResultParsingError::MultiValue)
609 }
610 }
611
612 #[expect(clippy::result_unit_err)]
613 pub fn from_wast_val_with_type(
614 value: WastValWithType,
615 ) -> Result<SupportedFunctionReturnValue, ()> {
616 match value {
617 WastValWithType {
618 r#type: TypeWrapper::Result { ok: None, err: _ },
619 value: WastVal::Result(Ok(None)),
620 } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
621 WastValWithType {
622 r#type:
623 TypeWrapper::Result {
624 ok: Some(ok),
625 err: _,
626 },
627 value: WastVal::Result(Ok(Some(value))),
628 } => Ok(SupportedFunctionReturnValue::Ok {
629 ok: Some(WastValWithType {
630 r#type: *ok,
631 value: *value,
632 }),
633 }),
634 WastValWithType {
635 r#type: TypeWrapper::Result { ok: _, err: None },
636 value: WastVal::Result(Err(None)),
637 } => Ok(SupportedFunctionReturnValue::Err { err: None }),
638 WastValWithType {
639 r#type:
640 TypeWrapper::Result {
641 ok: _,
642 err: Some(err),
643 },
644 value: WastVal::Result(Err(Some(value))),
645 } => Ok(SupportedFunctionReturnValue::Err {
646 err: Some(WastValWithType {
647 r#type: *err,
648 value: *value,
649 }),
650 }),
651 _ => Err(()),
652 }
653 }
654
655 pub fn from_val_and_type_wrapper(
656 value: wasmtime::component::Val,
657 ty: TypeWrapper,
658 ) -> Result<Self, ResultParsingErrorFromVal> {
659 let TypeWrapper::Result { ok, err } = ty else {
660 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
661 };
662 let ty = TypeWrapperTopLevel { ok, err };
663 Self::from_val_and_type_wrapper_tl(value, ty)
664 }
665
666 pub fn from_val_and_type_wrapper_tl(
667 value: wasmtime::component::Val,
668 ty: TypeWrapperTopLevel,
669 ) -> Result<Self, ResultParsingErrorFromVal> {
670 let wasmtime::component::Val::Result(value) = value else {
671 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
672 };
673
674 match (ty.ok, ty.err, value) {
675 (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
676 (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
677 ok: Some(WastValWithType {
678 r#type: *ok_type,
679 value: WastVal::try_from(*value)
680 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
681 }),
682 }),
683 (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
684 (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
685 err: Some(WastValWithType {
686 r#type: *err_type,
687 value: WastVal::try_from(*value)
688 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
689 }),
690 }),
691 _other => Err(ResultParsingErrorFromVal::TypeCheckError),
692 }
693 }
694
695 #[must_use]
696 pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
697 match self {
698 SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
699 SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
700 WastVal::Result(Ok(Some(Box::new(v.value))))
701 }
702 SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
703 SupportedFunctionReturnValue::Err { err: Some(v) } => {
704 WastVal::Result(Err(Some(Box::new(v.value))))
705 }
706 SupportedFunctionReturnValue::ExecutionError(_) => {
707 execution_error_to_wast_val(&get_return_type())
708 }
709 }
710 }
711
712 #[must_use]
713 pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
714 match self {
715 SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind(Ok(())),
716 SupportedFunctionReturnValue::Err { err: _ } => {
717 PendingStateFinishedResultKind(Err(PendingStateFinishedError::FallibleError))
718 }
719 SupportedFunctionReturnValue::ExecutionError(_) => {
720 PendingStateFinishedResultKind(Err(PendingStateFinishedError::ExecutionFailure))
721 }
722 }
723 }
724}
725
726#[must_use]
727pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
728 match ret_type {
729 TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
730 TypeWrapperTopLevel {
731 ok: _,
732 err: Some(inner),
733 } => match inner.as_ref() {
734 TypeWrapper::String => {
735 return WastVal::Result(Err(Some(Box::new(WastVal::String(
736 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
737 )))));
738 }
739 TypeWrapper::Variant(variants) => {
740 if let Some(Some(TypeWrapper::Record(fields))) =
741 variants.get(EXECUTION_FAILED_STRING_OR_VARIANT)
742 && fields.is_empty()
743 {
744 return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
745 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
746 None,
747 )))));
748 }
749 }
750 _ => {}
751 },
752 }
753 unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
754}
755
756#[derive(Debug, Clone, PartialEq, Eq)]
757pub struct Params(ParamsInternal);
758
759#[derive(Debug, Clone, PartialEq, Eq)]
760enum ParamsInternal {
761 JsonValues(Vec<Value>),
762 Vals {
763 vals: Arc<[wasmtime::component::Val]>,
765 },
766 Empty,
767}
768
769impl Default for Params {
770 fn default() -> Self {
771 Self(ParamsInternal::Empty)
772 }
773}
774
775pub const SUFFIX_FN_SUBMIT: &str = "-submit";
776pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
777pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
778pub const SUFFIX_FN_STUB: &str = "-stub";
779pub const SUFFIX_FN_GET: &str = "-get";
780pub const SUFFIX_FN_INVOKE: &str = "-invoke";
781
782#[derive(
783 Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
784)]
785#[serde(rename_all = "snake_case")]
786pub enum FunctionExtension {
787 Submit,
788 AwaitNext,
789 Schedule,
790 Stub,
791 Get,
792 Invoke,
793}
794impl FunctionExtension {
795 #[must_use]
796 pub fn suffix(&self) -> &'static str {
797 match self {
798 FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
799 FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
800 FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
801 FunctionExtension::Stub => SUFFIX_FN_STUB,
802 FunctionExtension::Get => SUFFIX_FN_GET,
803 FunctionExtension::Invoke => SUFFIX_FN_INVOKE,
804 }
805 }
806
807 #[must_use]
808 pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
809 matches!(
810 (pkg_ext, self),
811 (
812 PackageExtension::ObeliskExt,
813 FunctionExtension::Submit
814 | FunctionExtension::AwaitNext
815 | FunctionExtension::Get
816 | FunctionExtension::Invoke
817 ) | (
818 PackageExtension::ObeliskSchedule,
819 FunctionExtension::Schedule
820 ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
821 )
822 }
823}
824
825#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
826pub struct FunctionMetadata {
827 pub ffqn: FunctionFqn,
828 pub parameter_types: ParameterTypes,
829 pub return_type: ReturnType,
830 pub extension: Option<FunctionExtension>,
831 pub submittable: bool,
833}
834impl FunctionMetadata {
835 #[must_use]
836 pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
837 self.extension.map(|extension| {
838 let prefix = self
839 .ffqn
840 .function_name
841 .value
842 .strip_suffix(extension.suffix())
843 .unwrap_or_else(|| {
844 panic!(
845 "extension function {} must end with expected suffix {}",
846 self.ffqn.function_name,
847 extension.suffix()
848 )
849 });
850 (prefix, extension)
851 })
852 }
853}
854impl Display for FunctionMetadata {
855 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856 write!(
857 f,
858 "{ffqn}: func{params} -> {return_type}",
859 ffqn = self.ffqn,
860 params = self.parameter_types,
861 return_type = self.return_type,
862 )
863 }
864}
865
866pub mod serde_params {
867 use crate::{Params, ParamsInternal};
868 use serde::de::{SeqAccess, Visitor};
869 use serde::ser::SerializeSeq;
870 use serde::{Deserialize, Serialize};
871 use serde_json::Value;
872 use val_json::wast_val::WastVal;
873
874 impl Serialize for Params {
875 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
876 where
877 S: ::serde::Serializer,
878 {
879 match &self.0 {
880 ParamsInternal::Vals { vals } => {
881 let mut seq = serializer.serialize_seq(Some(vals.len()))?; for val in vals.iter() {
883 let value = WastVal::try_from(val.clone())
884 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
885 seq.serialize_element(&value)?;
886 }
887 seq.end()
888 }
889 ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
890 ParamsInternal::JsonValues(vec) => {
891 let mut seq = serializer.serialize_seq(Some(vec.len()))?;
892 for item in vec {
893 seq.serialize_element(item)?;
894 }
895 seq.end()
896 }
897 }
898 }
899 }
900
901 pub struct VecVisitor;
902
903 impl<'de> Visitor<'de> for VecVisitor {
904 type Value = Vec<Value>;
905
906 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
907 formatter.write_str("a sequence of `Value`")
908 }
909
910 #[inline]
911 fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
912 where
913 V: SeqAccess<'de>,
914 {
915 let mut vec = Vec::new();
916 while let Some(elem) = visitor.next_element()? {
917 vec.push(elem);
918 }
919 Ok(vec)
920 }
921 }
922
923 impl<'de> Deserialize<'de> for Params {
924 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
925 where
926 D: serde::Deserializer<'de>,
927 {
928 let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
929 if vec.is_empty() {
930 Ok(Self(ParamsInternal::Empty))
931 } else {
932 Ok(Self(ParamsInternal::JsonValues(vec)))
933 }
934 }
935 }
936}
937
938#[derive(Debug, thiserror::Error)]
939pub enum ParamsParsingError {
940 #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
941 ParameterTypeError {
942 idx: usize,
943 err: TypeConversionError,
944 },
945 #[error("parameters cannot be deserialized: {0}")]
946 ParamsDeserializationError(serde_json::Error),
947 #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
948 ParameterCardinalityMismatch { expected: usize, specified: usize },
949}
950
951impl ParamsParsingError {
952 #[must_use]
953 pub fn detail(&self) -> Option<String> {
954 match self {
955 ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
956 ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
957 ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
958 }
959 }
960}
961
962#[derive(Debug, thiserror::Error)]
963pub enum ParamsFromJsonError {
964 #[error("value must be a json array containing function parameters")]
965 MustBeArray,
966}
967
968impl Params {
969 #[must_use]
970 pub const fn empty() -> Self {
971 Self(ParamsInternal::Empty)
972 }
973
974 #[must_use]
975 pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
976 if vals.is_empty() {
977 Self::empty()
978 } else {
979 Self(ParamsInternal::Vals { vals })
980 }
981 }
982
983 #[must_use]
984 pub fn from_json_values(vec: Vec<Value>) -> Self {
985 if vec.is_empty() {
986 Self::empty()
987 } else {
988 Self(ParamsInternal::JsonValues(vec))
989 }
990 }
991
992 pub fn typecheck<'a>(
993 &self,
994 param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
995 ) -> Result<(), ParamsParsingError> {
996 if param_types.len() != self.len() {
997 return Err(ParamsParsingError::ParameterCardinalityMismatch {
998 expected: param_types.len(),
999 specified: self.len(),
1000 });
1001 }
1002 match &self.0 {
1003 ParamsInternal::Vals { .. } | ParamsInternal::Empty => {}
1004 ParamsInternal::JsonValues(params) => {
1005 params::deserialize_values(params, param_types)
1006 .map_err(ParamsParsingError::ParamsDeserializationError)?;
1007 }
1008 }
1009 Ok(())
1010 }
1011
1012 pub fn as_vals(
1013 &self,
1014 param_types: Box<[(String, Type)]>,
1015 ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1016 if param_types.len() != self.len() {
1017 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1018 expected: param_types.len(),
1019 specified: self.len(),
1020 });
1021 }
1022 match &self.0 {
1023 ParamsInternal::JsonValues(json_vec) => {
1024 let param_types = param_types
1025 .into_vec()
1026 .into_iter()
1027 .enumerate()
1028 .map(|(idx, (_param_name, ty))| {
1029 TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1030 })
1031 .collect::<Result<Vec<_>, _>>()
1032 .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1033 Ok(params::deserialize_values(json_vec, param_types.iter())
1034 .map_err(ParamsParsingError::ParamsDeserializationError)?
1035 .into_iter()
1036 .map(Val::from)
1037 .collect())
1038 }
1039 ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1040 ParamsInternal::Empty => Ok(Arc::from([])),
1041 }
1042 }
1043
1044 #[must_use]
1045 pub fn len(&self) -> usize {
1046 match &self.0 {
1047 ParamsInternal::JsonValues(vec) => vec.len(),
1048 ParamsInternal::Vals { vals, .. } => vals.len(),
1049 ParamsInternal::Empty => 0,
1050 }
1051 }
1052
1053 #[must_use]
1054 pub fn is_empty(&self) -> bool {
1055 self.len() == 0
1056 }
1057}
1058
1059pub mod prefixed_ulid {
1060 use crate::{JoinSetId, JoinSetIdParseError};
1061 use serde_with::{DeserializeFromStr, SerializeDisplay};
1062 use std::{
1063 fmt::{Debug, Display},
1064 hash::Hasher,
1065 marker::PhantomData,
1066 num::ParseIntError,
1067 str::FromStr,
1068 sync::Arc,
1069 };
1070 use ulid::Ulid;
1071
1072 #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
1073 #[derive_where::derive_where(Clone, Copy)]
1074 #[display("{}_{ulid}", Self::prefix())]
1075 pub struct PrefixedUlid<T: 'static> {
1076 ulid: Ulid,
1077 phantom_data: PhantomData<fn(T) -> T>,
1078 }
1079
1080 impl<T> PrefixedUlid<T> {
1081 const fn new(ulid: Ulid) -> Self {
1082 Self {
1083 ulid,
1084 phantom_data: PhantomData,
1085 }
1086 }
1087
1088 fn prefix() -> &'static str {
1089 std::any::type_name::<T>().rsplit("::").next().unwrap()
1090 }
1091 }
1092
1093 impl<T> PrefixedUlid<T> {
1094 #[must_use]
1095 pub fn generate() -> Self {
1096 Self::new(Ulid::new())
1097 }
1098
1099 #[must_use]
1100 pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1101 Self::new(Ulid::from_parts(timestamp_ms, random))
1102 }
1103
1104 #[must_use]
1105 pub fn timestamp_part(&self) -> u64 {
1106 self.ulid.timestamp_ms()
1107 }
1108
1109 #[must_use]
1110 pub fn random_part(&self) -> u128 {
1112 self.ulid.random()
1113 }
1114 }
1115
1116 #[derive(Debug, thiserror::Error)]
1117 pub enum PrefixedUlidParseError {
1118 #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
1119 WrongPrefix { input: String, expected: String },
1120 #[error("cannot parse ULID suffix from `{input}`")]
1121 CannotParseUlid { input: String },
1122 }
1123
1124 mod impls {
1125 use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1126 use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1127
1128 impl<T> FromStr for PrefixedUlid<T> {
1129 type Err = PrefixedUlidParseError;
1130
1131 fn from_str(input: &str) -> Result<Self, Self::Err> {
1132 let prefix = Self::prefix();
1133 let mut input_chars = input.chars();
1134 for exp in prefix.chars() {
1135 if input_chars.next() != Some(exp) {
1136 return Err(PrefixedUlidParseError::WrongPrefix {
1137 input: input.to_string(),
1138 expected: format!("{prefix}_"),
1139 });
1140 }
1141 }
1142 if input_chars.next() != Some('_') {
1143 return Err(PrefixedUlidParseError::WrongPrefix {
1144 input: input.to_string(),
1145 expected: format!("{prefix}_"),
1146 });
1147 }
1148 let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1149 return Err(PrefixedUlidParseError::CannotParseUlid {
1150 input: input.to_string(),
1151 });
1152 };
1153 Ok(Self {
1154 ulid,
1155 phantom_data: PhantomData,
1156 })
1157 }
1158 }
1159
1160 impl<T> Debug for PrefixedUlid<T> {
1161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1162 Display::fmt(&self, f)
1163 }
1164 }
1165
1166 impl<T> Hash for PrefixedUlid<T> {
1167 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1168 Self::prefix().hash(state);
1169 self.ulid.hash(state);
1170 self.phantom_data.hash(state);
1171 }
1172 }
1173
1174 impl<T> PartialEq for PrefixedUlid<T> {
1175 fn eq(&self, other: &Self) -> bool {
1176 self.ulid == other.ulid
1177 }
1178 }
1179
1180 impl<T> Eq for PrefixedUlid<T> {}
1181
1182 impl<T> PartialOrd for PrefixedUlid<T> {
1183 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1184 Some(self.cmp(other))
1185 }
1186 }
1187
1188 impl<T> Ord for PrefixedUlid<T> {
1189 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1190 self.ulid.cmp(&other.ulid)
1191 }
1192 }
1193 }
1194
1195 pub mod prefix {
1196 pub struct E;
1197 pub struct Exr;
1198 pub struct Run;
1199 pub struct Delay;
1200 }
1201
1202 pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1203 pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1204 pub type RunId = PrefixedUlid<prefix::Run>;
1205 pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; #[cfg(any(test, feature = "test"))]
1208 impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1209 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1210 Ok(Self::new(ulid::Ulid::from_parts(
1211 u.arbitrary()?,
1212 u.arbitrary()?,
1213 )))
1214 }
1215 }
1216
1217 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
1218 pub enum ExecutionId {
1219 TopLevel(ExecutionIdTopLevel),
1220 Derived(ExecutionIdDerived),
1221 }
1222
1223 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1224 pub struct ExecutionIdDerived {
1225 top_level: ExecutionIdTopLevel,
1226 infix: Arc<str>,
1227 idx: u64,
1228 }
1229 impl ExecutionIdDerived {
1230 #[must_use]
1231 pub fn get_incremented(&self) -> Self {
1232 self.get_incremented_by(1)
1233 }
1234 #[must_use]
1235 pub fn get_incremented_by(&self, count: u64) -> Self {
1236 ExecutionIdDerived {
1237 top_level: self.top_level,
1238 infix: self.infix.clone(),
1239 idx: self.idx + count,
1240 }
1241 }
1242 #[must_use]
1243 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1244 let ExecutionIdDerived {
1245 top_level,
1246 infix,
1247 idx,
1248 } = self;
1249 let infix = Arc::from(format!(
1250 "{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1251 ));
1252 ExecutionIdDerived {
1253 top_level: *top_level,
1254 infix,
1255 idx: EXECUTION_ID_START_IDX,
1256 }
1257 }
1258 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1259 let ExecutionIdDerived {
1260 top_level,
1261 infix,
1262 idx,
1263 } = self;
1264 write!(
1265 f,
1266 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1267 )
1268 }
1269
1270 pub fn split_to_parts(
1274 &self,
1275 ) -> Result<(ExecutionId, JoinSetId), ExecutionIdDerivedSplitError> {
1276 if let Some((old_infix_and_index, join_set_id)) =
1277 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1278 {
1279 let join_set_id = JoinSetId::from_str(join_set_id)?;
1280 let Some((old_infix, old_idx)) =
1281 old_infix_and_index.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX)
1282 else {
1283 return Err(ExecutionIdDerivedSplitError::CannotFindJoinSetDelimiter);
1284 };
1285 let parent = ExecutionIdDerived {
1286 top_level: self.top_level,
1287 infix: Arc::from(old_infix),
1288 idx: old_idx
1289 .parse()
1290 .map_err(ExecutionIdDerivedSplitError::CannotParseOldIndex)?,
1291 };
1292 Ok((ExecutionId::Derived(parent), join_set_id))
1293 } else {
1294 Ok((
1296 ExecutionId::TopLevel(self.top_level),
1297 JoinSetId::from_str(&self.infix)?,
1298 ))
1299 }
1300 }
1301 }
1302
1303 #[derive(Debug, thiserror::Error)]
1304 pub enum ExecutionIdDerivedSplitError {
1305 #[error(transparent)]
1306 JoinSetIdParseError(#[from] JoinSetIdParseError),
1307 #[error("cannot parse index of parent execution - {0}")]
1308 CannotParseOldIndex(ParseIntError),
1309 #[error("cannot find join set delimiter")]
1310 CannotFindJoinSetDelimiter,
1311 }
1312
1313 impl Debug for ExecutionIdDerived {
1314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1315 self.display_or_debug(f)
1316 }
1317 }
1318 impl Display for ExecutionIdDerived {
1319 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1320 self.display_or_debug(f)
1321 }
1322 }
1323 impl FromStr for ExecutionIdDerived {
1324 type Err = DerivedIdParseError;
1325
1326 fn from_str(input: &str) -> Result<Self, Self::Err> {
1327 let (top_level, infix, idx) = derived_from_str(input)?;
1328 Ok(ExecutionIdDerived {
1329 top_level,
1330 infix,
1331 idx,
1332 })
1333 }
1334 }
1335
1336 fn derived_from_str<T: 'static>(
1337 input: &str,
1338 ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1339 if let Some((prefix, suffix)) = input.split_once(EXECUTION_ID_INFIX) {
1340 let top_level = PrefixedUlid::from_str(prefix)
1341 .map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1342 let Some((infix, idx)) = suffix.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX) else {
1343 return Err(DerivedIdParseError::SecondDelimiterNotFound);
1344 };
1345 let infix = Arc::from(infix);
1346 let idx = u64::from_str(idx).map_err(DerivedIdParseError::ParseIndexError)?;
1347 Ok((top_level, infix, idx))
1348 } else {
1349 Err(DerivedIdParseError::FirstDelimiterNotFound)
1350 }
1351 }
1352
1353 #[cfg(any(test, feature = "test"))]
1354 impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1355 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1356 let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1357 let join_set_id = JoinSetId::arbitrary(u)?;
1358 Ok(top_level.next_level(&join_set_id))
1359 }
1360 }
1361
1362 #[derive(Debug, thiserror::Error)]
1363 pub enum DerivedIdParseError {
1364 #[error(transparent)]
1365 PrefixedUlidParseError(PrefixedUlidParseError),
1366 #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
1367 FirstDelimiterNotFound,
1368 #[error("cannot parse derived id - delimiter `{EXECUTION_ID_JOIN_SET_INFIX}` not found")]
1369 SecondDelimiterNotFound,
1370 #[error(
1371 "cannot parse derived id - suffix after `{EXECUTION_ID_JOIN_SET_INFIX}` must be a number"
1372 )]
1373 ParseIndexError(ParseIntError),
1374 }
1375
1376 impl ExecutionId {
1377 #[must_use]
1378 pub fn generate() -> Self {
1379 ExecutionId::TopLevel(PrefixedUlid::generate())
1380 }
1381
1382 #[must_use]
1383 pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1384 match &self {
1385 ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1386 ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1387 }
1388 }
1389
1390 #[must_use]
1391 pub fn is_top_level(&self) -> bool {
1392 matches!(self, ExecutionId::TopLevel(_))
1393 }
1394
1395 #[must_use]
1396 pub fn random_seed(&self) -> u64 {
1397 let mut hasher = fxhash::FxHasher::default();
1398 #[expect(clippy::cast_possible_truncation)]
1402 let random_part = self.get_top_level().random_part() as u64;
1403 hasher.write_u64(random_part);
1404 hasher.write_u64(self.get_top_level().timestamp_part());
1405 if let ExecutionId::Derived(ExecutionIdDerived {
1406 top_level: _,
1407 infix,
1408 idx,
1409 }) = self
1410 {
1411 hasher.write(infix.as_bytes());
1413 hasher.write_u64(*idx);
1414 }
1415 hasher.finish()
1416 }
1417
1418 #[must_use]
1419 pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1420 ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1421 }
1422
1423 #[must_use]
1424 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1425 match &self {
1426 ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1427 top_level: *top_level,
1428 infix: Arc::from(join_set_id.to_string()),
1429 idx: EXECUTION_ID_START_IDX,
1430 },
1431 ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1432 }
1433 }
1434
1435 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1436 match &self {
1437 ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1438 ExecutionId::Derived(derived) => Display::fmt(derived, f),
1439 }
1440 }
1441 }
1442
1443 const EXECUTION_ID_INFIX: char = '.';
1444 const EXECUTION_ID_JOIN_SET_INFIX: char = '_';
1445 const EXECUTION_ID_START_IDX: u64 = 1;
1446 pub const JOIN_SET_START_IDX: u64 = 1;
1447 const DELAY_ID_START_IDX: u64 = 1;
1448
1449 #[derive(Debug, thiserror::Error)]
1450 pub enum ExecutionIdParseError {
1451 #[error(transparent)]
1452 PrefixedUlidParseError(#[from] PrefixedUlidParseError),
1453 #[error(
1454 "cannot parse derived execution id - first delimiter `{EXECUTION_ID_INFIX}` not found"
1455 )]
1456 FirstDelimiterNotFound,
1457 #[error(
1458 "cannot parse derived execution id - second delimiter `{EXECUTION_ID_INFIX}` not found"
1459 )]
1460 SecondDelimiterNotFound,
1461 #[error("cannot parse derived execution id - last suffix must be a number")]
1462 ParseIndexError(#[from] ParseIntError),
1463 }
1464
1465 impl FromStr for ExecutionId {
1466 type Err = ExecutionIdParseError;
1467
1468 fn from_str(input: &str) -> Result<Self, Self::Err> {
1469 if input.contains(EXECUTION_ID_INFIX) {
1470 ExecutionIdDerived::from_str(input)
1471 .map(ExecutionId::Derived)
1472 .map_err(|err| match err {
1473 DerivedIdParseError::FirstDelimiterNotFound => {
1474 unreachable!("first delimiter checked")
1475 }
1476 DerivedIdParseError::SecondDelimiterNotFound => {
1477 ExecutionIdParseError::SecondDelimiterNotFound
1478 }
1479 DerivedIdParseError::PrefixedUlidParseError(err) => {
1480 ExecutionIdParseError::PrefixedUlidParseError(err)
1481 }
1482 DerivedIdParseError::ParseIndexError(err) => {
1483 ExecutionIdParseError::ParseIndexError(err)
1484 }
1485 })
1486 } else {
1487 Ok(ExecutionId::TopLevel(PrefixedUlid::from_str(input)?))
1488 }
1489 }
1490 }
1491
1492 #[derive(Debug, thiserror::Error)]
1493 pub enum ExecutionIdStructuralParseError {
1494 #[error(transparent)]
1495 ExecutionIdParseError(#[from] ExecutionIdParseError),
1496 #[error("execution-id must be a record with `id` field of type string")]
1497 TypeError,
1498 }
1499
1500 impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1501 type Error = ExecutionIdStructuralParseError;
1502
1503 fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1504 if let wasmtime::component::Val::Record(key_vals) = execution_id
1505 && key_vals.len() == 1
1506 && let Some((key, execution_id)) = key_vals.first()
1507 && key == "id"
1508 && let wasmtime::component::Val::String(execution_id) = execution_id
1509 {
1510 ExecutionId::from_str(execution_id)
1511 .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1512 } else {
1513 Err(ExecutionIdStructuralParseError::TypeError)
1514 }
1515 }
1516 }
1517
1518 impl Debug for ExecutionId {
1519 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1520 self.display_or_debug(f)
1521 }
1522 }
1523
1524 impl Display for ExecutionId {
1525 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526 self.display_or_debug(f)
1527 }
1528 }
1529
1530 #[cfg(any(test, feature = "test"))]
1531 impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1532 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1533 Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1534 }
1535 }
1536
1537 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1539 pub struct DelayId {
1540 top_level: DelayIdTopLevel,
1541 infix: Arc<str>,
1542 idx: u64,
1543 }
1544 impl DelayId {
1545 #[must_use]
1546 pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1547 Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1548 }
1549
1550 #[must_use]
1551 pub fn new_with_index(
1552 execution_id: &ExecutionId,
1553 join_set_id: &JoinSetId,
1554 idx: u64,
1555 ) -> DelayId {
1556 let ExecutionIdDerived {
1557 top_level: PrefixedUlid { ulid, .. },
1558 infix,
1559 idx: _,
1560 } = execution_id.next_level(join_set_id);
1561 let top_level = DelayIdTopLevel::new(ulid);
1562 DelayId {
1563 top_level,
1564 infix,
1565 idx,
1566 }
1567 }
1568
1569 #[must_use]
1570 pub fn get_incremented(&self) -> Self {
1571 Self {
1572 top_level: self.top_level,
1573 infix: self.infix.clone(),
1574 idx: self.idx + 1,
1575 }
1576 }
1577
1578 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1579 let DelayId {
1580 top_level,
1581 infix,
1582 idx,
1583 } = self;
1584 write!(
1585 f,
1586 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1587 )
1588 }
1589 }
1590
1591 pub mod delay_impl {
1592 use super::{DelayId, DerivedIdParseError, derived_from_str};
1593 use std::{
1594 fmt::{Debug, Display},
1595 str::FromStr,
1596 };
1597
1598 impl Debug for DelayId {
1599 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600 self.display_or_debug(f)
1601 }
1602 }
1603
1604 impl Display for DelayId {
1605 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1606 self.display_or_debug(f)
1607 }
1608 }
1609
1610 impl FromStr for DelayId {
1611 type Err = DerivedIdParseError;
1612
1613 fn from_str(input: &str) -> Result<Self, Self::Err> {
1614 let (top_level, infix, idx) = derived_from_str(input)?;
1615 Ok(DelayId {
1616 top_level,
1617 infix,
1618 idx,
1619 })
1620 }
1621 }
1622
1623 #[cfg(any(test, feature = "test"))]
1624 impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1625 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1626 use super::{ExecutionId, JoinSetId};
1627 let execution_id = ExecutionId::arbitrary(u)?;
1628 let mut join_set_id = JoinSetId::arbitrary(u)?;
1629 join_set_id.kind = crate::JoinSetKind::OneOff;
1630 Ok(DelayId::new(&execution_id, &join_set_id))
1631 }
1632 }
1633 }
1634}
1635
1636#[derive(
1637 Debug,
1638 Clone,
1639 PartialEq,
1640 Eq,
1641 Hash,
1642 derive_more::Display,
1643 serde_with::SerializeDisplay,
1644 serde_with::DeserializeFromStr,
1645)]
1646#[non_exhaustive] #[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
1648pub struct JoinSetId {
1649 pub kind: JoinSetKind,
1650 pub name: StrVariant,
1651}
1652
1653#[derive(
1654 Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, Default,
1655)]
1656#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
1657#[serde(rename_all = "snake_case")]
1658pub enum ClosingStrategy {
1659 #[default]
1662 Complete,
1663}
1664
1665impl JoinSetId {
1666 pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1667 Ok(Self {
1668 kind,
1669 name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1670 })
1671 }
1672}
1673
1674pub const CHARSET_ALPHANUMERIC: &str =
1675 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1676
1677#[derive(
1678 Debug,
1679 Clone,
1680 Copy,
1681 PartialEq,
1682 Eq,
1683 Hash,
1684 derive_more::Display,
1685 Serialize,
1686 Deserialize,
1687 strum::EnumIter,
1688)]
1689#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
1690#[display("{}", self.as_code())]
1691pub enum JoinSetKind {
1692 OneOff,
1693 Named,
1694 Generated,
1695}
1696impl JoinSetKind {
1697 fn as_code(&self) -> &'static str {
1698 match self {
1699 JoinSetKind::OneOff => "o",
1700 JoinSetKind::Named => "n",
1701 JoinSetKind::Generated => "g",
1702 }
1703 }
1704}
1705impl FromStr for JoinSetKind {
1706 type Err = &'static str;
1707 fn from_str(s: &str) -> Result<Self, Self::Err> {
1708 use strum::IntoEnumIterator;
1709 Self::iter()
1710 .find(|variant| s == variant.as_code())
1711 .ok_or("unknown join set kind")
1712 }
1713}
1714
1715pub const JOIN_SET_ID_INFIX: char = ':';
1716const CHARSET_EXTRA_JSON_SET: &str = "_-/";
1717
1718impl FromStr for JoinSetId {
1719 type Err = JoinSetIdParseError;
1720
1721 fn from_str(input: &str) -> Result<Self, Self::Err> {
1722 let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1723 return Err(JoinSetIdParseError::WrongParts);
1724 };
1725 let kind = kind
1726 .parse()
1727 .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1728 Ok(JoinSetId::new(kind, StrVariant::from(name.to_string()))?)
1729 }
1730}
1731
1732#[derive(Debug, thiserror::Error)]
1733pub enum JoinSetIdParseError {
1734 #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1735 WrongParts,
1736 #[error("cannot parse join set id's execution id - {0}")]
1737 ExecutionIdParseError(#[from] ExecutionIdParseError),
1738 #[error("cannot parse join set kind - {0}")]
1739 JoinSetKindParseError(&'static str),
1740 #[error("cannot parse join set id - {0}")]
1741 InvalidName(#[from] InvalidNameError<JoinSetId>),
1742}
1743
1744#[cfg(any(test, feature = "test"))]
1745const CHARSET_JOIN_SET_NAME: &str =
1746 const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
1747#[cfg(any(test, feature = "test"))]
1748impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
1749 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1750 let name: String = {
1751 let length_inclusive = u.int_in_range(0..=10).unwrap();
1752 (0..=length_inclusive)
1753 .map(|_| {
1754 let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
1755 CHARSET_JOIN_SET_NAME
1756 .chars()
1757 .nth(idx)
1758 .expect("idx is < charset.len()")
1759 })
1760 .collect()
1761 };
1762
1763 Ok(JoinSetId::new(JoinSetKind::Named, StrVariant::from(name)).unwrap())
1764 }
1765}
1766
1767#[derive(
1768 Debug,
1769 Clone,
1770 Copy,
1771 strum::Display,
1772 PartialEq,
1773 Eq,
1774 strum::EnumString,
1775 Hash,
1776 serde_with::SerializeDisplay,
1777 serde_with::DeserializeFromStr,
1778)]
1779#[strum(serialize_all = "snake_case")]
1780pub enum ComponentType {
1781 ActivityWasm,
1782 ActivityStub,
1783 Workflow,
1784 WebhookEndpoint,
1785}
1786
1787#[derive(
1788 derive_more::Debug,
1789 Clone,
1790 PartialEq,
1791 Eq,
1792 Hash,
1793 serde_with::SerializeDisplay,
1794 serde_with::DeserializeFromStr,
1795 derive_more::Display,
1796)]
1797#[display("{component_type}:{name}")]
1798#[debug("{}", self)]
1799#[non_exhaustive] pub struct ComponentId {
1801 pub component_type: ComponentType,
1802 pub name: StrVariant,
1803}
1804impl ComponentId {
1805 pub fn new(
1806 component_type: ComponentType,
1807 name: StrVariant,
1808 ) -> Result<Self, InvalidNameError<Self>> {
1809 Ok(Self {
1810 component_type,
1811 name: check_name(name, "_")?,
1812 })
1813 }
1814
1815 #[must_use]
1816 pub const fn dummy_activity() -> Self {
1817 Self {
1818 component_type: ComponentType::ActivityWasm,
1819 name: StrVariant::empty(),
1820 }
1821 }
1822
1823 #[must_use]
1824 pub const fn dummy_workflow() -> ComponentId {
1825 ComponentId {
1826 component_type: ComponentType::Workflow,
1827 name: StrVariant::empty(),
1828 }
1829 }
1830}
1831
1832pub fn check_name<T>(
1833 name: StrVariant,
1834 special: &'static str,
1835) -> Result<StrVariant, InvalidNameError<T>> {
1836 if let Some(invalid) = name
1837 .as_ref()
1838 .chars()
1839 .find(|c| !c.is_ascii_alphanumeric() && !special.contains(*c))
1840 {
1841 Err(InvalidNameError::<T> {
1842 invalid,
1843 name: name.as_ref().to_string(),
1844 special,
1845 phantom_data: PhantomData,
1846 })
1847 } else {
1848 Ok(name)
1849 }
1850}
1851#[derive(Debug, thiserror::Error)]
1852#[error(
1853 "name of {} `{name}` contains invalid character `{invalid}`, must only contain alphanumeric characters and following characters {special}",
1854 std::any::type_name::<T>().rsplit("::").next().unwrap()
1855)]
1856pub struct InvalidNameError<T> {
1857 invalid: char,
1858 name: String,
1859 special: &'static str,
1860 phantom_data: PhantomData<T>,
1861}
1862
1863#[derive(Debug, thiserror::Error)]
1864pub enum ConfigIdParseError {
1865 #[error("cannot parse ComponentConfigHash - delimiter ':' not found")]
1866 DelimiterNotFound,
1867 #[error("cannot parse prefix of ComponentConfigHash - {0}")]
1868 ComponentTypeParseError(#[from] strum::ParseError),
1869}
1870
1871impl FromStr for ComponentId {
1872 type Err = ConfigIdParseError;
1873
1874 fn from_str(input: &str) -> Result<Self, Self::Err> {
1875 let (component_type, name) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1876 let component_type = component_type.parse()?;
1877 Ok(Self {
1878 component_type,
1879 name: StrVariant::from(name.to_string()),
1880 })
1881 }
1882}
1883
1884#[derive(
1885 Debug,
1886 Clone,
1887 Copy,
1888 strum::Display,
1889 strum::EnumString,
1890 PartialEq,
1891 Eq,
1892 Hash,
1893 serde_with::SerializeDisplay,
1894 serde_with::DeserializeFromStr,
1895)]
1896#[strum(serialize_all = "snake_case")]
1897pub enum HashType {
1898 Sha256,
1899}
1900
1901#[derive(
1902 Debug,
1903 Clone,
1904 derive_more::Display,
1905 derive_more::FromStr,
1906 derive_more::Deref,
1907 PartialEq,
1908 Eq,
1909 Hash,
1910 serde_with::SerializeDisplay,
1911 serde_with::DeserializeFromStr,
1912)]
1913pub struct ContentDigest(pub Digest);
1914#[cfg(any(test, feature = "test"))]
1915pub const CONTENT_DIGEST_DUMMY: ContentDigest = ContentDigest(Digest {
1916 hash_type: HashType::Sha256,
1917 hash_base16: StrVariant::empty(),
1918});
1919
1920impl ContentDigest {
1921 #[must_use]
1922 pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1923 Self(Digest::new(hash_type, hash_base16))
1924 }
1925}
1926
1927#[derive(
1928 Debug,
1929 Clone,
1930 derive_more::Display,
1931 PartialEq,
1932 Eq,
1933 Hash,
1934 serde_with::SerializeDisplay,
1935 serde_with::DeserializeFromStr,
1936)]
1937#[display("{hash_type}:{hash_base16}")]
1938pub struct Digest {
1939 hash_type: HashType,
1940 hash_base16: StrVariant,
1941}
1942impl Digest {
1943 #[must_use]
1944 pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1945 Self {
1946 hash_type,
1947 hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1948 }
1949 }
1950
1951 #[must_use]
1952 pub fn hash_type(&self) -> HashType {
1953 self.hash_type
1954 }
1955
1956 #[must_use]
1957 pub fn digest_base16(&self) -> &str {
1958 &self.hash_base16
1959 }
1960}
1961
1962#[derive(Debug, thiserror::Error)]
1963pub enum DigestParseErrror {
1964 #[error("cannot parse ContentDigest - delimiter ':' not found")]
1965 DelimiterNotFound,
1966 #[error("cannot parse ContentDigest - invalid prefix `{hash_type}`")]
1967 TypeParseError { hash_type: String },
1968 #[error("cannot parse ContentDigest - invalid suffix length, expected 64 hex digits, got {0}")]
1969 SuffixLength(usize),
1970 #[error("cannot parse ContentDigest - suffix must be hex-encoded, got invalid character `{0}`")]
1971 SuffixInvalid(char),
1972}
1973
1974impl FromStr for Digest {
1975 type Err = DigestParseErrror;
1976
1977 fn from_str(input: &str) -> Result<Self, Self::Err> {
1978 let (hash_type, hash_base16) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1979 let hash_type =
1980 HashType::from_str(hash_type).map_err(|_err| Self::Err::TypeParseError {
1981 hash_type: hash_type.to_string(),
1982 })?;
1983 if hash_base16.len() != 64 {
1984 return Err(Self::Err::SuffixLength(hash_base16.len()));
1985 }
1986 if let Some(invalid) = hash_base16.chars().find(|c| !c.is_ascii_hexdigit()) {
1987 return Err(Self::Err::SuffixInvalid(invalid));
1988 }
1989 Ok(Self {
1990 hash_type,
1991 hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1992 })
1993 }
1994}
1995
1996const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
1997#[derive(
1998 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
1999)]
2000pub enum ReturnType {
2001 Extendable(ReturnTypeExtendable), NonExtendable(ReturnTypeNonExtendable), }
2004impl ReturnType {
2005 #[must_use]
2013 pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
2014 if let TypeWrapper::Result { ok, err: None } = type_wrapper {
2015 return ReturnType::Extendable(ReturnTypeExtendable {
2016 type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
2017 wit_type,
2018 });
2019 } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
2020 if let TypeWrapper::String = err.as_ref() {
2021 return ReturnType::Extendable(ReturnTypeExtendable {
2022 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2023 wit_type,
2024 });
2025 } else if let TypeWrapper::Variant(fields) = err.as_ref()
2026 && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
2027 {
2028 return ReturnType::Extendable(ReturnTypeExtendable {
2029 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2030 wit_type,
2031 });
2032 }
2033 return ReturnType::NonExtendable(ReturnTypeNonExtendable {
2034 type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
2035 wit_type,
2036 });
2037 }
2038 ReturnType::NonExtendable(ReturnTypeNonExtendable {
2039 type_wrapper: type_wrapper.clone(),
2040 wit_type,
2041 })
2042 }
2043
2044 #[must_use]
2045 pub fn wit_type(&self) -> &str {
2046 match self {
2047 ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
2048 ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
2049 }
2050 }
2051
2052 #[must_use]
2053 pub fn type_wrapper(&self) -> TypeWrapper {
2054 match self {
2055 ReturnType::Extendable(compatible) => {
2056 TypeWrapper::from(compatible.type_wrapper_tl.clone())
2057 }
2058 ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
2059 }
2060 }
2061}
2062
2063#[derive(
2064 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2065)]
2066#[display("{wit_type}")]
2067pub struct ReturnTypeNonExtendable {
2068 pub type_wrapper: TypeWrapper,
2069 pub wit_type: StrVariant,
2070}
2071
2072#[derive(
2073 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2074)]
2075#[display("{wit_type}")]
2076pub struct ReturnTypeExtendable {
2077 pub type_wrapper_tl: TypeWrapperTopLevel,
2078 pub wit_type: StrVariant,
2079}
2080
2081#[cfg(any(test, feature = "test"))]
2082pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
2083 type_wrapper_tl: TypeWrapperTopLevel {
2084 ok: None,
2085 err: None,
2086 },
2087 wit_type: StrVariant::Static("result"),
2088});
2089
2090#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
2091#[derive_where::derive_where(PartialEq)]
2092#[display("{name}: {wit_type}")]
2093pub struct ParameterType {
2094 pub type_wrapper: TypeWrapper,
2095 #[derive_where(skip)]
2096 pub name: StrVariant,
2098 pub wit_type: StrVariant,
2099}
2100
2101#[derive(
2102 Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
2103)]
2104pub struct ParameterTypes(pub Vec<ParameterType>);
2105
2106impl Debug for ParameterTypes {
2107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2108 write!(f, "(")?;
2109 let mut iter = self.0.iter().peekable();
2110 while let Some(p) = iter.next() {
2111 write!(f, "{p:?}")?;
2112 if iter.peek().is_some() {
2113 write!(f, ", ")?;
2114 }
2115 }
2116 write!(f, ")")
2117 }
2118}
2119
2120impl Display for ParameterTypes {
2121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2122 write!(f, "(")?;
2123 let mut iter = self.0.iter().peekable();
2124 while let Some(p) = iter.next() {
2125 write!(f, "{p}")?;
2126 if iter.peek().is_some() {
2127 write!(f, ", ")?;
2128 }
2129 }
2130 write!(f, ")")
2131 }
2132}
2133
2134#[derive(Debug, Clone)]
2135pub struct PackageIfcFns {
2136 pub ifc_fqn: IfcFqnName,
2137 pub extension: bool, pub fns: IndexMap<FnName, FunctionMetadata>,
2139}
2140
2141#[derive(Debug, Clone, Copy)]
2142pub struct ComponentRetryConfig {
2143 pub max_retries: u32,
2144 pub retry_exp_backoff: Duration,
2145}
2146impl ComponentRetryConfig {
2147 pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
2148 max_retries: 0,
2149 retry_exp_backoff: Duration::ZERO,
2150 };
2151}
2152
2153pub trait FunctionRegistry: Send + Sync {
2155 fn get_by_exported_function(
2156 &self,
2157 ffqn: &FunctionFqn,
2158 ) -> Option<(FunctionMetadata, ComponentId, ComponentRetryConfig)>;
2159
2160 fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2163 self.get_by_exported_function(ffqn)
2164 .and_then(|(fn_meta, _, _)| {
2165 if let ReturnType::Extendable(ReturnTypeExtendable {
2166 type_wrapper_tl: type_wrapper,
2167 wit_type: _,
2168 }) = fn_meta.return_type
2169 {
2170 Some(type_wrapper)
2171 } else {
2172 None
2173 }
2174 })
2175 }
2176
2177 fn all_exports(&self) -> &[PackageIfcFns];
2178}
2179
2180#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
2181#[display("{_0:?}")]
2182pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2183
2184impl ExecutionMetadata {
2185 const LINKED_KEY: &str = "obelisk-tracing-linked";
2186 #[must_use]
2187 pub const fn empty() -> Self {
2188 Self(None)
2190 }
2191
2192 #[must_use]
2193 pub fn from_parent_span(less_specific: &Span) -> Self {
2194 ExecutionMetadata::create(less_specific, false)
2195 }
2196
2197 #[must_use]
2198 pub fn from_linked_span(less_specific: &Span) -> Self {
2199 ExecutionMetadata::create(less_specific, true)
2200 }
2201
2202 #[must_use]
2207 #[expect(clippy::items_after_statements)]
2208 fn create(span: &Span, link_marker: bool) -> Self {
2209 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2210 let mut metadata = Self(Some(hashbrown::HashMap::default()));
2211 let mut metadata_view = ExecutionMetadataInjectorView {
2212 metadata: &mut metadata,
2213 };
2214 fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2216 opentelemetry::global::get_text_map_propagator(|propagator| {
2217 propagator.inject_context(&s.context(), metadata_view);
2218 });
2219 }
2220 inject(&Span::current(), &mut metadata_view);
2221 if metadata_view.is_empty() {
2222 inject(span, &mut metadata_view);
2224 }
2225 if link_marker {
2226 metadata_view.set(Self::LINKED_KEY, String::new());
2227 }
2228 metadata
2229 }
2230
2231 pub fn enrich(&self, span: &Span) {
2232 use opentelemetry::trace::TraceContextExt as _;
2233 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2234
2235 let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2236 let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2237 propagator.extract(&metadata_view)
2238 });
2239 if metadata_view.get(Self::LINKED_KEY).is_some() {
2240 let linked_span_context = otel_context.span().span_context().clone();
2241 span.add_link(linked_span_context);
2242 } else {
2243 span.set_parent(otel_context);
2244 }
2245 }
2246}
2247
2248struct ExecutionMetadataInjectorView<'a> {
2249 metadata: &'a mut ExecutionMetadata,
2250}
2251
2252impl ExecutionMetadataInjectorView<'_> {
2253 fn is_empty(&self) -> bool {
2254 self.metadata
2255 .0
2256 .as_ref()
2257 .is_some_and(hashbrown::HashMap::is_empty)
2258 }
2259}
2260
2261impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2262 fn set(&mut self, key: &str, value: String) {
2263 let key = format!("tracing:{key}");
2264 let map = if let Some(map) = self.metadata.0.as_mut() {
2265 map
2266 } else {
2267 self.metadata.0 = Some(hashbrown::HashMap::new());
2268 assert_matches!(&mut self.metadata.0, Some(map) => map)
2269 };
2270 map.insert(key, value);
2271 }
2272}
2273
2274struct ExecutionMetadataExtractorView<'a> {
2275 metadata: &'a ExecutionMetadata,
2276}
2277
2278impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2279 fn get(&self, key: &str) -> Option<&str> {
2280 self.metadata
2281 .0
2282 .as_ref()
2283 .and_then(|map| map.get(&format!("tracing:{key}")))
2284 .map(std::string::String::as_str)
2285 }
2286
2287 fn keys(&self) -> Vec<&str> {
2288 match &self.metadata.0.as_ref() {
2289 Some(map) => map
2290 .keys()
2291 .filter_map(|key| key.strip_prefix("tracing:"))
2292 .collect(),
2293 None => vec![],
2294 }
2295 }
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300
2301 use rstest::rstest;
2302
2303 use crate::{
2304 ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
2305 };
2306 use std::{
2307 hash::{DefaultHasher, Hash, Hasher},
2308 str::FromStr,
2309 sync::Arc,
2310 };
2311
2312 #[test]
2313 fn ulid_parsing() {
2314 let generated = ExecutorId::generate();
2315 let str = generated.to_string();
2316 let parsed = str.parse().unwrap();
2317 assert_eq!(generated, parsed);
2318 }
2319
2320 #[test]
2321 fn execution_id_parsing_top_level() {
2322 let generated = ExecutionId::generate();
2323 let str = generated.to_string();
2324 let parsed = str.parse().unwrap();
2325 assert_eq!(generated, parsed);
2326 }
2327
2328 #[test]
2329 fn execution_id_with_one_level_should_parse() {
2330 let top_level = ExecutionId::generate();
2331 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2332 let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
2333 let ser = first_child.to_string();
2334 assert_eq!(format!("{top_level}.n:name_1"), ser);
2335 let parsed = ExecutionId::from_str(&ser).unwrap();
2336 assert_eq!(first_child, parsed);
2337 }
2338
2339 #[test]
2340 fn execution_id_increment_twice() {
2341 let top_level = ExecutionId::generate();
2342 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2343 let first_child = top_level.next_level(&join_set_id);
2344 let second_child = ExecutionId::Derived(first_child.get_incremented());
2345 let ser = second_child.to_string();
2346 assert_eq!(format!("{top_level}.n:name_2"), ser);
2347 let parsed = ExecutionId::from_str(&ser).unwrap();
2348 assert_eq!(second_child, parsed);
2349 }
2350
2351 #[test]
2352 fn execution_id_next_level_twice() {
2353 let top_level = ExecutionId::generate();
2354 let join_set_id_outer =
2355 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
2356 let join_set_id_inner =
2357 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2358 let execution_id = ExecutionId::Derived(
2359 top_level
2360 .next_level(&join_set_id_outer)
2361 .get_incremented()
2362 .next_level(&join_set_id_inner)
2363 .get_incremented(),
2364 );
2365 let ser = execution_id.to_string();
2366 assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
2367 let parsed = ExecutionId::from_str(&ser).unwrap();
2368 assert_eq!(execution_id, parsed);
2369 }
2370
2371 #[test]
2372 fn execution_id_split_first_level() {
2373 let top_level = ExecutionId::generate();
2374 let join_set_id =
2375 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2376 let execution_id = top_level.next_level(&join_set_id);
2377 let (actual_top_level, actual_join_set) = execution_id.split_to_parts().unwrap();
2378 assert_eq!(top_level, actual_top_level);
2379 assert_eq!(join_set_id, actual_join_set);
2380 }
2381
2382 #[rstest]
2383 fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
2384 let top_level = ExecutionId::generate();
2385 let join_set_id_outer =
2386 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2387 let first_level = top_level
2388 .next_level(&join_set_id_outer)
2389 .get_incremented_by(outer_idx);
2390
2391 let join_set_id_inner =
2392 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
2393 let second_level = first_level.next_level(&join_set_id_inner);
2394
2395 let (actual_first_level, actual_join_set) = second_level.split_to_parts().unwrap();
2396 assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
2397 assert_eq!(join_set_id_inner, actual_join_set);
2398 }
2399
2400 #[test]
2401 fn execution_id_hash_should_be_stable() {
2402 let parent = ExecutionId::from_parts(1, 2);
2403 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2404 let sibling_1 = parent.next_level(&join_set_id);
2405 let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
2406 let sibling_1 = ExecutionId::Derived(sibling_1);
2407 let join_set_id_inner =
2408 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2409 let child =
2410 ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
2411 let parent = parent.random_seed();
2412 let sibling_1 = sibling_1.random_seed();
2413 let sibling_2 = sibling_2.random_seed();
2414 let child = child.random_seed();
2415 let vec = vec![parent, sibling_1, sibling_2, child];
2416 insta::assert_debug_snapshot!(vec);
2417 let set: hashbrown::HashSet<_> = vec.into_iter().collect();
2419 assert_eq!(4, set.len());
2420 }
2421
2422 #[test]
2423 fn hash_of_str_variants_should_be_equal() {
2424 let input = "foo";
2425 let left = StrVariant::Arc(Arc::from(input));
2426 let right = StrVariant::Static(input);
2427 assert_eq!(left, right);
2428 let mut left_hasher = DefaultHasher::new();
2429 left.hash(&mut left_hasher);
2430 let mut right_hasher = DefaultHasher::new();
2431 right.hash(&mut right_hasher);
2432 let left_hasher = left_hasher.finish();
2433 let right_hasher = right_hasher.finish();
2434 println!("left: {left_hasher:x}, right: {right_hasher:x}");
2435 assert_eq!(left_hasher, right_hasher);
2436 }
2437
2438 #[test]
2439 fn ffqn_from_tuple_with_version_should_work() {
2440 let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
2441 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2442 }
2443
2444 #[test]
2445 fn ffqn_from_str_with_version_should_work() {
2446 let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
2447 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2448 }
2449
2450 #[tokio::test]
2451 async fn join_set_serde_should_be_consistent() {
2452 use crate::{JoinSetId, JoinSetKind};
2453 use strum::IntoEnumIterator;
2454 for kind in JoinSetKind::iter() {
2455 let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
2456 let ser = serde_json::to_string(&join_set_id).unwrap();
2457 let deser = serde_json::from_str(&ser).unwrap();
2458 assert_eq!(join_set_id, deser);
2459 }
2460 }
2461}