obeli_sk_concepts/
lib.rs

1pub mod component_id;
2mod error_conversions;
3#[cfg(feature = "postgres")]
4mod postgres_ext;
5#[cfg(feature = "rusqlite")]
6mod rusqlite_ext;
7pub mod storage;
8pub mod time;
9
10pub use crate::component_id::{
11    ComponentId, ComponentType, ContentDigest, InvalidNameError, check_name,
12};
13use ::serde::{Deserialize, Serialize};
14use assert_matches::assert_matches;
15pub use indexmap;
16use indexmap::IndexMap;
17use opentelemetry::propagation::{Extractor, Injector};
18pub use prefixed_ulid::ExecutionId;
19use serde_json::Value;
20use std::{
21    borrow::Borrow,
22    fmt::{Debug, Display},
23    hash::Hash,
24    marker::PhantomData,
25    ops::Deref,
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
31use tracing::{Span, error};
32use val_json::{
33    type_wrapper::{TypeConversionError, TypeWrapper},
34    wast_val::{WastVal, WastValWithType},
35    wast_val_ser::params,
36};
37use wasmtime::component::{Type, Val};
38
/// Namespace that `IfcFqnName::is_namespace_obelisk` compares against.
39pub const NAMESPACE_OBELISK: &str = "obelisk";
/// Namespace that `IfcFqnName::is_namespace_wasi` compares against.
40const NAMESPACE_WASI: &str = "wasi";
/// Package-name suffix of `PackageExtension::ObeliskExt`.
41pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
/// Package-name suffix of `PackageExtension::ObeliskSchedule`.
42pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
/// Package-name suffix of `PackageExtension::ObeliskStub`.
43pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
44
45#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
46#[error("execution error: {kind}")]
47pub struct FinishedExecutionError {
48    pub kind: ExecutionFailureKind,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub reason: Option<String>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub detail: Option<String>,
53}
54impl FinishedExecutionError {
55    #[must_use]
56    pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57        PendingStateFinishedError::ExecutionFailure(self.kind)
58    }
59}
60
/// Category of an execution failure, stored in `FinishedExecutionError::kind`.
///
/// Serde uses the `snake_case` form of the variant names.
61#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum ExecutionFailureKind {
64    /// Applicable to activities only, because workflows will be retried forever
65    TimedOut,
66    /// Applicable to workflows
67    NondeterminismDetected,
68    /// Applicable to WASM components
69    OutOfFuel,
70    /// Applicable to activities
71    Cancelled,
72    Uncategorized,
73}
74
/// Kind of trap.
///
/// The `Display` text of each variant is given by its `#[display]` attribute;
/// serde uses the `snake_case` form of the variant names.
75#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(rename_all = "snake_case")]
77pub enum TrapKind {
78    #[display("trap")]
79    Trap,
80    #[display("post_return_trap")]
81    PostReturnTrap,
82    #[display("out of fuel")]
83    OutOfFuel,
84    #[display("host function error")]
85    HostFunctionError,
86}
87
88#[derive(Clone, Eq, derive_more::Display)]
89pub enum StrVariant {
90    Static(&'static str),
91    Arc(Arc<str>),
92}
93
94impl StrVariant {
95    #[must_use]
96    pub const fn empty() -> StrVariant {
97        StrVariant::Static("")
98    }
99}
100
101impl From<String> for StrVariant {
102    fn from(value: String) -> Self {
103        StrVariant::Arc(Arc::from(value))
104    }
105}
106
107impl From<&'static str> for StrVariant {
108    fn from(value: &'static str) -> Self {
109        StrVariant::Static(value)
110    }
111}
112
113impl PartialEq for StrVariant {
114    fn eq(&self, other: &Self) -> bool {
115        match (self, other) {
116            (Self::Static(left), Self::Static(right)) => left == right,
117            (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
118            (Self::Arc(left), Self::Arc(right)) => left == right,
119            (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
120        }
121    }
122}
123
124impl Hash for StrVariant {
125    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
126        match self {
127            StrVariant::Static(val) => val.hash(state),
128            StrVariant::Arc(val) => {
129                let str: &str = val.deref();
130                str.hash(state);
131            }
132        }
133    }
134}
135
136impl Debug for StrVariant {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        Display::fmt(self, f)
139    }
140}
141
142impl Deref for StrVariant {
143    type Target = str;
144    fn deref(&self) -> &Self::Target {
145        match self {
146            Self::Arc(v) => v,
147            Self::Static(v) => v,
148        }
149    }
150}
151
152impl AsRef<str> for StrVariant {
153    fn as_ref(&self) -> &str {
154        match self {
155            Self::Arc(v) => v,
156            Self::Static(v) => v,
157        }
158    }
159}
160
161mod serde_strvariant {
162    use crate::StrVariant;
163    use serde::{
164        Deserialize, Deserializer, Serialize, Serializer,
165        de::{self, Visitor},
166    };
167    use std::{ops::Deref, sync::Arc};
168
169    impl Serialize for StrVariant {
170        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
171        where
172            S: Serializer,
173        {
174            serializer.serialize_str(self.deref())
175        }
176    }
177
178    impl<'de> Deserialize<'de> for StrVariant {
179        fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
180        where
181            D: Deserializer<'de>,
182        {
183            deserializer.deserialize_str(StrVariantVisitor)
184        }
185    }
186
187    struct StrVariantVisitor;
188
189    impl Visitor<'_> for StrVariantVisitor {
190        type Value = StrVariant;
191
192        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
193            formatter.write_str("a string")
194        }
195
196        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
197        where
198            E: de::Error,
199        {
200            Ok(StrVariant::Arc(Arc::from(v)))
201        }
202    }
203}
204
205#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
206#[display("{value}")]
207#[serde(transparent)]
208pub struct Name<T> {
209    pub value: StrVariant,
210    #[serde(skip)]
211    phantom_data: PhantomData<fn(T) -> T>,
212}
213
214impl<T> Name<T> {
215    #[must_use]
216    pub fn new_arc(value: Arc<str>) -> Self {
217        Self {
218            value: StrVariant::Arc(value),
219            phantom_data: PhantomData,
220        }
221    }
222
223    #[must_use]
224    pub const fn new_static(value: &'static str) -> Self {
225        Self {
226            value: StrVariant::Static(value),
227            phantom_data: PhantomData,
228        }
229    }
230}
231
232impl<T> Debug for Name<T> {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        Display::fmt(&self, f)
235    }
236}
237
238impl<T> Deref for Name<T> {
239    type Target = str;
240
241    fn deref(&self) -> &Self::Target {
242        self.value.deref()
243    }
244}
245
246impl<T> Borrow<str> for Name<T> {
247    fn borrow(&self) -> &str {
248        self.deref()
249    }
250}
251
252impl<T> From<String> for Name<T> {
253    fn from(value: String) -> Self {
254        Self::new_arc(Arc::from(value))
255    }
256}
257
258#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
259#[display("{}", self.suffix())]
260pub enum PackageExtension {
261    ObeliskExt,
262    ObeliskSchedule,
263    ObeliskStub,
264}
265impl PackageExtension {
266    fn suffix(&self) -> &'static str {
267        match self {
268            PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
269            PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
270            PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
271        }
272    }
273}
274
275#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
276#[cfg_attr(feature = "test", derive(Serialize))]
277pub struct PkgFqn {
278    pub namespace: String, // TODO: StrVariant or reference
279    pub package_name: String,
280    pub version: Option<String>,
281}
282impl PkgFqn {
283    #[must_use]
284    pub fn is_extension(&self) -> bool {
285        Self::is_package_name_ext(&self.package_name)
286    }
287
288    #[must_use]
289    pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
290        use strum::IntoEnumIterator;
291        for package_ext in PackageExtension::iter() {
292            if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
293                return Some((
294                    PkgFqn {
295                        namespace: self.namespace.clone(),
296                        package_name: package_name.to_string(),
297                        version: self.version.clone(),
298                    },
299                    package_ext,
300                ));
301            }
302        }
303        None
304    }
305
306    fn is_package_name_ext(package_name: &str) -> bool {
307        package_name.ends_with(SUFFIX_PKG_EXT)
308            || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
309            || package_name.ends_with(SUFFIX_PKG_STUB)
310    }
311}
312impl Display for PkgFqn {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        let PkgFqn {
315            namespace,
316            package_name,
317            version,
318        } = self;
319        if let Some(version) = version {
320            write!(f, "{namespace}:{package_name}@{version}")
321        } else {
322            write!(f, "{namespace}:{package_name}")
323        }
324    }
325}
326
/// Marker type used as the `T` parameter of `Name` for interface names.
327#[derive(Hash, Clone, PartialEq, Eq)]
328pub struct IfcFqnMarker;
329
/// Interface name; the accessors on this type split it at `:`, `/` and `@`.
330pub type IfcFqnName = Name<IfcFqnMarker>; // namespace:name/ifc_name OR namespace:name/ifc_name@version
331
332impl IfcFqnName {
333    #[must_use]
334    pub fn namespace(&self) -> &str {
335        self.deref().split_once(':').unwrap().0
336    }
337
338    #[must_use]
339    pub fn package_name(&self) -> &str {
340        let after_colon = self.deref().split_once(':').unwrap().1;
341        after_colon.split_once('/').unwrap().0
342    }
343
344    #[must_use]
345    pub fn version(&self) -> Option<&str> {
346        self.deref().split_once('@').map(|(_, version)| version)
347    }
348
349    #[must_use]
350    pub fn pkg_fqn_name(&self) -> PkgFqn {
351        let (namespace, rest) = self.deref().split_once(':').unwrap();
352        let (package_name, rest) = rest.split_once('/').unwrap();
353        let version = rest.split_once('@').map(|(_, version)| version);
354        PkgFqn {
355            namespace: namespace.to_string(),
356            package_name: package_name.to_string(),
357            version: version.map(std::string::ToString::to_string),
358        }
359    }
360
361    #[must_use]
362    pub fn ifc_name(&self) -> &str {
363        let after_colon = self.deref().split_once(':').unwrap().1;
364        let after_slash = after_colon.split_once('/').unwrap().1;
365        after_slash
366            .split_once('@')
367            .map_or(after_slash, |(ifc, _)| ifc)
368    }
369
370    #[must_use]
371    pub fn from_parts(
372        namespace: &str,
373        package_name: &str,
374        ifc_name: &str,
375        version: Option<&str>,
376    ) -> Self {
377        let mut str = format!("{namespace}:{package_name}/{ifc_name}");
378        if let Some(version) = version {
379            str += "@";
380            str += version;
381        }
382        Self::new_arc(Arc::from(str))
383    }
384
385    #[must_use]
386    /// Returns true if this is an `-obelisk-*` extension interface.
387    pub fn is_extension(&self) -> bool {
388        PkgFqn::is_package_name_ext(self.package_name())
389    }
390
391    #[must_use]
392    pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
393        self.package_name().strip_suffix(SUFFIX_PKG_EXT)
394    }
395
396    #[must_use]
397    pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
398        self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
399    }
400
401    #[must_use]
402    pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
403        self.package_name().strip_suffix(SUFFIX_PKG_STUB)
404    }
405
406    #[must_use]
407    pub fn is_namespace_obelisk(&self) -> bool {
408        self.namespace() == NAMESPACE_OBELISK
409    }
410
411    #[must_use]
412    pub fn is_namespace_wasi(&self) -> bool {
413        self.namespace() == NAMESPACE_WASI
414    }
415}
416
/// Marker type used as the `T` parameter of `Name` for function names.
417#[derive(Hash, Clone, PartialEq, Eq)]
418pub struct FnMarker;
419
/// Function name, used as `FunctionFqn::function_name`.
420pub type FnName = Name<FnMarker>;
421
422#[derive(
423    Hash, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
424)]
425pub struct FunctionFqn {
426    pub ifc_fqn: IfcFqnName,
427    pub function_name: FnName,
428}
429
430impl FunctionFqn {
431    #[must_use]
432    pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
433        Self {
434            ifc_fqn: Name::new_arc(ifc_fqn),
435            function_name: Name::new_arc(function_name),
436        }
437    }
438
439    #[must_use]
440    pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
441        Self {
442            ifc_fqn: Name::new_static(ifc_fqn),
443            function_name: Name::new_static(function_name),
444        }
445    }
446
447    #[must_use]
448    pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
449        Self::new_static(tuple.0, tuple.1)
450    }
451
452    pub fn try_from_tuple(
453        ifc_fqn: &str,
454        function_name: &str,
455    ) -> Result<Self, FunctionFqnParseError> {
456        if function_name.contains('.') {
457            Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
458        } else {
459            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
460        }
461    }
462}
463
/// Error returned when constructing a `FunctionFqn` from strings.
464#[derive(Debug, thiserror::Error)]
465pub enum FunctionFqnParseError {
    /// Returned by `FunctionFqn::from_str` when the input contains no `.`.
466    #[error("delimiter `.` not found")]
467    DelimiterNotFound,
    /// Returned by `FunctionFqn::try_from_tuple` when the function name contains a `.`.
468    #[error("delimiter `.` found in function name")]
469    DelimiterFoundInFunctionName,
470}
471
472impl FromStr for FunctionFqn {
473    type Err = FunctionFqnParseError;
474
475    fn from_str(s: &str) -> Result<Self, Self::Err> {
476        if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
477            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
478        } else {
479            Err(FunctionFqnParseError::DelimiterNotFound)
480        }
481    }
482}
483
484impl Display for FunctionFqn {
485    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486        write!(
487            f,
488            "{ifc_fqn}.{function_name}",
489            ifc_fqn = self.ifc_fqn,
490            function_name = self.function_name
491        )
492    }
493}
494
495impl Debug for FunctionFqn {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        Display::fmt(&self, f)
498    }
499}
500
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
    /// Generates a `namespace:pkg/ifc` interface name and a function name from
    /// four arbitrary strings.
    ///
    /// Every delimiter the accessors split on is removed from each part, so
    /// the parts read back from the generated name are the generated ones.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // `/` separates the package from the interface. Leaving it in a part
        // would make `IfcFqnName::package_name` and `ifc_name` split the
        // generated name at the wrong position.
        let illegal = [':', '/', '@', '.'];
        let namespace = u.arbitrary::<String>()?.replace(illegal, "");
        let pkg_name = u.arbitrary::<String>()?.replace(illegal, "");
        let ifc_name = u.arbitrary::<String>()?.replace(illegal, "");
        let fn_name = u.arbitrary::<String>()?.replace(illegal, "");

        Ok(FunctionFqn::new_arc(
            Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
            Arc::from(fn_name),
        ))
    }
}
516
517#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
518pub struct TypeWrapperTopLevel {
519    pub ok: Option<Box<TypeWrapper>>,
520    pub err: Option<Box<TypeWrapper>>,
521}
522impl From<TypeWrapperTopLevel> for TypeWrapper {
523    fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
524        TypeWrapper::Result {
525            ok: value.ok,
526            err: value.err,
527        }
528    }
529}
530
/// Return value of a finished execution.
///
/// The payload fields are skipped in `Debug` output.
531#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
532pub enum SupportedFunctionReturnValue {
    /// The `ok` side of a result, with an optional typed payload.
533    Ok {
534        #[debug(skip)]
535        ok: Option<WastValWithType>,
536    },
    /// The `err` side of a result, with an optional typed payload.
537    Err {
538        #[debug(skip)]
539        err: Option<WastValWithType>,
540    },
    /// The execution failed with a `FinishedExecutionError`.
541    ExecutionError(FinishedExecutionError),
542}
543impl Display for SupportedFunctionReturnValue {
544    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545        match self.as_pending_state_finished_result() {
546            PendingStateFinishedResultKind::Ok => write!(f, "execution completed successfully"),
547            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
548        }
549    }
550}
551
/// The `Ok` return value without a payload.
552pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
553    SupportedFunctionReturnValue::Ok { ok: None };
554
/// Error returned by `SupportedFunctionReturnValue::new`.
555#[derive(Debug, thiserror::Error)]
556pub enum ResultParsingError {
    /// The iterator of returned values was empty.
557    #[error("return value must not be empty")]
558    NoValue,
    /// The iterator contained more than one value.
559    #[error("return value cannot be parsed, multi-value results are not supported")]
560    MultiValue,
    /// The wasmtime type could not be converted to a `TypeWrapper`.
561    #[error("return value cannot be parsed, {0}")]
562    TypeConversionError(val_json::type_wrapper::TypeConversionError),
    /// The value could not be combined with its type; see `ResultParsingErrorFromVal`.
563    #[error(transparent)]
564    ResultParsingErrorFromVal(ResultParsingErrorFromVal),
565}
566
/// Error returned by `SupportedFunctionReturnValue::from_val_and_type_wrapper`
/// and `from_val_and_type_wrapper_tl`.
567#[derive(Debug, thiserror::Error)]
568pub enum ResultParsingErrorFromVal {
    /// The payload value could not be converted to a `WastVal`.
569    #[error("return value cannot be parsed, {0}")]
570    WastValConversionError(val_json::wast_val::WastValConversionError),
    /// The type is not `TypeWrapper::Result`, or the value is not `Val::Result`.
571    #[error("top level type must be a result")]
572    TopLevelTypeMustBeAResult,
    /// Presence of the payload value does not match presence of the payload type.
573    #[error("value does not type check")]
574    TypeCheckError,
575}
576
577impl SupportedFunctionReturnValue {
578    pub fn new<
579        I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
580    >(
581        mut iter: I,
582    ) -> Result<Self, ResultParsingError> {
583        if iter.len() == 0 {
584            Err(ResultParsingError::NoValue)
585        } else if iter.len() == 1 {
586            let (val, r#type) = iter.next().unwrap();
587            let r#type =
588                TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
589            Self::from_val_and_type_wrapper(val, r#type)
590                .map_err(ResultParsingError::ResultParsingErrorFromVal)
591        } else {
592            Err(ResultParsingError::MultiValue)
593        }
594    }
595
596    #[expect(clippy::result_unit_err)]
597    pub fn from_wast_val_with_type(
598        value: WastValWithType,
599    ) -> Result<SupportedFunctionReturnValue, ()> {
600        match value {
601            WastValWithType {
602                r#type: TypeWrapper::Result { ok: None, err: _ },
603                value: WastVal::Result(Ok(None)),
604            } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
605            WastValWithType {
606                r#type:
607                    TypeWrapper::Result {
608                        ok: Some(ok),
609                        err: _,
610                    },
611                value: WastVal::Result(Ok(Some(value))),
612            } => Ok(SupportedFunctionReturnValue::Ok {
613                ok: Some(WastValWithType {
614                    r#type: *ok,
615                    value: *value,
616                }),
617            }),
618            WastValWithType {
619                r#type: TypeWrapper::Result { ok: _, err: None },
620                value: WastVal::Result(Err(None)),
621            } => Ok(SupportedFunctionReturnValue::Err { err: None }),
622            WastValWithType {
623                r#type:
624                    TypeWrapper::Result {
625                        ok: _,
626                        err: Some(err),
627                    },
628                value: WastVal::Result(Err(Some(value))),
629            } => Ok(SupportedFunctionReturnValue::Err {
630                err: Some(WastValWithType {
631                    r#type: *err,
632                    value: *value,
633                }),
634            }),
635            _ => Err(()),
636        }
637    }
638
639    pub fn from_val_and_type_wrapper(
640        value: wasmtime::component::Val,
641        ty: TypeWrapper,
642    ) -> Result<Self, ResultParsingErrorFromVal> {
643        let TypeWrapper::Result { ok, err } = ty else {
644            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
645        };
646        let ty = TypeWrapperTopLevel { ok, err };
647        Self::from_val_and_type_wrapper_tl(value, ty)
648    }
649
650    pub fn from_val_and_type_wrapper_tl(
651        value: wasmtime::component::Val,
652        ty: TypeWrapperTopLevel,
653    ) -> Result<Self, ResultParsingErrorFromVal> {
654        let wasmtime::component::Val::Result(value) = value else {
655            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
656        };
657
658        match (ty.ok, ty.err, value) {
659            (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
660            (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
661                ok: Some(WastValWithType {
662                    r#type: *ok_type,
663                    value: WastVal::try_from(*value)
664                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
665                }),
666            }),
667            (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
668            (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
669                err: Some(WastValWithType {
670                    r#type: *err_type,
671                    value: WastVal::try_from(*value)
672                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
673                }),
674            }),
675            _other => Err(ResultParsingErrorFromVal::TypeCheckError),
676        }
677    }
678
679    #[must_use]
680    pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
681        match self {
682            SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
683            SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
684                WastVal::Result(Ok(Some(Box::new(v.value))))
685            }
686            SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
687            SupportedFunctionReturnValue::Err { err: Some(v) } => {
688                WastVal::Result(Err(Some(Box::new(v.value))))
689            }
690            SupportedFunctionReturnValue::ExecutionError(_) => {
691                execution_error_to_wast_val(&get_return_type())
692            }
693        }
694    }
695
696    #[must_use]
697    pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
698        match self {
699            SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind::Ok,
700            SupportedFunctionReturnValue::Err { err: _ } => {
701                PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error)
702            }
703            SupportedFunctionReturnValue::ExecutionError(err) => {
704                PendingStateFinishedResultKind::Err(err.as_pending_state_finished_error())
705            }
706        }
707    }
708}
709
710#[must_use]
711pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
712    match ret_type {
713        TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
714        TypeWrapperTopLevel {
715            ok: _,
716            err: Some(inner),
717        } => match inner.as_ref() {
718            TypeWrapper::String => {
719                return WastVal::Result(Err(Some(Box::new(WastVal::String(
720                    EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
721                )))));
722            }
723            TypeWrapper::Variant(variants) => {
724                if variants.get(EXECUTION_FAILED_STRING_OR_VARIANT) == Some(&None) {
725                    return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
726                        EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
727                        None,
728                    )))));
729                }
730            }
731            _ => {}
732        },
733    }
734    unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
735}
736
/// Function parameters, stored in one of the private `ParamsInternal` representations.
737#[derive(Debug, Clone)]
738pub struct Params(ParamsInternal);
739
/// Storage behind `Params`.
740#[derive(derive_more::Debug, Clone)]
741enum ParamsInternal {
    /// Parameters held as JSON values.
742    JsonValues(Arc<[Value]>),
    /// Parameters held as wasmtime values, plus a cache of their JSON form
    /// that is filled by the first successful conversion during serialization.
743    Vals {
744        vals: Arc<[wasmtime::component::Val]>,
745        #[debug(skip)]
746        json_vals_cache: Arc<std::sync::RwLock<Option<Arc<[Value]>>>>, // Caches json values
747    },
    /// No parameters.
748    Empty,
749}
750
/// Function-name suffix of `FunctionExtension::Submit`.
751pub const SUFFIX_FN_SUBMIT: &str = "-submit";
/// Function-name suffix of `FunctionExtension::AwaitNext`.
752pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
/// Function-name suffix of `FunctionExtension::Schedule`.
753pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
/// Function-name suffix of `FunctionExtension::Stub`.
754pub const SUFFIX_FN_STUB: &str = "-stub";
/// Function-name suffix of `FunctionExtension::Get`.
755pub const SUFFIX_FN_GET: &str = "-get";
756
757#[derive(
758    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
759)]
760#[serde(rename_all = "snake_case")]
761pub enum FunctionExtension {
762    Submit,
763    AwaitNext,
764    Schedule,
765    Stub,
766    Get,
767}
768impl FunctionExtension {
769    #[must_use]
770    pub fn suffix(&self) -> &'static str {
771        match self {
772            FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
773            FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
774            FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
775            FunctionExtension::Stub => SUFFIX_FN_STUB,
776            FunctionExtension::Get => SUFFIX_FN_GET,
777        }
778    }
779
780    #[must_use]
781    pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
782        matches!(
783            (pkg_ext, self),
784            (
785                PackageExtension::ObeliskExt,
786                FunctionExtension::Submit | FunctionExtension::AwaitNext | FunctionExtension::Get
787            ) | (
788                PackageExtension::ObeliskSchedule,
789                FunctionExtension::Schedule
790            ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
791        )
792    }
793}
794
795#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
796pub struct FunctionMetadata {
797    pub ffqn: FunctionFqn,
798    pub parameter_types: ParameterTypes,
799    pub return_type: ReturnType,
800    pub extension: Option<FunctionExtension>,
801    /// Externally submittable: primary functions + `-schedule` extended, but no activity stubs
802    pub submittable: bool,
803}
804impl FunctionMetadata {
805    #[must_use]
806    pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
807        self.extension.map(|extension| {
808            let prefix = self
809                .ffqn
810                .function_name
811                .value
812                .strip_suffix(extension.suffix())
813                .unwrap_or_else(|| {
814                    panic!(
815                        "extension function {} must end with expected suffix {}",
816                        self.ffqn.function_name,
817                        extension.suffix()
818                    )
819                });
820            (prefix, extension)
821        })
822    }
823}
824impl Display for FunctionMetadata {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        write!(
827            f,
828            "{ffqn}: func{params} -> {return_type}",
829            ffqn = self.ffqn,
830            params = self.parameter_types,
831            return_type = self.return_type,
832        )
833    }
834}
835
836pub mod serde_params {
837    use crate::{Params, ParamsInternal};
838    use serde::de::{SeqAccess, Visitor};
839    use serde::ser::SerializeSeq;
840    use serde::{Deserialize, Serialize};
841    use serde_json::Value;
842    use std::sync::Arc;
843    use val_json::wast_val::WastVal;
844
845    impl Serialize for Params {
846        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
847        where
848            S: ::serde::Serializer,
849        {
850            let serialize_json_values = |slice: &[serde_json::Value], serializer: S| {
851                let mut seq = serializer.serialize_seq(Some(slice.len()))?;
852                for item in slice {
853                    seq.serialize_element(item)?;
854                }
855                seq.end()
856            };
857            match &self.0 {
858                ParamsInternal::Vals {
859                    vals,
860                    json_vals_cache,
861                } => {
862                    let guard = json_vals_cache.read().unwrap();
863                    if let Some(slice) = &*guard {
864                        serialize_json_values(slice, serializer)
865                    } else {
866                        drop(guard);
867                        let mut json_vals = Vec::with_capacity(vals.len());
868                        for val in vals.iter() {
869                            let value = WastVal::try_from(val.clone())
870                                .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
871                            let value = serde_json::to_value(&value)
872                                .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
873                            json_vals.push(value);
874                        }
875                        let res = serialize_json_values(&json_vals, serializer);
876                        *json_vals_cache.write().unwrap() = Some(Arc::from(json_vals));
877                        res
878                    }
879                }
880                ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
881                ParamsInternal::JsonValues(vec) => serialize_json_values(vec, serializer),
882            }
883        }
884    }
885
886    pub struct VecVisitor;
887
888    impl<'de> Visitor<'de> for VecVisitor {
889        type Value = Vec<Value>;
890
891        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
892            formatter.write_str("a sequence of `Value`")
893        }
894
895        #[inline]
896        fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
897        where
898            V: SeqAccess<'de>,
899        {
900            let mut vec = Vec::new();
901            while let Some(elem) = visitor.next_element()? {
902                vec.push(elem);
903            }
904            Ok(vec)
905        }
906    }
907
908    impl<'de> Deserialize<'de> for Params {
909        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
910        where
911            D: serde::Deserializer<'de>,
912        {
913            let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
914            if vec.is_empty() {
915                Ok(Self(ParamsInternal::Empty))
916            } else {
917                Ok(Self(ParamsInternal::JsonValues(Arc::from(vec))))
918            }
919        }
920    }
921}
922
923#[derive(Debug, thiserror::Error)]
924pub enum ParamsParsingError {
925    #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
926    ParameterTypeError {
927        idx: usize,
928        err: TypeConversionError,
929    },
930    #[error("parameters cannot be deserialized: {0}")]
931    ParamsDeserializationError(serde_json::Error),
932    #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
933    ParameterCardinalityMismatch { expected: usize, specified: usize },
934}
935
936impl ParamsParsingError {
937    #[must_use]
938    pub fn detail(&self) -> Option<String> {
939        match self {
940            ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
941            ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
942            ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
943        }
944    }
945}
946
/// Error describing a JSON value that cannot be used as a parameter list.
#[derive(Debug, thiserror::Error)]
pub enum ParamsFromJsonError {
    /// The value was not a JSON array.
    #[error("value must be a json array containing function parameters")]
    MustBeArray,
}
952
953impl Params {
954    #[must_use]
955    pub const fn empty() -> Self {
956        Self(ParamsInternal::Empty)
957    }
958
959    #[must_use]
960    pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
961        if vals.is_empty() {
962            Self::empty()
963        } else {
964            Self(ParamsInternal::Vals {
965                vals,
966                json_vals_cache: Arc::default(),
967            })
968        }
969    }
970
971    #[cfg(any(test, feature = "test"))]
972    #[must_use]
973    pub fn from_json_values_test(vec: Vec<Value>) -> Self {
974        if vec.is_empty() {
975            Self::empty()
976        } else {
977            Self(ParamsInternal::JsonValues(Arc::from(vec)))
978        }
979    }
980
981    #[must_use]
982    pub fn from_json_values(values: Arc<[Value]>) -> Self {
983        if values.is_empty() {
984            Self::empty()
985        } else {
986            Self(ParamsInternal::JsonValues(values))
987        }
988    }
989
990    pub fn typecheck<'a>(
991        &self,
992        param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
993    ) -> Result<(), ParamsParsingError> {
994        if param_types.len() != self.len() {
995            return Err(ParamsParsingError::ParameterCardinalityMismatch {
996                expected: param_types.len(),
997                specified: self.len(),
998            });
999        }
1000        match &self.0 {
1001            ParamsInternal::Vals { .. } /* already typechecked */ | ParamsInternal::Empty => {}
1002            ParamsInternal::JsonValues(params) => {
1003                params::deserialize_values(params, param_types)
1004                .map_err(ParamsParsingError::ParamsDeserializationError)?;
1005            }
1006        }
1007        Ok(())
1008    }
1009
1010    pub fn as_vals<'a>(
1011        &self,
1012        param_types: impl ExactSizeIterator<Item = (&'a str, Type)>,
1013    ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1014        if param_types.len() != self.len() {
1015            return Err(ParamsParsingError::ParameterCardinalityMismatch {
1016                expected: param_types.len(),
1017                specified: self.len(),
1018            });
1019        }
1020        match &self.0 {
1021            ParamsInternal::JsonValues(json_vec) => {
1022                let param_types = param_types
1023                    .enumerate()
1024                    .map(|(idx, (_param_name, ty))| {
1025                        TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1026                    })
1027                    .collect::<Result<Vec<_>, _>>()
1028                    .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1029                Ok(params::deserialize_values(json_vec, param_types.iter())
1030                    .map_err(ParamsParsingError::ParamsDeserializationError)?
1031                    .into_iter()
1032                    .map(Val::from)
1033                    .collect())
1034            }
1035            ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1036            ParamsInternal::Empty => Ok(Arc::from([])),
1037        }
1038    }
1039
1040    #[must_use]
1041    pub fn len(&self) -> usize {
1042        match &self.0 {
1043            ParamsInternal::JsonValues(vec) => vec.len(),
1044            ParamsInternal::Vals { vals, .. } => vals.len(),
1045            ParamsInternal::Empty => 0,
1046        }
1047    }
1048
1049    #[must_use]
1050    pub fn is_empty(&self) -> bool {
1051        self.len() == 0
1052    }
1053}
1054
1055impl PartialEq for Params {
1056    fn eq(&self, other: &Self) -> bool {
1057        if self.is_empty() && other.is_empty() {
1058            return true;
1059        }
1060        if self.len() != other.len() {
1061            return false;
1062        }
1063
1064        match (&self.0, &other.0) {
1065            (ParamsInternal::JsonValues(l), ParamsInternal::JsonValues(r)) => l == r,
1066
1067            (
1068                ParamsInternal::Vals {
1069                    vals: l,
1070                    json_vals_cache: _,
1071                },
1072                ParamsInternal::Vals {
1073                    vals: r,
1074                    json_vals_cache: _,
1075                },
1076            ) => l == r,
1077
1078            (
1079                ParamsInternal::JsonValues(json_vals),
1080                ParamsInternal::Vals {
1081                    vals,
1082                    json_vals_cache,
1083                },
1084            )
1085            | (
1086                ParamsInternal::Vals {
1087                    vals,
1088                    json_vals_cache,
1089                },
1090                ParamsInternal::JsonValues(json_vals),
1091            ) => {
1092                let Ok(vec) = to_json(vals) else { return false };
1093                let equals = *json_vals == vec;
1094                *json_vals_cache.write().unwrap() = Some(vec);
1095                equals
1096            }
1097
1098            (ParamsInternal::Empty, _) | (_, ParamsInternal::Empty) => {
1099                unreachable!("zero length and different lengths handled earlier")
1100            }
1101        }
1102    }
1103}
1104impl Eq for Params {}
1105
1106/// Convert wasmtime's Vals to serde's Values
1107fn to_json(vals: &[wasmtime::component::Val]) -> Result<Arc<[serde_json::Value]>, ()> {
1108    let mut vec = Vec::with_capacity(vals.len());
1109    for val in vals {
1110        let value = match WastVal::try_from(val.clone()) {
1111            Ok(ok) => ok,
1112            Err(err) => {
1113                error!("cannot compare Params, cannot convert to WastVal: {err:?}");
1114                return Err(());
1115            }
1116        };
1117        let value = match serde_json::to_value(&value) {
1118            Ok(ok) => ok,
1119            Err(err) => {
1120                error!("cannot compare Params, cannot convert to JSON: {err:?}");
1121                return Err(());
1122            }
1123        };
1124        vec.push(value);
1125    }
1126    Ok(Arc::from(vec))
1127}
1128
1129impl Display for Params {
1130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1131        let render_json_vals =
1132            |f: &mut std::fmt::Formatter<'_>, json_vals: &[Value]| -> std::fmt::Result {
1133                for (i, json_value) in json_vals.iter().enumerate() {
1134                    if i > 0 {
1135                        write!(f, ", ")?;
1136                    }
1137                    write!(f, "{json_value}")?;
1138                }
1139                Ok(())
1140            };
1141        write!(f, "[")?;
1142        match &self.0 {
1143            ParamsInternal::Empty => {}
1144            ParamsInternal::JsonValues(json_vals) => {
1145                render_json_vals(f, json_vals)?;
1146            }
1147            ParamsInternal::Vals {
1148                vals,
1149                json_vals_cache,
1150            } => {
1151                let guard = json_vals_cache.read().unwrap();
1152                if let Some(json_vals) = &*guard {
1153                    render_json_vals(f, json_vals)?;
1154                } else {
1155                    drop(guard);
1156                    let Ok(json_vals) = to_json(vals) else {
1157                        return write!(f, "<serialization error>]"); // note trailing ]
1158                    };
1159                    render_json_vals(f, &json_vals)?;
1160                    *json_vals_cache.write().unwrap() = Some(json_vals);
1161                }
1162            }
1163        }
1164        write!(f, "]")
1165    }
1166}
1167
1168pub mod prefixed_ulid {
1169    use crate::{
1170        EXECUTION_ID_INFIX, EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX, JoinSetId,
1171        JoinSetIdParseError,
1172    };
1173    use serde_with::{DeserializeFromStr, SerializeDisplay};
1174    use std::{
1175        fmt::{Debug, Display},
1176        hash::Hasher,
1177        marker::PhantomData,
1178        num::ParseIntError,
1179        str::FromStr,
1180        sync::Arc,
1181    };
1182    use ulid::Ulid;
1183
    /// A ULID displayed with a prefix, in the form `<prefix>_<ulid>`.
    ///
    /// `T` is a marker only: no value of `T` is stored, it just selects the prefix
    /// returned by `Self::prefix()`.
    #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
    #[derive_where::derive_where(Clone, Copy)]
    #[display("{}_{ulid}", Self::prefix())]
    pub struct PrefixedUlid<T: 'static> {
        // The wrapped identifier.
        ulid: Ulid,
        // Ties the type to `T` without storing a `T`.
        phantom_data: PhantomData<fn(T) -> T>,
    }
1191
1192    impl<T> PrefixedUlid<T> {
1193        const fn new(ulid: Ulid) -> Self {
1194            Self {
1195                ulid,
1196                phantom_data: PhantomData,
1197            }
1198        }
1199
1200        fn prefix() -> &'static str {
1201            std::any::type_name::<T>().rsplit("::").next().unwrap()
1202        }
1203    }
1204
1205    impl<T> PrefixedUlid<T> {
1206        #[must_use]
1207        pub fn generate() -> Self {
1208            Self::new(Ulid::new())
1209        }
1210
1211        #[must_use]
1212        pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1213            Self::new(Ulid::from_parts(timestamp_ms, random))
1214        }
1215
1216        #[must_use]
1217        pub fn timestamp_part(&self) -> u64 {
1218            self.ulid.timestamp_ms()
1219        }
1220
1221        #[must_use]
1222        /// Fills only the lower 80 bits of the returned 128-bit value.
1223        pub fn random_part(&self) -> u128 {
1224            self.ulid.random()
1225        }
1226    }
1227
    /// Errors produced when parsing a `PrefixedUlid` from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum PrefixedUlidParseError {
        /// The input does not start with the expected prefix followed by `_`.
        #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
        WrongPrefix { input: String, expected: String },
        /// The text after the prefix is not a valid ULID.
        #[error("cannot parse ULID suffix from `{input}`")]
        CannotParseUlid { input: String },
    }
1235
1236    mod impls {
1237        use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1238        use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1239
1240        impl<T> FromStr for PrefixedUlid<T> {
1241            type Err = PrefixedUlidParseError;
1242
1243            fn from_str(input: &str) -> Result<Self, Self::Err> {
1244                let prefix = Self::prefix();
1245                let mut input_chars = input.chars();
1246                for exp in prefix.chars() {
1247                    if input_chars.next() != Some(exp) {
1248                        return Err(PrefixedUlidParseError::WrongPrefix {
1249                            input: input.to_string(),
1250                            expected: format!("{prefix}_"),
1251                        });
1252                    }
1253                }
1254                if input_chars.next() != Some('_') {
1255                    return Err(PrefixedUlidParseError::WrongPrefix {
1256                        input: input.to_string(),
1257                        expected: format!("{prefix}_"),
1258                    });
1259                }
1260                let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1261                    return Err(PrefixedUlidParseError::CannotParseUlid {
1262                        input: input.to_string(),
1263                    });
1264                };
1265                Ok(Self {
1266                    ulid,
1267                    phantom_data: PhantomData,
1268                })
1269            }
1270        }
1271
1272        impl<T> Debug for PrefixedUlid<T> {
1273            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1274                Display::fmt(&self, f)
1275            }
1276        }
1277
1278        impl<T> Hash for PrefixedUlid<T> {
1279            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1280                Self::prefix().hash(state);
1281                self.ulid.hash(state);
1282                self.phantom_data.hash(state);
1283            }
1284        }
1285
1286        impl<T> PartialEq for PrefixedUlid<T> {
1287            fn eq(&self, other: &Self) -> bool {
1288                self.ulid == other.ulid
1289            }
1290        }
1291
1292        impl<T> Eq for PrefixedUlid<T> {}
1293
1294        impl<T> PartialOrd for PrefixedUlid<T> {
1295            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1296                Some(self.cmp(other))
1297            }
1298        }
1299
1300        impl<T> Ord for PrefixedUlid<T> {
1301            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1302                self.ulid.cmp(&other.ulid)
1303            }
1304        }
1305    }
1306
    /// Marker types used as the `T` parameter of `PrefixedUlid`; the type name
    /// becomes the textual prefix of the identifier.
    pub mod prefix {
        /// Marker giving the `E` prefix.
        pub struct E;
        /// Marker giving the `Exr` prefix.
        pub struct Exr;
        /// Marker giving the `Run` prefix.
        pub struct Run;
        /// Marker giving the `Delay` prefix.
        pub struct Delay;
    }
1313
    /// Identifier using the `Exr` prefix.
    pub type ExecutorId = PrefixedUlid<prefix::Exr>;
    /// Identifier of a top-level execution, using the `E` prefix.
    pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
    /// Identifier using the `Run` prefix.
    pub type RunId = PrefixedUlid<prefix::Run>;
    /// Top-level part of a `DelayId`, using the `Delay` prefix.
    pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; // Never used directly, tracking top level ExecutionId
1318
1319    #[cfg(any(test, feature = "test"))]
1320    impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1321        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1322            Ok(Self::new(ulid::Ulid::from_parts(
1323                u.arbitrary()?,
1324                u.arbitrary()?,
1325            )))
1326        }
1327    }
1328
    /// Identifier of an execution: either a top-level id, or an id derived from a
    /// top-level id through one or more join-set/index levels.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
    pub enum ExecutionId {
        /// An execution identified by a prefixed ULID alone.
        TopLevel(ExecutionIdTopLevel),
        /// An execution identified relative to a top-level id.
        Derived(ExecutionIdDerived),
    }
1334
    /// A derived execution id, displayed as
    /// `{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct ExecutionIdDerived {
        // Id of the top-level execution this id descends from.
        top_level: ExecutionIdTopLevel,
        // Path between the top-level id and the final index: a join set id, optionally
        // preceded by the parent's infix and index.
        infix: Arc<str>,
        // Index at the last level.
        idx: u64,
    }
1341    impl ExecutionIdDerived {
1342        #[must_use]
1343        pub fn get_incremented(&self) -> Self {
1344            self.get_incremented_by(1)
1345        }
1346        #[must_use]
1347        pub fn get_incremented_by(&self, count: u64) -> Self {
1348            ExecutionIdDerived {
1349                top_level: self.top_level,
1350                infix: self.infix.clone(),
1351                idx: self.idx + count,
1352            }
1353        }
1354        #[must_use]
1355        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1356            let ExecutionIdDerived {
1357                top_level,
1358                infix,
1359                idx,
1360            } = self;
1361            let infix = Arc::from(format!(
1362                "{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1363            ));
1364            ExecutionIdDerived {
1365                top_level: *top_level,
1366                infix,
1367                idx: EXECUTION_ID_START_IDX,
1368            }
1369        }
1370        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1371            let ExecutionIdDerived {
1372                top_level,
1373                infix,
1374                idx,
1375            } = self;
1376            write!(
1377                f,
1378                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1379            )
1380        }
1381
1382        #[must_use]
1383        pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1384            self.split_to_parts_res().expect("verified in from_str")
1385        }
1386
1387        fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1388            // Two cases:
1389            // A. infix contains a `.` => infix must be split into old_infix _ old_idx . JoinSetId
1390            // B. else => child of a top level, will be split into the top level, the whole infix must be JoinSetId.
1391            if let Some((old_infix_and_index, join_set_id)) =
1392                self.infix.rsplit_once(EXECUTION_ID_INFIX)
1393            {
1394                let join_set_id = JoinSetId::from_str(join_set_id)
1395                    .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1396                let Some((old_infix, old_idx)) =
1397                    old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1398                else {
1399                    return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1400                };
1401                let parent = ExecutionIdDerived {
1402                    top_level: self.top_level,
1403                    infix: Arc::from(old_infix),
1404                    idx: old_idx
1405                        .parse()
1406                        .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1407                };
1408                Ok((ExecutionId::Derived(parent), join_set_id))
1409            } else {
1410                Ok((
1411                    ExecutionId::TopLevel(self.top_level),
1412                    JoinSetId::from_str(&self.infix)
1413                        .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1414                ))
1415            }
1416        }
1417    }
1418
    /// Errors produced when splitting a derived id into its parent execution id
    /// and join set id.
    #[derive(Debug, thiserror::Error)]
    pub enum DerivedIdSplitError {
        /// The join set part of the infix is not a valid `JoinSetId`.
        #[error(transparent)]
        JoinSetIdParseError(JoinSetIdParseError),
        /// The parent's index is not a valid number.
        #[error("cannot parse index of parent execution - {0}")]
        CannotParseOldIndex(ParseIntError),
        /// The parent part of the infix has no delimiter between join set and index.
        #[error("cannot find join set delimiter")]
        CannotFindJoinSetDelimiter,
    }
1428
1429    impl Debug for ExecutionIdDerived {
1430        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1431            self.display_or_debug(f)
1432        }
1433    }
1434    impl Display for ExecutionIdDerived {
1435        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1436            self.display_or_debug(f)
1437        }
1438    }
1439    impl FromStr for ExecutionIdDerived {
1440        type Err = DerivedIdParseError;
1441
1442        fn from_str(input: &str) -> Result<Self, Self::Err> {
1443            let (top_level, infix, idx) = derived_from_str(input)?;
1444            let id = ExecutionIdDerived {
1445                top_level,
1446                infix,
1447                idx,
1448            };
1449            Ok(id)
1450        }
1451    }
1452
1453    fn derived_from_str<T: 'static>(
1454        input: &str,
1455    ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1456        // Isolate the Root ID (Prefix) from the Hierarchical Path (Suffix)
1457        let (prefix, full_suffix) = input
1458            .split_once(EXECUTION_ID_INFIX)
1459            .ok_or(DerivedIdParseError::FirstDelimiterNotFound)?;
1460
1461        // Parse the Root ULID
1462        let top_level =
1463            PrefixedUlid::from_str(prefix).map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1464
1465        // Iterate through every segment to ensure the full path is valid.
1466        let mut last_idx = None;
1467        for segment in full_suffix.split(EXECUTION_ID_INFIX) {
1468            if segment.is_empty() {
1469                return Err(DerivedIdParseError::EmptySegment);
1470            }
1471            // Split the segment into JoinSetId and Index
1472            let (join_set_str, idx_str) = segment
1473                .split_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1474                .ok_or(DerivedIdParseError::SecondDelimiterNotFound)?;
1475            JoinSetId::from_str(join_set_str).map_err(DerivedIdParseError::JoinSetIdParseError)?;
1476            let idx = u64::from_str(idx_str).map_err(DerivedIdParseError::ParseIndexError)?;
1477            last_idx = Some((idx, idx_str));
1478        }
1479        let (idx, idx_str) = last_idx.expect("split must have returned at least one element");
1480        let infix = full_suffix
1481            .strip_suffix(idx_str)
1482            .expect("must have ended with index");
1483        let infix = infix
1484            .strip_suffix(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1485            .expect("must have ended with `_index`");
1486        Ok((top_level, Arc::from(infix), idx))
1487    }
1488
1489    #[cfg(any(test, feature = "test"))]
1490    impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1491        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1492            let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1493            let join_set_id = JoinSetId::arbitrary(u)?;
1494            Ok(top_level.next_level(&join_set_id))
1495        }
1496    }
1497
    /// Errors produced when parsing the textual form of a derived id.
    #[derive(Debug, thiserror::Error)]
    pub enum DerivedIdParseError {
        /// The top-level part is not a valid prefixed ULID.
        #[error(transparent)]
        PrefixedUlidParseError(PrefixedUlidParseError),
        /// The input contains no `EXECUTION_ID_INFIX`.
        #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
        FirstDelimiterNotFound,
        /// A segment contains no delimiter between join set and index.
        #[error(
            "cannot parse derived id - delimiter `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` not found"
        )]
        SecondDelimiterNotFound,
        /// The index part of a segment is not a number.
        #[error(
            "cannot parse derived id - suffix after `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` must be a number"
        )]
        ParseIndexError(ParseIntError),
        /// Wraps a `DerivedIdSplitError`.
        // NOTE(review): not constructed anywhere in the visible part of the file.
        #[error(transparent)]
        DerivedIdSplitError(DerivedIdSplitError),
        /// The join set part of a segment is not a valid `JoinSetId`.
        #[error(transparent)]
        JoinSetIdParseError(JoinSetIdParseError),
        /// A path segment is empty.
        #[error("empty segment")]
        EmptySegment,
    }
1519
1520    impl ExecutionId {
1521        #[must_use]
1522        pub fn generate() -> Self {
1523            ExecutionId::TopLevel(PrefixedUlid::generate())
1524        }
1525
1526        #[must_use]
1527        pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1528            match &self {
1529                ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1530                ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1531            }
1532        }
1533
1534        #[must_use]
1535        pub fn is_top_level(&self) -> bool {
1536            matches!(self, ExecutionId::TopLevel(_))
1537        }
1538
1539        #[must_use]
1540        pub fn random_seed(&self) -> u64 {
1541            let mut hasher = fxhash::FxHasher::default();
1542            // `Self::random_part` uses only the lower 80 bits of a 128-bit value.
1543            // Truncate to 64 bits, since including the remaining 16 bits
1544            // would not increase the entropy of the 64-bit output.
1545            #[expect(clippy::cast_possible_truncation)]
1546            let random_part = self.get_top_level().random_part() as u64;
1547            hasher.write_u64(random_part);
1548            hasher.write_u64(self.get_top_level().timestamp_part());
1549            if let ExecutionId::Derived(ExecutionIdDerived {
1550                top_level: _,
1551                infix,
1552                idx,
1553            }) = self
1554            {
1555                // Each derived execution ID should return different seed.
1556                hasher.write(infix.as_bytes());
1557                hasher.write_u64(*idx);
1558            }
1559            hasher.finish()
1560        }
1561
1562        #[must_use]
1563        pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1564            ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1565        }
1566
1567        #[must_use]
1568        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1569            match &self {
1570                ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1571                    top_level: *top_level,
1572                    infix: Arc::from(join_set_id.to_string()),
1573                    idx: EXECUTION_ID_START_IDX,
1574                },
1575                ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1576            }
1577        }
1578
1579        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1580            match &self {
1581                ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1582                ExecutionId::Derived(derived) => Display::fmt(derived, f),
1583            }
1584        }
1585    }
1586
    /// Index given to the first child created by `next_level`.
    const EXECUTION_ID_START_IDX: u64 = 1;
    /// NOTE(review): not referenced in the visible part of the file; presumably the
    /// first index used when numbering join sets - confirm against callers.
    pub const JOIN_SET_START_IDX: u64 = 1;
    /// Index used by `DelayId::new`.
    const DELAY_ID_START_IDX: u64 = 1;
1590
    /// Errors produced when parsing an `ExecutionId` from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdParseError {
        /// The input was treated as a top-level id and is not a valid prefixed ULID.
        #[error(transparent)]
        PrefixedUlidParseError(PrefixedUlidParseError),
        /// The input was treated as a derived id and could not be parsed.
        #[error(transparent)]
        DerivedIdParseError(DerivedIdParseError),
    }
1598
1599    impl FromStr for ExecutionId {
1600        type Err = ExecutionIdParseError;
1601
1602        fn from_str(input: &str) -> Result<Self, Self::Err> {
1603            if input.contains(EXECUTION_ID_INFIX) {
1604                ExecutionIdDerived::from_str(input)
1605                    .map(ExecutionId::Derived)
1606                    .map_err(ExecutionIdParseError::DerivedIdParseError)
1607            } else {
1608                Ok(ExecutionId::TopLevel(
1609                    PrefixedUlid::from_str(input)
1610                        .map_err(ExecutionIdParseError::PrefixedUlidParseError)?,
1611                ))
1612            }
1613        }
1614    }
1615
    /// Errors produced when extracting an `ExecutionId` from a wasmtime value.
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdStructuralParseError {
        /// The record had the right shape but its `id` string is not a valid execution id.
        #[error(transparent)]
        ExecutionIdParseError(#[from] ExecutionIdParseError),
        /// The value is not a record with exactly one string field named `id`.
        #[error("execution-id must be a record with `id` field of type string")]
        TypeError,
    }
1623
1624    impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1625        type Error = ExecutionIdStructuralParseError;
1626
1627        fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1628            if let wasmtime::component::Val::Record(key_vals) = execution_id
1629                && key_vals.len() == 1
1630                && let Some((key, execution_id)) = key_vals.first()
1631                && key == "id"
1632                && let wasmtime::component::Val::String(execution_id) = execution_id
1633            {
1634                ExecutionId::from_str(execution_id)
1635                    .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1636            } else {
1637                Err(ExecutionIdStructuralParseError::TypeError)
1638            }
1639        }
1640    }
1641
1642    impl Debug for ExecutionId {
1643        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1644            self.display_or_debug(f)
1645        }
1646    }
1647
1648    impl Display for ExecutionId {
1649        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650            self.display_or_debug(f)
1651        }
1652    }
1653
1654    #[cfg(any(test, feature = "test"))]
1655    impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1656        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1657            Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1658        }
1659    }
1660
    /// Mirrors [`ExecutionId`], with different prefix and `idx` for tracking each delay within the join set.
    ///
    /// Displayed as
    /// `{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct DelayId {
        // Carries the ULID of the owning execution's top-level id.
        top_level: DelayIdTopLevel,
        // Same infix the owning execution's child in the join set would have.
        infix: Arc<str>,
        // Index of the delay.
        idx: u64,
    }
1668    impl DelayId {
1669        #[must_use]
1670        pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1671            Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1672        }
1673
1674        #[must_use]
1675        pub fn new_with_index(
1676            execution_id: &ExecutionId,
1677            join_set_id: &JoinSetId,
1678            idx: u64,
1679        ) -> DelayId {
1680            let ExecutionIdDerived {
1681                top_level: PrefixedUlid { ulid, .. },
1682                infix,
1683                idx: _,
1684            } = execution_id.next_level(join_set_id);
1685            let top_level = DelayIdTopLevel::new(ulid);
1686            DelayId {
1687                top_level,
1688                infix,
1689                idx,
1690            }
1691        }
1692
1693        #[must_use]
1694        pub fn get_incremented(&self) -> Self {
1695            Self {
1696                top_level: self.top_level,
1697                infix: self.infix.clone(),
1698                idx: self.idx + 1,
1699            }
1700        }
1701
1702        #[must_use]
1703        pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1704            self.split_to_parts_res().expect("verified in from_str")
1705        }
1706
1707        fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1708            // Two cases:
1709            // A. infix contains a `.` => infix must be split into old_infix _ old_idx . JoinSetId
1710            // B. else => child of a top level, will be split into the top level, the whole infix must be JoinSetId.
1711            if let Some((old_infix_and_index, join_set_id)) =
1712                self.infix.rsplit_once(EXECUTION_ID_INFIX)
1713            {
1714                let join_set_id = JoinSetId::from_str(join_set_id)
1715                    .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1716                let Some((old_infix, old_idx)) =
1717                    old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1718                else {
1719                    return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1720                };
1721                let parent = ExecutionIdDerived {
1722                    top_level: ExecutionIdTopLevel::new(self.top_level.ulid),
1723                    infix: Arc::from(old_infix),
1724                    idx: old_idx
1725                        .parse()
1726                        .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1727                };
1728                Ok((ExecutionId::Derived(parent), join_set_id))
1729            } else {
1730                Ok((
1731                    ExecutionId::TopLevel(ExecutionIdTopLevel::new(self.top_level.ulid)),
1732                    JoinSetId::from_str(&self.infix)
1733                        .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1734                ))
1735            }
1736        }
1737
1738        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1739            let DelayId {
1740                top_level,
1741                infix,
1742                idx,
1743            } = self;
1744            write!(
1745                f,
1746                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1747            )
1748        }
1749    }
1750
1751    pub mod delay_impl {
1752        use super::{DelayId, DerivedIdParseError, derived_from_str};
1753        use std::{
1754            fmt::{Debug, Display},
1755            str::FromStr,
1756        };
1757
1758        impl Debug for DelayId {
1759            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1760                self.display_or_debug(f)
1761            }
1762        }
1763
1764        impl Display for DelayId {
1765            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1766                self.display_or_debug(f)
1767            }
1768        }
1769
1770        impl FromStr for DelayId {
1771            type Err = DerivedIdParseError;
1772
1773            fn from_str(input: &str) -> Result<Self, Self::Err> {
1774                let (top_level, infix, idx) = derived_from_str(input)?;
1775                let id = DelayId {
1776                    top_level,
1777                    infix,
1778                    idx,
1779                };
1780                Ok(id)
1781            }
1782        }
1783
1784        #[cfg(any(test, feature = "test"))]
1785        impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1786            fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1787                use super::{ExecutionId, JoinSetId};
1788                let execution_id = ExecutionId::arbitrary(u)?;
1789                let mut join_set_id = JoinSetId::arbitrary(u)?;
1790                join_set_id.kind = crate::JoinSetKind::OneOff;
1791                Ok(DelayId::new(&execution_id, &join_set_id))
1792            }
1793        }
1794    }
1795}
1796
/// Identifier of a join set, made of a [`JoinSetKind`] and a name.
///
/// The textual form is `{kind}{JOIN_SET_ID_INFIX}{name}`. Serialization and
/// deserialization go through that textual form (`SerializeDisplay` /
/// `DeserializeFromStr`), so deserializing runs the same validation as `FromStr`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[non_exhaustive] // force using the constructor as much as possible due to validation
#[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
pub struct JoinSetId {
    /// Kind of the join set; rendered as a one-letter code.
    pub kind: JoinSetKind,
    /// Name of the join set; `JoinSetId::new` validates it with `check_name`.
    pub name: StrVariant,
}
1813
1814impl JoinSetId {
1815    pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1816        Ok(Self {
1817            kind,
1818            name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1819        })
1820    }
1821}
1822
/// ASCII lowercase letters, uppercase letters and digits, in that order.
pub const CHARSET_ALPHANUMERIC: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1825
/// Kind of a join set.
///
/// `Display` prints the one-letter code returned by `as_code`, and `FromStr`
/// accepts the same codes. The derived `Serialize`/`Deserialize` impls are
/// independent of that code.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    Serialize,
    Deserialize,
    strum::EnumIter,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("{}", self.as_code())]
pub enum JoinSetKind {
    /// Rendered as `o`.
    OneOff,
    /// Rendered as `n`.
    Named,
    /// Rendered as `g`.
    Generated,
}
1845impl JoinSetKind {
1846    fn as_code(&self) -> &'static str {
1847        match self {
1848            JoinSetKind::OneOff => "o",
1849            JoinSetKind::Named => "n",
1850            JoinSetKind::Generated => "g",
1851        }
1852    }
1853}
1854impl FromStr for JoinSetKind {
1855    type Err = &'static str;
1856    fn from_str(s: &str) -> Result<Self, Self::Err> {
1857        use strum::IntoEnumIterator;
1858        Self::iter()
1859            .find(|variant| s == variant.as_code())
1860            .ok_or("unknown join set kind")
1861    }
1862}
1863
/// Delimiter written between the top-level id and the infix of a derived id.
pub(crate) const EXECUTION_ID_INFIX: char = '.';
/// Delimiter written between the infix (ending with a join set id) and the index.
pub(crate) const EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX: char = '_';

/// Delimiter between the kind code and the name in the textual form of a `JoinSetId`.
const JOIN_SET_ID_INFIX: char = ':';
/// Extra characters handed to `check_name` when validating a join set name.
// NOTE(review): the name reads like a typo of `CHARSET_EXTRA_JOIN_SET` - confirm before renaming.
const CHARSET_EXTRA_JSON_SET: &str = "-/";
1869
1870impl FromStr for JoinSetId {
1871    type Err = JoinSetIdParseError;
1872
1873    fn from_str(input: &str) -> Result<Self, Self::Err> {
1874        let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1875            return Err(JoinSetIdParseError::WrongParts);
1876        };
1877        let kind = kind
1878            .parse()
1879            .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1880        JoinSetId::new(kind, StrVariant::from(name.to_string()))
1881            .map_err(JoinSetIdParseError::InvalidName)
1882    }
1883}
1884
1885#[derive(Debug, thiserror::Error)]
1886pub enum JoinSetIdParseError {
1887    #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1888    WrongParts,
1889    #[error("cannot parse join set kind - {0}")]
1890    JoinSetKindParseError(&'static str),
1891    #[error("cannot parse join set id - {0}")]
1892    InvalidName(InvalidNameError<JoinSetId>),
1893}
1894
/// Characters used when generating arbitrary join set names:
/// `CHARSET_ALPHANUMERIC` followed by `CHARSET_EXTRA_JSON_SET`.
#[cfg(any(test, feature = "test"))]
const CHARSET_JOIN_SET_NAME: &str =
    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
/// Generates a `Named` join set id whose name is 1 to 11 characters long, each drawn
/// from `CHARSET_JOIN_SET_NAME`.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The inclusive range below always yields at least one character.
        let length_inclusive = u.int_in_range(0..=10)?;
        let name = (0..=length_inclusive)
            .map(|_| -> arbitrary::Result<char> {
                // Propagate generator errors instead of panicking inside a fallible impl.
                let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len())?;
                Ok(CHARSET_JOIN_SET_NAME
                    .chars()
                    .nth(idx)
                    .expect("idx is < charset.len()"))
            })
            .collect::<arbitrary::Result<String>>()?;

        Ok(JoinSetId::new(JoinSetKind::Named, StrVariant::from(name))
            .expect("name is built only from `CHARSET_JOIN_SET_NAME`"))
    }
}
1917
/// Name of the payload-less variant case that `ReturnType::detect` looks for in the
/// error type of a `result<T, E>`.
const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
/// Return type of a function, classified by `ReturnType::detect`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
pub enum ReturnType {
    Extendable(ReturnTypeExtendable), // Execution failures can be converted to this return type, e.g. result<_, string>
    NonExtendable(ReturnTypeNonExtendable), // e.g. -submit returns ExecutionId
}
1926impl ReturnType {
1927    /// Evaluate whether the return type is one of supported types:
1928    /// * `result`
1929    /// * `result<T>`
1930    /// * `result<_, string>`
1931    /// * `result<T, string>`
1932    /// * `result<T, E>` where T can be `_` and E is a `variant` containing `execution-failed`
1933    ///   variant with no associated value.
1934    #[must_use]
1935    pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
1936        if let TypeWrapper::Result { ok, err: None } = type_wrapper {
1937            return ReturnType::Extendable(ReturnTypeExtendable {
1938                type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
1939                wit_type,
1940            });
1941        } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
1942            if let TypeWrapper::String = err.as_ref() {
1943                return ReturnType::Extendable(ReturnTypeExtendable {
1944                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1945                    wit_type,
1946                });
1947            } else if let TypeWrapper::Variant(fields) = err.as_ref()
1948                && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
1949            {
1950                return ReturnType::Extendable(ReturnTypeExtendable {
1951                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1952                    wit_type,
1953                });
1954            }
1955            return ReturnType::NonExtendable(ReturnTypeNonExtendable {
1956                type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
1957                wit_type,
1958            });
1959        }
1960        ReturnType::NonExtendable(ReturnTypeNonExtendable {
1961            type_wrapper: type_wrapper.clone(),
1962            wit_type,
1963        })
1964    }
1965
1966    #[must_use]
1967    pub fn wit_type(&self) -> &str {
1968        match self {
1969            ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
1970            ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
1971        }
1972    }
1973
1974    #[must_use]
1975    pub fn type_wrapper(&self) -> TypeWrapper {
1976        match self {
1977            ReturnType::Extendable(compatible) => {
1978                TypeWrapper::from(compatible.type_wrapper_tl.clone())
1979            }
1980            ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
1981        }
1982    }
1983}
1984
/// Return type that `ReturnType::detect` did not recognize as one of the supported
/// `result` shapes. Displayed as its WIT type.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeNonExtendable {
    /// Full type of the return value.
    pub type_wrapper: TypeWrapper,
    /// WIT representation of the type; this is what `Display` prints.
    pub wit_type: StrVariant,
}
1993
/// Return type that `ReturnType::detect` recognized as one of the supported `result`
/// shapes. Displayed as its WIT type.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeExtendable {
    /// `ok` and `err` parts of the top-level `result`.
    pub type_wrapper_tl: TypeWrapperTopLevel,
    /// WIT representation of the type; this is what `Display` prints.
    pub wit_type: StrVariant,
}
2002
/// Test-only placeholder: an extendable return type with neither an `ok` nor an `err`
/// type, whose WIT type is the bare `result`.
#[cfg(any(test, feature = "test"))]
pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
    type_wrapper_tl: TypeWrapperTopLevel {
        ok: None,
        err: None,
    },
    wit_type: StrVariant::Static("result"),
});
2011
/// A single function parameter. Displayed as `{name}: {wit_type}`.
///
/// `PartialEq` is generated by `derive_where`, with the `name` field marked as skipped.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
#[derive_where::derive_where(PartialEq)]
#[display("{name}: {wit_type}")]
pub struct ParameterType {
    /// Type of the parameter.
    pub type_wrapper: TypeWrapper,
    #[derive_where(skip)]
    // Names are read from how a component names the parameter and thus might differ between export and import.
    pub name: StrVariant,
    /// WIT representation of the parameter type.
    pub wit_type: StrVariant,
}
2022
/// Ordered list of function parameters; dereferences to the inner `Vec`.
///
/// `Debug` and `Display` are implemented by hand and print the parameters as a
/// comma-separated list wrapped in parentheses.
#[derive(
    Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
)]
pub struct ParameterTypes(pub Vec<ParameterType>);
2027
2028impl Debug for ParameterTypes {
2029    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2030        write!(f, "(")?;
2031        let mut iter = self.0.iter().peekable();
2032        while let Some(p) = iter.next() {
2033            write!(f, "{p:?}")?;
2034            if iter.peek().is_some() {
2035                write!(f, ", ")?;
2036            }
2037        }
2038        write!(f, ")")
2039    }
2040}
2041
2042impl Display for ParameterTypes {
2043    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2044        write!(f, "(")?;
2045        let mut iter = self.0.iter().peekable();
2046        while let Some(p) = iter.next() {
2047            write!(f, "{p}")?;
2048            if iter.peek().is_some() {
2049                write!(f, ", ")?;
2050            }
2051        }
2052        write!(f, ")")
2053    }
2054}
2055
/// Functions of one interface, keyed by function name.
#[derive(Debug, Clone)]
pub struct PackageIfcFns {
    /// Fully qualified name of the interface.
    pub ifc_fqn: IfcFqnName,
    pub extension: bool, // one of `-obelisk-ext`, `-obelisk-schedule`, `-obelisk-stub`
    /// Functions of the interface together with their metadata.
    pub fns: IndexMap<FnName, FunctionMetadata>,
}
2062
/// Retry settings of a component.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ComponentRetryConfig {
    /// Maximum number of retries.
    // NOTE(review): presumably `None` means "no limit" - confirm against the executor.
    pub max_retries: Option<u32>,
    /// Backoff duration between retries.
    // NOTE(review): the name suggests an exponential backoff base - confirm.
    pub retry_exp_backoff: Duration,
}
impl ComponentRetryConfig {
    /// Zero retries and a zero backoff.
    pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: Some(0),
        retry_exp_backoff: Duration::ZERO,
    };

    /// Configuration without a `max_retries` value and with a zero backoff.
    /// Only available with the `test` feature.
    #[cfg(feature = "test")]
    pub const WORKFLOW_TEST: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: None,
        retry_exp_backoff: Duration::ZERO,
    };
}
2080
2081/// Implementation must not return `-obelisk-*` extended function, nor functions from `obelisk` namespace.
2082pub trait FunctionRegistry: Send + Sync {
2083    fn get_by_exported_function(
2084        &self,
2085        ffqn: &FunctionFqn,
2086    ) -> Option<(FunctionMetadata, ComponentId)>;
2087
2088    // TODO: return Option<&TypeWrapperTopLevel>, optimize
2089    /// Get return type of a non-ext function, otherwise return `None`.
2090    fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2091        self.get_by_exported_function(ffqn)
2092            .and_then(|(fn_meta, _)| {
2093                if let ReturnType::Extendable(ReturnTypeExtendable {
2094                    type_wrapper_tl: type_wrapper,
2095                    wit_type: _,
2096                }) = fn_meta.return_type
2097                {
2098                    Some(type_wrapper)
2099                } else {
2100                    None
2101                }
2102            })
2103    }
2104
2105    fn all_exports(&self) -> &[PackageIfcFns];
2106}
2107
/// String key-value metadata attached to an execution; the code in this file uses it
/// to carry tracing context under keys prefixed with `tracing:`.
///
/// `Display` prints the `Debug` form of the inner optional map.
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
#[display("{_0:?}")]
pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2111
2112impl ExecutionMetadata {
2113    const LINKED_KEY: &str = "obelisk-tracing-linked";
2114    #[must_use]
2115    pub const fn empty() -> Self {
2116        // Remove `Optional` when const hashmap creation is allowed - https://github.com/rust-lang/rust/issues/123197
2117        Self(None)
2118    }
2119
2120    #[must_use]
2121    pub fn from_parent_span(less_specific: &Span) -> Self {
2122        ExecutionMetadata::create(less_specific, false)
2123    }
2124
2125    #[must_use]
2126    pub fn from_linked_span(less_specific: &Span) -> Self {
2127        ExecutionMetadata::create(less_specific, true)
2128    }
2129
2130    /// Attempt to use `Span::current()` to fill the trace and parent span.
2131    /// If that fails, which can happen due to interference with e.g.
2132    /// the stdout layer of the subscriber, use the `span` which is guaranteed
2133    /// to be on info level.
2134    #[must_use]
2135    #[expect(clippy::items_after_statements)]
2136    fn create(span: &Span, link_marker: bool) -> Self {
2137        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2138        let mut metadata = Self(Some(hashbrown::HashMap::default()));
2139        let mut metadata_view = ExecutionMetadataInjectorView {
2140            metadata: &mut metadata,
2141        };
2142        // inject the current context through the amqp headers
2143        fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2144            opentelemetry::global::get_text_map_propagator(|propagator| {
2145                propagator.inject_context(&s.context(), metadata_view);
2146            });
2147        }
2148        inject(&Span::current(), &mut metadata_view);
2149        if metadata_view.is_empty() {
2150            // The subscriber sent us a current span that is actually disabled
2151            inject(span, &mut metadata_view);
2152        }
2153        if link_marker {
2154            metadata_view.set(Self::LINKED_KEY, String::new());
2155        }
2156        metadata
2157    }
2158
2159    pub fn enrich(&self, span: &Span) {
2160        use opentelemetry::trace::TraceContextExt as _;
2161        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2162
2163        let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2164        let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2165            propagator.extract(&metadata_view)
2166        });
2167        if metadata_view.get(Self::LINKED_KEY).is_some() {
2168            let linked_span_context = otel_context.span().span_context().clone();
2169            span.add_link(linked_span_context);
2170        } else {
2171            let _ = span.set_parent(otel_context);
2172        }
2173    }
2174}
2175
/// Mutable view over [`ExecutionMetadata`] that implements the OpenTelemetry
/// `Injector` trait.
struct ExecutionMetadataInjectorView<'a> {
    /// Metadata that receives the injected entries.
    metadata: &'a mut ExecutionMetadata,
}
2179
2180impl ExecutionMetadataInjectorView<'_> {
2181    fn is_empty(&self) -> bool {
2182        self.metadata
2183            .0
2184            .as_ref()
2185            .is_some_and(hashbrown::HashMap::is_empty)
2186    }
2187}
2188
2189impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2190    fn set(&mut self, key: &str, value: String) {
2191        let key = format!("tracing:{key}");
2192        let map = if let Some(map) = self.metadata.0.as_mut() {
2193            map
2194        } else {
2195            self.metadata.0 = Some(hashbrown::HashMap::new());
2196            assert_matches!(&mut self.metadata.0, Some(map) => map)
2197        };
2198        map.insert(key, value);
2199    }
2200}
2201
/// Read-only view over [`ExecutionMetadata`] that implements the OpenTelemetry
/// `Extractor` trait.
struct ExecutionMetadataExtractorView<'a> {
    /// Metadata the entries are read from.
    metadata: &'a ExecutionMetadata,
}
2205
2206impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2207    fn get(&self, key: &str) -> Option<&str> {
2208        self.metadata
2209            .0
2210            .as_ref()
2211            .and_then(|map| map.get(&format!("tracing:{key}")))
2212            .map(std::string::String::as_str)
2213    }
2214
2215    fn keys(&self) -> Vec<&str> {
2216        match &self.metadata.0.as_ref() {
2217            Some(map) => map
2218                .keys()
2219                .filter_map(|key| key.strip_prefix("tracing:"))
2220                .collect(),
2221            None => vec![],
2222        }
2223    }
2224}
2225
#[cfg(test)]
mod tests {

    use rstest::rstest;

    use crate::{
        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
    };
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        str::FromStr,
        sync::Arc,
    };

    /// A generated executor id must survive a string round trip.
    #[test]
    fn ulid_parsing() {
        let generated = ExecutorId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// A generated top-level execution id must survive a string round trip.
    #[test]
    fn execution_id_parsing_top_level() {
        let generated = ExecutionId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// The first child is rendered as `{top_level}.n:name_1` and parses back.
    #[test]
    fn execution_id_with_one_level_should_parse() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
        let ser = first_child.to_string();
        assert_eq!(format!("{top_level}.n:name_1"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(first_child, parsed);
    }

    /// Incrementing the first child yields index 2.
    #[test]
    fn execution_id_increment_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = top_level.next_level(&join_set_id);
        let second_child = ExecutionId::Derived(first_child.get_incremented());
        let ser = second_child.to_string();
        assert_eq!(format!("{top_level}.n:name_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(second_child, parsed);
    }

    /// Two nested levels are rendered one after another and parse back.
    #[test]
    fn execution_id_next_level_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let execution_id = ExecutionId::Derived(
            top_level
                .next_level(&join_set_id_outer)
                .get_incremented()
                .next_level(&join_set_id_inner)
                .get_incremented(),
        );
        let ser = execution_id.to_string();
        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(execution_id, parsed);
    }

    /// Splitting a first-level child gives back the top-level id and the join set.
    #[test]
    fn execution_id_split_first_level() {
        let top_level = ExecutionId::generate();
        let join_set_id =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let execution_id = top_level.next_level(&join_set_id);
        let (actual_top_level, actual_join_set) = execution_id.split_to_parts();
        assert_eq!(top_level, actual_top_level);
        assert_eq!(join_set_id, actual_join_set);
    }

    /// Splitting a second-level child gives back the first-level id and the inner join set.
    #[rstest]
    fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let first_level = top_level
            .next_level(&join_set_id_outer)
            .get_incremented_by(outer_idx);

        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
        let second_level = first_level.next_level(&join_set_id_inner);

        let (actual_first_level, actual_join_set) = second_level.split_to_parts();
        assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
        assert_eq!(join_set_id_inner, actual_join_set);
    }

    /// A stray delimiter before the index must be rejected.
    #[test]
    fn invalid_execution_id_should_fail_to_parse() {
        ExecutionId::from_str("E_01KBNHM5FQW81KP5Y3XMNX31K4.g:gg._2.o:oo_2").unwrap_err();
    }

    /// Random seeds derived from execution ids are snapshotted and must be unique.
    #[test]
    fn execution_id_hash_should_be_stable() {
        let parent = ExecutionId::from_parts(1, 2);
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let sibling_1 = parent.next_level(&join_set_id);
        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
        let sibling_1 = ExecutionId::Derived(sibling_1);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let child =
            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
        let parent = parent.random_seed();
        let sibling_1 = sibling_1.random_seed();
        let sibling_2 = sibling_2.random_seed();
        let child = child.random_seed();
        let vec = vec![parent, sibling_1, sibling_2, child];
        insta::assert_debug_snapshot!(vec);
        // check that every hash is unique
        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
        assert_eq!(4, set.len());
    }

    /// `StrVariant::Arc` and `StrVariant::Static` with equal content hash identically.
    #[test]
    fn hash_of_str_variants_should_be_equal() {
        let input = "foo";
        let left = StrVariant::Arc(Arc::from(input));
        let right = StrVariant::Static(input);
        assert_eq!(left, right);
        let mut left_hasher = DefaultHasher::new();
        left.hash(&mut left_hasher);
        let mut right_hasher = DefaultHasher::new();
        right.hash(&mut right_hasher);
        let left_hasher = left_hasher.finish();
        let right_hasher = right_hasher.finish();
        println!("left: {left_hasher:x}, right: {right_hasher:x}");
        assert_eq!(left_hasher, right_hasher);
    }

    /// An interface name carrying a version is accepted by `try_from_tuple`.
    #[test]
    fn ffqn_from_tuple_with_version_should_work() {
        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    /// The function name is taken after the last dot, so the version dots do not interfere.
    #[test]
    fn ffqn_from_str_with_version_should_work() {
        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    /// Every join set kind must survive a JSON round trip.
    // Nothing here awaits, so a plain synchronous test is enough.
    #[test]
    fn join_set_serde_should_be_consistent() {
        use strum::IntoEnumIterator;
        for kind in JoinSetKind::iter() {
            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
            let ser = serde_json::to_string(&join_set_id).unwrap();
            let deser = serde_json::from_str(&ser).unwrap();
            assert_eq!(join_set_id, deser);
        }
    }
}