obeli_sk_concepts/
lib.rs

1#[cfg(feature = "rusqlite")]
2mod rusqlite_ext;
3pub mod storage;
4pub mod time;
5
6use ::serde::{Deserialize, Serialize};
7use assert_matches::assert_matches;
8pub use indexmap;
9use indexmap::IndexMap;
10use opentelemetry::propagation::{Extractor, Injector};
11pub use prefixed_ulid::ExecutionId;
12use prefixed_ulid::ExecutionIdParseError;
13use serde_json::Value;
14use std::{
15    borrow::Borrow,
16    fmt::{Debug, Display},
17    hash::Hash,
18    marker::PhantomData,
19    ops::Deref,
20    str::FromStr,
21    sync::Arc,
22    time::Duration,
23};
24use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
25use tracing::Span;
26use val_json::{
27    type_wrapper::{TypeConversionError, TypeWrapper},
28    wast_val::{WastVal, WastValWithType},
29    wast_val_ser::params,
30};
31use wasmtime::component::{Type, Val};
32
/// Namespace string that `IfcFqnName::is_namespace_obelisk` compares against.
pub const NAMESPACE_OBELISK: &str = "obelisk";
/// Namespace string that `IfcFqnName::is_namespace_wasi` compares against.
const NAMESPACE_WASI: &str = "wasi";
/// Package-name suffix mapped to `PackageExtension::ObeliskExt`.
pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
/// Package-name suffix mapped to `PackageExtension::ObeliskSchedule`.
pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
/// Package-name suffix mapped to `PackageExtension::ObeliskStub`.
pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
38
/// Terminal error of an execution.
///
/// The `Display` text comes from the `#[error(..)]` attributes; for
/// `PermanentFailure` it is built from `reason_full`.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FinishedExecutionError {
    // Activity only, because workflows will be retried forever
    /// The execution timed out permanently.
    #[error("permanent timeout")]
    PermanentTimeout,
    /// The execution failed permanently; `kind` classifies the failure.
    #[error("permanent failure: {reason_full}")]
    PermanentFailure {
        // Exists just for extracting reason of an activity trap, to avoid "activity trap: " prefix.
        reason_inner: String, // FIXME: remove
        // Contains reason_inner embedded in the error message
        reason_full: String,
        /// Classification of the failure.
        kind: PermanentFailureKind,
        /// Optional additional detail text.
        detail: Option<String>,
    },
}
54impl FinishedExecutionError {
55    #[must_use]
56    pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57        match self {
58            FinishedExecutionError::PermanentTimeout => PendingStateFinishedError::Timeout,
59            FinishedExecutionError::PermanentFailure { .. } => {
60                PendingStateFinishedError::ExecutionFailure
61            }
62        }
63    }
64
65    #[must_use]
66    pub fn new_stubbed_error() -> Self {
67        let reason = "stubbed error".to_string();
68        Self::PermanentFailure {
69            reason_inner: reason.clone(),
70            reason_full: reason,
71            kind: PermanentFailureKind::StubbedError,
72            detail: None,
73        }
74    }
75}
76
/// Classification of a `FinishedExecutionError::PermanentFailure`.
///
/// Serialized with snake_case variant names (see the `serde` attribute).
/// Each variant documents which component kinds it applies to.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PermanentFailureKind {
    /// Applicable to Workflow
    NondeterminismDetected,
    /// Applicable to Workflow, WASM Activity
    ParamsParsingError,
    /// Applicable to Workflow, WASM Activity
    CannotInstantiate,
    /// Applicable to Workflow, WASM Activity
    ResultParsingError,
    /// Applicable to Workflow
    ImportedFunctionCallError,
    /// Applicable to WASM Activity
    ActivityTrap,
    /// Applicable to Workflow
    WorkflowTrap,
    /// Applicable to Webhook
    WebhookEndpointError,
    /// Applicable to Stub Activity
    StubbedError,
    /// Applicable to Webhook, Workflow, WASM Activity
    OutOfFuel,
}
101
/// Kind of trap.
///
/// Serialized with snake_case variant names; the `Display` text of each
/// variant is the string given in its `#[display(..)]` attribute.
// NOTE(review): the display strings mix `post_return_trap` (underscores) with
// space-separated text such as `out of fuel` — confirm whether that is intended.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrapKind {
    /// Displayed as `trap`.
    #[display("trap")]
    Trap,
    /// Displayed as `post_return_trap`.
    #[display("post_return_trap")]
    PostReturnTrap,
    /// Displayed as `out of fuel`.
    #[display("out of fuel")]
    OutOfFuel,
    /// Displayed as `host function error`.
    #[display("host function error")]
    HostFunctionError,
}
114
/// A string that is either a `&'static str` or a shared `Arc<str>`.
///
/// Equality, hashing, `Deref` and `AsRef<str>` are implemented by hand in this
/// file and operate on the string contents of either variant.
#[derive(Clone, Eq, derive_more::Display)]
pub enum StrVariant {
    /// Borrowed string with `'static` lifetime.
    Static(&'static str),
    /// Reference-counted string.
    Arc(Arc<str>),
}
120
121impl StrVariant {
122    #[must_use]
123    pub const fn empty() -> StrVariant {
124        StrVariant::Static("")
125    }
126}
127
128impl From<String> for StrVariant {
129    fn from(value: String) -> Self {
130        StrVariant::Arc(Arc::from(value))
131    }
132}
133
134impl From<&'static str> for StrVariant {
135    fn from(value: &'static str) -> Self {
136        StrVariant::Static(value)
137    }
138}
139
140impl PartialEq for StrVariant {
141    fn eq(&self, other: &Self) -> bool {
142        match (self, other) {
143            (Self::Static(left), Self::Static(right)) => left == right,
144            (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
145            (Self::Arc(left), Self::Arc(right)) => left == right,
146            (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
147        }
148    }
149}
150
151impl Hash for StrVariant {
152    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
153        match self {
154            StrVariant::Static(val) => val.hash(state),
155            StrVariant::Arc(val) => {
156                let str: &str = val.deref();
157                str.hash(state);
158            }
159        }
160    }
161}
162
163impl Debug for StrVariant {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        Display::fmt(self, f)
166    }
167}
168
169impl Deref for StrVariant {
170    type Target = str;
171    fn deref(&self) -> &Self::Target {
172        match self {
173            Self::Arc(v) => v,
174            Self::Static(v) => v,
175        }
176    }
177}
178
179impl AsRef<str> for StrVariant {
180    fn as_ref(&self) -> &str {
181        match self {
182            Self::Arc(v) => v,
183            Self::Static(v) => v,
184        }
185    }
186}
187
188mod serde_strvariant {
189    use crate::StrVariant;
190    use serde::{
191        Deserialize, Deserializer, Serialize, Serializer,
192        de::{self, Visitor},
193    };
194    use std::{ops::Deref, sync::Arc};
195
196    impl Serialize for StrVariant {
197        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198        where
199            S: Serializer,
200        {
201            serializer.serialize_str(self.deref())
202        }
203    }
204
205    impl<'de> Deserialize<'de> for StrVariant {
206        fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
207        where
208            D: Deserializer<'de>,
209        {
210            deserializer.deserialize_str(StrVariantVisitor)
211        }
212    }
213
214    struct StrVariantVisitor;
215
216    impl Visitor<'_> for StrVariantVisitor {
217        type Value = StrVariant;
218
219        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
220            formatter.write_str("a string")
221        }
222
223        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
224        where
225            E: de::Error,
226        {
227            Ok(StrVariant::Arc(Arc::from(v)))
228        }
229    }
230}
231
/// A string name tagged at the type level with a marker type `T`.
///
/// Displayed as its `value`; with serde it is transparent over `value`
/// (the marker field is skipped).
#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[display("{value}")]
#[serde(transparent)]
pub struct Name<T> {
    /// The name text.
    pub value: StrVariant,
    // Zero-sized marker: ties the name to `T` without storing a `T`.
    #[serde(skip)]
    phantom_data: PhantomData<fn(T) -> T>,
}
240
241impl<T> Name<T> {
242    #[must_use]
243    pub fn new_arc(value: Arc<str>) -> Self {
244        Self {
245            value: StrVariant::Arc(value),
246            phantom_data: PhantomData,
247        }
248    }
249
250    #[must_use]
251    pub const fn new_static(value: &'static str) -> Self {
252        Self {
253            value: StrVariant::Static(value),
254            phantom_data: PhantomData,
255        }
256    }
257}
258
259impl<T> Debug for Name<T> {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        Display::fmt(&self, f)
262    }
263}
264
265impl<T> Deref for Name<T> {
266    type Target = str;
267
268    fn deref(&self) -> &Self::Target {
269        self.value.deref()
270    }
271}
272
273impl<T> Borrow<str> for Name<T> {
274    fn borrow(&self) -> &str {
275        self.deref()
276    }
277}
278
279impl<T> From<String> for Name<T> {
280    fn from(value: String) -> Self {
281        Self::new_arc(Arc::from(value))
282    }
283}
284
/// Extension kinds recognized by a package-name suffix.
///
/// `Display` prints the variant's suffix (see the `#[display(..)]` attribute,
/// which calls `suffix()`).
#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
#[display("{}", self.suffix())]
pub enum PackageExtension {
    /// Suffix `SUFFIX_PKG_EXT`.
    ObeliskExt,
    /// Suffix `SUFFIX_PKG_SCHEDULE`.
    ObeliskSchedule,
    /// Suffix `SUFFIX_PKG_STUB`.
    ObeliskStub,
}
292impl PackageExtension {
293    fn suffix(&self) -> &'static str {
294        match self {
295            PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
296            PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
297            PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
298        }
299    }
300}
301
/// Fully qualified package name, displayed as `namespace:package_name` with an
/// optional `@version` suffix.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct PkgFqn {
    /// Part before the `:`.
    pub namespace: String, // TODO: StrVariant or reference
    /// Part after the `:`; may end with one of the `-obelisk-*` extension suffixes.
    pub package_name: String,
    /// Version after the `@`, if any.
    pub version: Option<String>,
}
309impl PkgFqn {
310    #[must_use]
311    pub fn is_extension(&self) -> bool {
312        Self::is_package_name_ext(&self.package_name)
313    }
314
315    #[must_use]
316    pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
317        use strum::IntoEnumIterator;
318        for package_ext in PackageExtension::iter() {
319            if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
320                return Some((
321                    PkgFqn {
322                        namespace: self.namespace.clone(),
323                        package_name: package_name.to_string(),
324                        version: self.version.clone(),
325                    },
326                    package_ext,
327                ));
328            }
329        }
330        None
331    }
332
333    fn is_package_name_ext(package_name: &str) -> bool {
334        package_name.ends_with(SUFFIX_PKG_EXT)
335            || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
336            || package_name.ends_with(SUFFIX_PKG_STUB)
337    }
338}
339impl Display for PkgFqn {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        let PkgFqn {
342            namespace,
343            package_name,
344            version,
345        } = self;
346        if let Some(version) = version {
347            write!(f, "{namespace}:{package_name}@{version}")
348        } else {
349            write!(f, "{namespace}:{package_name}")
350        }
351    }
352}
353
/// Marker type distinguishing interface names from other `Name`s.
#[derive(Hash, Clone, PartialEq, Eq)]
pub struct IfcFqnMarker;

/// Fully qualified interface name.
pub type IfcFqnName = Name<IfcFqnMarker>; // namespace:name/ifc_name OR namespace:name/ifc_name@version
358
359impl IfcFqnName {
360    #[must_use]
361    pub fn namespace(&self) -> &str {
362        self.deref().split_once(':').unwrap().0
363    }
364
365    #[must_use]
366    pub fn package_name(&self) -> &str {
367        let after_colon = self.deref().split_once(':').unwrap().1;
368        after_colon.split_once('/').unwrap().0
369    }
370
371    #[must_use]
372    pub fn version(&self) -> Option<&str> {
373        self.deref().split_once('@').map(|(_, version)| version)
374    }
375
376    #[must_use]
377    pub fn pkg_fqn_name(&self) -> PkgFqn {
378        let (namespace, rest) = self.deref().split_once(':').unwrap();
379        let (package_name, rest) = rest.split_once('/').unwrap();
380        let version = rest.split_once('@').map(|(_, version)| version);
381        PkgFqn {
382            namespace: namespace.to_string(),
383            package_name: package_name.to_string(),
384            version: version.map(std::string::ToString::to_string),
385        }
386    }
387
388    #[must_use]
389    pub fn ifc_name(&self) -> &str {
390        let after_colon = self.deref().split_once(':').unwrap().1;
391        let after_slash = after_colon.split_once('/').unwrap().1;
392        after_slash
393            .split_once('@')
394            .map_or(after_slash, |(ifc, _)| ifc)
395    }
396
397    #[must_use]
398    pub fn from_parts(
399        namespace: &str,
400        package_name: &str,
401        ifc_name: &str,
402        version: Option<&str>,
403    ) -> Self {
404        let mut str = format!("{namespace}:{package_name}/{ifc_name}");
405        if let Some(version) = version {
406            str += "@";
407            str += version;
408        }
409        Self::new_arc(Arc::from(str))
410    }
411
412    #[must_use]
413    /// Returns true if this is an `-obelisk-*` extension interface.
414    pub fn is_extension(&self) -> bool {
415        PkgFqn::is_package_name_ext(self.package_name())
416    }
417
418    #[must_use]
419    pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
420        self.package_name().strip_suffix(SUFFIX_PKG_EXT)
421    }
422
423    #[must_use]
424    pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
425        self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
426    }
427
428    #[must_use]
429    pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
430        self.package_name().strip_suffix(SUFFIX_PKG_STUB)
431    }
432
433    #[must_use]
434    pub fn is_namespace_obelisk(&self) -> bool {
435        self.namespace() == NAMESPACE_OBELISK
436    }
437
438    #[must_use]
439    pub fn is_namespace_wasi(&self) -> bool {
440        self.namespace() == NAMESPACE_WASI
441    }
442}
443
/// Marker type distinguishing function names from other `Name`s.
#[derive(Hash, Clone, PartialEq, Eq)]
pub struct FnMarker;

/// Name of a function within an interface.
pub type FnName = Name<FnMarker>;
448
/// Fully qualified function name: an interface name plus a function name.
///
/// `Display` (implemented in this file) joins the two with a `.`.
#[derive(Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FunctionFqn {
    /// Fully qualified name of the interface containing the function.
    pub ifc_fqn: IfcFqnName,
    /// Name of the function inside the interface.
    pub function_name: FnName,
}
454
455impl FunctionFqn {
456    #[must_use]
457    pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
458        Self {
459            ifc_fqn: Name::new_arc(ifc_fqn),
460            function_name: Name::new_arc(function_name),
461        }
462    }
463
464    #[must_use]
465    pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
466        Self {
467            ifc_fqn: Name::new_static(ifc_fqn),
468            function_name: Name::new_static(function_name),
469        }
470    }
471
472    #[must_use]
473    pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
474        Self::new_static(tuple.0, tuple.1)
475    }
476
477    pub fn try_from_tuple(
478        ifc_fqn: &str,
479        function_name: &str,
480    ) -> Result<Self, FunctionFqnParseError> {
481        if function_name.contains('.') {
482            Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
483        } else {
484            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
485        }
486    }
487}
488
/// Errors produced when building a `FunctionFqn` from strings.
#[derive(Debug, thiserror::Error)]
pub enum FunctionFqnParseError {
    /// The input has no `.` separating the interface from the function name.
    #[error("delimiter `.` not found")]
    DelimiterNotFound,
    /// The function name itself contains a `.`.
    #[error("delimiter `.` found in function name")]
    DelimiterFoundInFunctionName,
}
496
497impl FromStr for FunctionFqn {
498    type Err = FunctionFqnParseError;
499
500    fn from_str(s: &str) -> Result<Self, Self::Err> {
501        if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
502            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
503        } else {
504            Err(FunctionFqnParseError::DelimiterNotFound)
505        }
506    }
507}
508
509impl Display for FunctionFqn {
510    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511        write!(
512            f,
513            "{ifc_fqn}.{function_name}",
514            ifc_fqn = self.ifc_fqn,
515            function_name = self.function_name
516        )
517    }
518}
519
520impl Debug for FunctionFqn {
521    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522        Display::fmt(&self, f)
523    }
524}
525
/// Generates a `FunctionFqn` of the shape `namespace:pkg/ifc` + function name
/// from four arbitrary strings (drawn in that order), with the characters
/// `:`, `@` and `.` removed from each.
// NOTE(review): `/` is also a delimiter of the interface name but is not
// stripped here, so a generated part may contain it — confirm this is intended.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        const ILLEGAL: [char; 3] = [':', '@', '.'];
        // Draws the next arbitrary string and removes the illegal characters.
        let mut next_part = || -> arbitrary::Result<String> {
            let raw = u.arbitrary::<String>()?;
            Ok(raw.replace(ILLEGAL, ""))
        };
        let namespace = next_part()?;
        let pkg_name = next_part()?;
        let ifc_name = next_part()?;
        let fn_name = next_part()?;

        let ifc_fqn = format!("{namespace}:{pkg_name}/{ifc_name}");
        Ok(FunctionFqn::new_arc(
            Arc::from(ifc_fqn),
            Arc::from(fn_name),
        ))
    }
}
541
/// The `ok` / `err` payload types of a top-level `result` type.
///
/// Converts into `TypeWrapper::Result` via the `From` impl in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct TypeWrapperTopLevel {
    /// Type of the `ok` payload, `None` when there is no payload.
    pub ok: Option<Box<TypeWrapper>>,
    /// Type of the `err` payload, `None` when there is no payload.
    pub err: Option<Box<TypeWrapper>>,
}
547impl From<TypeWrapperTopLevel> for TypeWrapper {
548    fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
549        TypeWrapper::Result {
550            ok: value.ok,
551            err: value.err,
552        }
553    }
554}
555
/// Return value of a finished function: the `ok` or `err` side of a
/// `result`, or an execution error.
///
/// The payloads are skipped in `Debug` output (see `#[debug(skip)]`).
#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SupportedFunctionReturnValue {
    /// The `ok` side of the result, with an optional typed payload.
    Ok {
        #[debug(skip)]
        ok: Option<WastValWithType>,
    },
    /// The `err` side of the result, with an optional typed payload.
    Err {
        #[debug(skip)]
        err: Option<WastValWithType>,
    },
    /// The execution finished with a [`FinishedExecutionError`].
    ExecutionError(FinishedExecutionError),
}
/// The `Ok` return value without a payload.
pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
    SupportedFunctionReturnValue::Ok { ok: None };
570
/// Errors returned by `SupportedFunctionReturnValue::new`.
#[derive(Debug, thiserror::Error)]
pub enum ResultParsingError {
    /// The iterator of results was empty.
    #[error("return value must not be empty")]
    NoValue,
    /// The iterator of results contained more than one value.
    #[error("return value cannot be parsed, multi-value results are not supported")]
    MultiValue,
    /// The wasmtime type could not be converted to a `TypeWrapper`.
    #[error("return value cannot be parsed, {0}")]
    TypeConversionError(val_json::type_wrapper::TypeConversionError),
    /// The value could not be interpreted against its type; displayed as the inner error.
    #[error(transparent)]
    ResultParsingErrorFromVal(ResultParsingErrorFromVal),
}
582
/// Errors returned when converting a wasmtime value plus its type into a
/// `SupportedFunctionReturnValue`.
#[derive(Debug, thiserror::Error)]
pub enum ResultParsingErrorFromVal {
    /// The payload could not be converted into a `WastVal`.
    #[error("return value cannot be parsed, {0}")]
    WastValConversionError(val_json::wast_val::WastValConversionError),
    /// The type or the value is not a `result` at the top level.
    #[error("top level type must be a result")]
    TopLevelTypeMustBeAResult,
    /// Presence of a payload in the value does not match the declared type.
    #[error("value does not type check")]
    TypeCheckError,
}
592
593impl SupportedFunctionReturnValue {
594    pub fn new<
595        I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
596    >(
597        mut iter: I,
598    ) -> Result<Self, ResultParsingError> {
599        if iter.len() == 0 {
600            Err(ResultParsingError::NoValue)
601        } else if iter.len() == 1 {
602            let (val, r#type) = iter.next().unwrap();
603            let r#type =
604                TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
605            Self::from_val_and_type_wrapper(val, r#type)
606                .map_err(ResultParsingError::ResultParsingErrorFromVal)
607        } else {
608            Err(ResultParsingError::MultiValue)
609        }
610    }
611
612    #[expect(clippy::result_unit_err)]
613    pub fn from_wast_val_with_type(
614        value: WastValWithType,
615    ) -> Result<SupportedFunctionReturnValue, ()> {
616        match value {
617            WastValWithType {
618                r#type: TypeWrapper::Result { ok: None, err: _ },
619                value: WastVal::Result(Ok(None)),
620            } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
621            WastValWithType {
622                r#type:
623                    TypeWrapper::Result {
624                        ok: Some(ok),
625                        err: _,
626                    },
627                value: WastVal::Result(Ok(Some(value))),
628            } => Ok(SupportedFunctionReturnValue::Ok {
629                ok: Some(WastValWithType {
630                    r#type: *ok,
631                    value: *value,
632                }),
633            }),
634            WastValWithType {
635                r#type: TypeWrapper::Result { ok: _, err: None },
636                value: WastVal::Result(Err(None)),
637            } => Ok(SupportedFunctionReturnValue::Err { err: None }),
638            WastValWithType {
639                r#type:
640                    TypeWrapper::Result {
641                        ok: _,
642                        err: Some(err),
643                    },
644                value: WastVal::Result(Err(Some(value))),
645            } => Ok(SupportedFunctionReturnValue::Err {
646                err: Some(WastValWithType {
647                    r#type: *err,
648                    value: *value,
649                }),
650            }),
651            _ => Err(()),
652        }
653    }
654
655    pub fn from_val_and_type_wrapper(
656        value: wasmtime::component::Val,
657        ty: TypeWrapper,
658    ) -> Result<Self, ResultParsingErrorFromVal> {
659        let TypeWrapper::Result { ok, err } = ty else {
660            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
661        };
662        let ty = TypeWrapperTopLevel { ok, err };
663        Self::from_val_and_type_wrapper_tl(value, ty)
664    }
665
666    pub fn from_val_and_type_wrapper_tl(
667        value: wasmtime::component::Val,
668        ty: TypeWrapperTopLevel,
669    ) -> Result<Self, ResultParsingErrorFromVal> {
670        let wasmtime::component::Val::Result(value) = value else {
671            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
672        };
673
674        match (ty.ok, ty.err, value) {
675            (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
676            (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
677                ok: Some(WastValWithType {
678                    r#type: *ok_type,
679                    value: WastVal::try_from(*value)
680                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
681                }),
682            }),
683            (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
684            (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
685                err: Some(WastValWithType {
686                    r#type: *err_type,
687                    value: WastVal::try_from(*value)
688                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
689                }),
690            }),
691            _other => Err(ResultParsingErrorFromVal::TypeCheckError),
692        }
693    }
694
695    #[must_use]
696    pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
697        match self {
698            SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
699            SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
700                WastVal::Result(Ok(Some(Box::new(v.value))))
701            }
702            SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
703            SupportedFunctionReturnValue::Err { err: Some(v) } => {
704                WastVal::Result(Err(Some(Box::new(v.value))))
705            }
706            SupportedFunctionReturnValue::ExecutionError(_) => {
707                execution_error_to_wast_val(&get_return_type())
708            }
709        }
710    }
711
712    #[must_use]
713    pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
714        match self {
715            SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind(Ok(())),
716            SupportedFunctionReturnValue::Err { err: _ } => {
717                PendingStateFinishedResultKind(Err(PendingStateFinishedError::FallibleError))
718            }
719            SupportedFunctionReturnValue::ExecutionError(_) => {
720                PendingStateFinishedResultKind(Err(PendingStateFinishedError::ExecutionFailure))
721            }
722        }
723    }
724}
725
726#[must_use]
727pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
728    match ret_type {
729        TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
730        TypeWrapperTopLevel {
731            ok: _,
732            err: Some(inner),
733        } => match inner.as_ref() {
734            TypeWrapper::String => {
735                return WastVal::Result(Err(Some(Box::new(WastVal::String(
736                    EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
737                )))));
738            }
739            TypeWrapper::Variant(variants) => {
740                if let Some(Some(TypeWrapper::Record(fields))) =
741                    variants.get(EXECUTION_FAILED_STRING_OR_VARIANT)
742                    && fields.is_empty()
743                {
744                    return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
745                        EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
746                        None,
747                    )))));
748                }
749            }
750            _ => {}
751        },
752    }
753    unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
754}
755
/// Function parameters; a thin wrapper over the private `ParamsInternal`
/// representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Params(ParamsInternal);
758
/// Internal representation of `Params`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ParamsInternal {
    /// Parameters held as JSON values.
    JsonValues(Vec<Value>),
    /// Parameters held as wasmtime values.
    Vals {
        //TODO: is Arc needed here? Or move upwards?
        vals: Arc<[wasmtime::component::Val]>,
    },
    /// No parameters.
    Empty,
}
768
769impl Default for Params {
770    fn default() -> Self {
771        Self(ParamsInternal::Empty)
772    }
773}
774
/// Function-name suffix of `FunctionExtension::Submit`.
pub const SUFFIX_FN_SUBMIT: &str = "-submit";
/// Function-name suffix of `FunctionExtension::AwaitNext`.
pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
/// Function-name suffix of `FunctionExtension::Schedule`.
pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
/// Function-name suffix of `FunctionExtension::Stub`.
pub const SUFFIX_FN_STUB: &str = "-stub";
/// Function-name suffix of `FunctionExtension::Get`.
pub const SUFFIX_FN_GET: &str = "-get";
/// Function-name suffix of `FunctionExtension::Invoke`.
pub const SUFFIX_FN_INVOKE: &str = "-invoke";
781
/// Kinds of extension functions, each identified by a function-name suffix
/// (see `suffix()`).
///
/// Serialized with snake_case variant names.
#[derive(
    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
)]
#[serde(rename_all = "snake_case")]
pub enum FunctionExtension {
    /// Suffix `SUFFIX_FN_SUBMIT`; belongs to `PackageExtension::ObeliskExt`.
    Submit,
    /// Suffix `SUFFIX_FN_AWAIT_NEXT`; belongs to `PackageExtension::ObeliskExt`.
    AwaitNext,
    /// Suffix `SUFFIX_FN_SCHEDULE`; belongs to `PackageExtension::ObeliskSchedule`.
    Schedule,
    /// Suffix `SUFFIX_FN_STUB`; belongs to `PackageExtension::ObeliskStub`.
    Stub,
    /// Suffix `SUFFIX_FN_GET`; belongs to `PackageExtension::ObeliskExt`.
    Get,
    /// Suffix `SUFFIX_FN_INVOKE`; belongs to `PackageExtension::ObeliskExt`.
    Invoke,
}
794impl FunctionExtension {
795    #[must_use]
796    pub fn suffix(&self) -> &'static str {
797        match self {
798            FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
799            FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
800            FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
801            FunctionExtension::Stub => SUFFIX_FN_STUB,
802            FunctionExtension::Get => SUFFIX_FN_GET,
803            FunctionExtension::Invoke => SUFFIX_FN_INVOKE,
804        }
805    }
806
807    #[must_use]
808    pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
809        matches!(
810            (pkg_ext, self),
811            (
812                PackageExtension::ObeliskExt,
813                FunctionExtension::Submit
814                    | FunctionExtension::AwaitNext
815                    | FunctionExtension::Get
816                    | FunctionExtension::Invoke
817            ) | (
818                PackageExtension::ObeliskSchedule,
819                FunctionExtension::Schedule
820            ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
821        )
822    }
823}
824
/// Description of a function: its name, signature and extension information.
///
/// Displayed as `{ffqn}: func{parameter_types} -> {return_type}`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FunctionMetadata {
    /// Fully qualified name of the function.
    pub ffqn: FunctionFqn,
    /// Types of the function's parameters.
    pub parameter_types: ParameterTypes,
    /// Return type of the function.
    pub return_type: ReturnType,
    /// The extension kind, if this is an extension function. When set, the
    /// function name is expected to end with the extension's suffix
    /// (`split_extension` panics otherwise).
    pub extension: Option<FunctionExtension>,
    /// Externally submittable: primary functions + `-schedule` extended, but no activity stubs
    pub submittable: bool,
}
834impl FunctionMetadata {
835    #[must_use]
836    pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
837        self.extension.map(|extension| {
838            let prefix = self
839                .ffqn
840                .function_name
841                .value
842                .strip_suffix(extension.suffix())
843                .unwrap_or_else(|| {
844                    panic!(
845                        "extension function {} must end with expected suffix {}",
846                        self.ffqn.function_name,
847                        extension.suffix()
848                    )
849                });
850            (prefix, extension)
851        })
852    }
853}
854impl Display for FunctionMetadata {
855    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856        write!(
857            f,
858            "{ffqn}: func{params} -> {return_type}",
859            ffqn = self.ffqn,
860            params = self.parameter_types,
861            return_type = self.return_type,
862        )
863    }
864}
865
866pub mod serde_params {
867    use crate::{Params, ParamsInternal};
868    use serde::de::{SeqAccess, Visitor};
869    use serde::ser::SerializeSeq;
870    use serde::{Deserialize, Serialize};
871    use serde_json::Value;
872    use val_json::wast_val::WastVal;
873
874    impl Serialize for Params {
875        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
876        where
877            S: ::serde::Serializer,
878        {
879            match &self.0 {
880                ParamsInternal::Vals { vals } => {
881                    let mut seq = serializer.serialize_seq(Some(vals.len()))?; // size must be equal, checked when constructed.
882                    for val in vals.iter() {
883                        let value = WastVal::try_from(val.clone())
884                            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
885                        seq.serialize_element(&value)?;
886                    }
887                    seq.end()
888                }
889                ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
890                ParamsInternal::JsonValues(vec) => {
891                    let mut seq = serializer.serialize_seq(Some(vec.len()))?;
892                    for item in vec {
893                        seq.serialize_element(item)?;
894                    }
895                    seq.end()
896                }
897            }
898        }
899    }
900
901    pub struct VecVisitor;
902
903    impl<'de> Visitor<'de> for VecVisitor {
904        type Value = Vec<Value>;
905
906        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
907            formatter.write_str("a sequence of `Value`")
908        }
909
910        #[inline]
911        fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
912        where
913            V: SeqAccess<'de>,
914        {
915            let mut vec = Vec::new();
916            while let Some(elem) = visitor.next_element()? {
917                vec.push(elem);
918            }
919            Ok(vec)
920        }
921    }
922
923    impl<'de> Deserialize<'de> for Params {
924        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
925        where
926            D: serde::Deserializer<'de>,
927        {
928            let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
929            if vec.is_empty() {
930                Ok(Self(ParamsInternal::Empty))
931            } else {
932                Ok(Self(ParamsInternal::JsonValues(vec)))
933            }
934        }
935    }
936}
937
/// Errors reported when parameters do not match the expected parameter types.
#[derive(Debug, thiserror::Error)]
pub enum ParamsParsingError {
    /// The type of the parameter at index `idx` could not be converted.
    #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
    ParameterTypeError {
        idx: usize,
        err: TypeConversionError,
    },
    /// The JSON parameters could not be deserialized.
    #[error("parameters cannot be deserialized: {0}")]
    ParamsDeserializationError(serde_json::Error),
    /// The number of supplied parameters differs from the number expected.
    #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
    ParameterCardinalityMismatch { expected: usize, specified: usize },
}
950
951impl ParamsParsingError {
952    #[must_use]
953    pub fn detail(&self) -> Option<String> {
954        match self {
955            ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
956            ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
957            ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
958        }
959    }
960}
961
/// Error for a JSON value that cannot be used as a parameter list.
#[derive(Debug, thiserror::Error)]
pub enum ParamsFromJsonError {
    /// The supplied JSON value was not an array.
    #[error("value must be a json array containing function parameters")]
    MustBeArray,
}
967
968impl Params {
969    #[must_use]
970    pub const fn empty() -> Self {
971        Self(ParamsInternal::Empty)
972    }
973
974    #[must_use]
975    pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
976        if vals.is_empty() {
977            Self::empty()
978        } else {
979            Self(ParamsInternal::Vals { vals })
980        }
981    }
982
983    #[must_use]
984    pub fn from_json_values(vec: Vec<Value>) -> Self {
985        if vec.is_empty() {
986            Self::empty()
987        } else {
988            Self(ParamsInternal::JsonValues(vec))
989        }
990    }
991
992    pub fn typecheck<'a>(
993        &self,
994        param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
995    ) -> Result<(), ParamsParsingError> {
996        if param_types.len() != self.len() {
997            return Err(ParamsParsingError::ParameterCardinalityMismatch {
998                expected: param_types.len(),
999                specified: self.len(),
1000            });
1001        }
1002        match &self.0 {
1003            ParamsInternal::Vals { .. } /* already typechecked */ | ParamsInternal::Empty => {}
1004            ParamsInternal::JsonValues(params) => {
1005                params::deserialize_values(params, param_types)
1006                .map_err(ParamsParsingError::ParamsDeserializationError)?;
1007            }
1008        }
1009        Ok(())
1010    }
1011
1012    pub fn as_vals(
1013        &self,
1014        param_types: Box<[(String, Type)]>,
1015    ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1016        if param_types.len() != self.len() {
1017            return Err(ParamsParsingError::ParameterCardinalityMismatch {
1018                expected: param_types.len(),
1019                specified: self.len(),
1020            });
1021        }
1022        match &self.0 {
1023            ParamsInternal::JsonValues(json_vec) => {
1024                let param_types = param_types
1025                    .into_vec()
1026                    .into_iter()
1027                    .enumerate()
1028                    .map(|(idx, (_param_name, ty))| {
1029                        TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1030                    })
1031                    .collect::<Result<Vec<_>, _>>()
1032                    .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1033                Ok(params::deserialize_values(json_vec, param_types.iter())
1034                    .map_err(ParamsParsingError::ParamsDeserializationError)?
1035                    .into_iter()
1036                    .map(Val::from)
1037                    .collect())
1038            }
1039            ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1040            ParamsInternal::Empty => Ok(Arc::from([])),
1041        }
1042    }
1043
1044    #[must_use]
1045    pub fn len(&self) -> usize {
1046        match &self.0 {
1047            ParamsInternal::JsonValues(vec) => vec.len(),
1048            ParamsInternal::Vals { vals, .. } => vals.len(),
1049            ParamsInternal::Empty => 0,
1050        }
1051    }
1052
1053    #[must_use]
1054    pub fn is_empty(&self) -> bool {
1055        self.len() == 0
1056    }
1057}
1058
1059pub mod prefixed_ulid {
1060    use crate::{JoinSetId, JoinSetIdParseError};
1061    use serde_with::{DeserializeFromStr, SerializeDisplay};
1062    use std::{
1063        fmt::{Debug, Display},
1064        hash::Hasher,
1065        marker::PhantomData,
1066        num::ParseIntError,
1067        str::FromStr,
1068        sync::Arc,
1069    };
1070    use ulid::Ulid;
1071
    /// A ULID tagged at the type level with a marker type `T`.
    ///
    /// It is displayed as `<prefix>_<ulid>`, where the prefix comes from `Self::prefix()`.
    #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
    #[derive_where::derive_where(Clone, Copy)]
    #[display("{}_{ulid}", Self::prefix())]
    pub struct PrefixedUlid<T: 'static> {
        ulid: Ulid,
        // Marker only: no value of `T` is stored.
        phantom_data: PhantomData<fn(T) -> T>,
    }
1079
1080    impl<T> PrefixedUlid<T> {
1081        const fn new(ulid: Ulid) -> Self {
1082            Self {
1083                ulid,
1084                phantom_data: PhantomData,
1085            }
1086        }
1087
1088        fn prefix() -> &'static str {
1089            std::any::type_name::<T>().rsplit("::").next().unwrap()
1090        }
1091    }
1092
1093    impl<T> PrefixedUlid<T> {
1094        #[must_use]
1095        pub fn generate() -> Self {
1096            Self::new(Ulid::new())
1097        }
1098
1099        #[must_use]
1100        pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1101            Self::new(Ulid::from_parts(timestamp_ms, random))
1102        }
1103
1104        #[must_use]
1105        pub fn timestamp_part(&self) -> u64 {
1106            self.ulid.timestamp_ms()
1107        }
1108
1109        #[must_use]
1110        /// Fills only the lower 80 bits of the returned 128-bit value.
1111        pub fn random_part(&self) -> u128 {
1112            self.ulid.random()
1113        }
1114    }
1115
    /// Errors returned when parsing a [`PrefixedUlid`] from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum PrefixedUlidParseError {
        /// The input does not start with the expected `<prefix>_`.
        #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
        WrongPrefix { input: String, expected: String },
        /// The part following the prefix is not a valid ULID.
        #[error("cannot parse ULID suffix from `{input}`")]
        CannotParseUlid { input: String },
    }
1123
1124    mod impls {
1125        use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1126        use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1127
1128        impl<T> FromStr for PrefixedUlid<T> {
1129            type Err = PrefixedUlidParseError;
1130
1131            fn from_str(input: &str) -> Result<Self, Self::Err> {
1132                let prefix = Self::prefix();
1133                let mut input_chars = input.chars();
1134                for exp in prefix.chars() {
1135                    if input_chars.next() != Some(exp) {
1136                        return Err(PrefixedUlidParseError::WrongPrefix {
1137                            input: input.to_string(),
1138                            expected: format!("{prefix}_"),
1139                        });
1140                    }
1141                }
1142                if input_chars.next() != Some('_') {
1143                    return Err(PrefixedUlidParseError::WrongPrefix {
1144                        input: input.to_string(),
1145                        expected: format!("{prefix}_"),
1146                    });
1147                }
1148                let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1149                    return Err(PrefixedUlidParseError::CannotParseUlid {
1150                        input: input.to_string(),
1151                    });
1152                };
1153                Ok(Self {
1154                    ulid,
1155                    phantom_data: PhantomData,
1156                })
1157            }
1158        }
1159
1160        impl<T> Debug for PrefixedUlid<T> {
1161            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1162                Display::fmt(&self, f)
1163            }
1164        }
1165
1166        impl<T> Hash for PrefixedUlid<T> {
1167            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1168                Self::prefix().hash(state);
1169                self.ulid.hash(state);
1170                self.phantom_data.hash(state);
1171            }
1172        }
1173
1174        impl<T> PartialEq for PrefixedUlid<T> {
1175            fn eq(&self, other: &Self) -> bool {
1176                self.ulid == other.ulid
1177            }
1178        }
1179
1180        impl<T> Eq for PrefixedUlid<T> {}
1181
1182        impl<T> PartialOrd for PrefixedUlid<T> {
1183            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1184                Some(self.cmp(other))
1185            }
1186        }
1187
1188        impl<T> Ord for PrefixedUlid<T> {
1189            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1190                self.ulid.cmp(&other.ulid)
1191            }
1192        }
1193    }
1194
    /// Marker types for [`PrefixedUlid`]; the type name of each marker is used
    /// as the textual prefix of the id (see `PrefixedUlid::prefix`).
    pub mod prefix {
        pub struct E;
        pub struct Exr;
        pub struct Run;
        pub struct Delay;
    }
1201
    /// Id with the `Exr` prefix.
    pub type ExecutorId = PrefixedUlid<prefix::Exr>;
    /// Id with the `E` prefix; the top-level part of an [`ExecutionId`].
    pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
    /// Id with the `Run` prefix.
    pub type RunId = PrefixedUlid<prefix::Run>;
    /// Id with the `Delay` prefix; the top-level part of a [`DelayId`].
    pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; // Never used directly, tracking top level ExecutionId
1206
1207    #[cfg(any(test, feature = "test"))]
1208    impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1209        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1210            Ok(Self::new(ulid::Ulid::from_parts(
1211                u.arbitrary()?,
1212                u.arbitrary()?,
1213            )))
1214        }
1215    }
1216
    /// Identifier of an execution: either a top-level id or an id derived from one.
    ///
    /// Serialized through `Display` and deserialized through `FromStr`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
    pub enum ExecutionId {
        /// Execution identified only by its prefixed ULID.
        TopLevel(ExecutionIdTopLevel),
        /// Execution identified by a top-level id plus an infix and an index.
        Derived(ExecutionIdDerived),
    }
1222
    /// Execution id derived from a top-level id.
    ///
    /// Rendered as `<top_level>.<infix>_<idx>`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct ExecutionIdDerived {
        // Id of the top-level execution this id descends from.
        top_level: ExecutionIdTopLevel,
        // A join set id on the first level; `<parent infix>_<parent idx>.<join set id>` on deeper levels.
        infix: Arc<str>,
        // Index following the infix.
        idx: u64,
    }
1229    impl ExecutionIdDerived {
1230        #[must_use]
1231        pub fn get_incremented(&self) -> Self {
1232            self.get_incremented_by(1)
1233        }
1234        #[must_use]
1235        pub fn get_incremented_by(&self, count: u64) -> Self {
1236            ExecutionIdDerived {
1237                top_level: self.top_level,
1238                infix: self.infix.clone(),
1239                idx: self.idx + count,
1240            }
1241        }
1242        #[must_use]
1243        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1244            let ExecutionIdDerived {
1245                top_level,
1246                infix,
1247                idx,
1248            } = self;
1249            let infix = Arc::from(format!(
1250                "{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1251            ));
1252            ExecutionIdDerived {
1253                top_level: *top_level,
1254                infix,
1255                idx: EXECUTION_ID_START_IDX,
1256            }
1257        }
1258        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1259            let ExecutionIdDerived {
1260                top_level,
1261                infix,
1262                idx,
1263            } = self;
1264            write!(
1265                f,
1266                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1267            )
1268        }
1269
1270        // Two cases:
1271        // A. infix does not contain dots -> first level, will be split into the top level, the whole infix must be JoinSetId.
1272        // B. infix must be split into old_infix _ old_idx . JoinSetId
1273        pub fn split_to_parts(
1274            &self,
1275        ) -> Result<(ExecutionId, JoinSetId), ExecutionIdDerivedSplitError> {
1276            if let Some((old_infix_and_index, join_set_id)) =
1277                self.infix.rsplit_once(EXECUTION_ID_INFIX)
1278            {
1279                let join_set_id = JoinSetId::from_str(join_set_id)?;
1280                let Some((old_infix, old_idx)) =
1281                    old_infix_and_index.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX)
1282                else {
1283                    return Err(ExecutionIdDerivedSplitError::CannotFindJoinSetDelimiter);
1284                };
1285                let parent = ExecutionIdDerived {
1286                    top_level: self.top_level,
1287                    infix: Arc::from(old_infix),
1288                    idx: old_idx
1289                        .parse()
1290                        .map_err(ExecutionIdDerivedSplitError::CannotParseOldIndex)?,
1291                };
1292                Ok((ExecutionId::Derived(parent), join_set_id))
1293            } else {
1294                // This was the first level
1295                Ok((
1296                    ExecutionId::TopLevel(self.top_level),
1297                    JoinSetId::from_str(&self.infix)?,
1298                ))
1299            }
1300        }
1301    }
1302
    /// Errors returned by [`ExecutionIdDerived::split_to_parts`].
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdDerivedSplitError {
        /// The join set part of the infix is not a valid `JoinSetId`.
        #[error(transparent)]
        JoinSetIdParseError(#[from] JoinSetIdParseError),
        /// The index of the parent execution is not a valid `u64`.
        #[error("cannot parse index of parent execution - {0}")]
        CannotParseOldIndex(ParseIntError),
        /// The parent part of the infix lacks the delimiter between infix and index.
        #[error("cannot find join set delimiter")]
        CannotFindJoinSetDelimiter,
    }
1312
1313    impl Debug for ExecutionIdDerived {
1314        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1315            self.display_or_debug(f)
1316        }
1317    }
1318    impl Display for ExecutionIdDerived {
1319        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1320            self.display_or_debug(f)
1321        }
1322    }
1323    impl FromStr for ExecutionIdDerived {
1324        type Err = DerivedIdParseError;
1325
1326        fn from_str(input: &str) -> Result<Self, Self::Err> {
1327            let (top_level, infix, idx) = derived_from_str(input)?;
1328            Ok(ExecutionIdDerived {
1329                top_level,
1330                infix,
1331                idx,
1332            })
1333        }
1334    }
1335
1336    fn derived_from_str<T: 'static>(
1337        input: &str,
1338    ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1339        if let Some((prefix, suffix)) = input.split_once(EXECUTION_ID_INFIX) {
1340            let top_level = PrefixedUlid::from_str(prefix)
1341                .map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1342            let Some((infix, idx)) = suffix.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX) else {
1343                return Err(DerivedIdParseError::SecondDelimiterNotFound);
1344            };
1345            let infix = Arc::from(infix);
1346            let idx = u64::from_str(idx).map_err(DerivedIdParseError::ParseIndexError)?;
1347            Ok((top_level, infix, idx))
1348        } else {
1349            Err(DerivedIdParseError::FirstDelimiterNotFound)
1350        }
1351    }
1352
1353    #[cfg(any(test, feature = "test"))]
1354    impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1355        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1356            let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1357            let join_set_id = JoinSetId::arbitrary(u)?;
1358            Ok(top_level.next_level(&join_set_id))
1359        }
1360    }
1361
    /// Errors returned when parsing a derived id of the form `<prefixed ulid>.<infix>_<idx>`.
    #[derive(Debug, thiserror::Error)]
    pub enum DerivedIdParseError {
        /// The part before the first delimiter is not a valid prefixed ULID.
        #[error(transparent)]
        PrefixedUlidParseError(PrefixedUlidParseError),
        /// The input contains no `EXECUTION_ID_INFIX`.
        #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
        FirstDelimiterNotFound,
        /// The part after the first delimiter contains no `EXECUTION_ID_JOIN_SET_INFIX`.
        #[error("cannot parse derived id - delimiter `{EXECUTION_ID_JOIN_SET_INFIX}` not found")]
        SecondDelimiterNotFound,
        /// The trailing index is not a valid `u64`.
        #[error(
            "cannot parse derived id - suffix after `{EXECUTION_ID_JOIN_SET_INFIX}` must be a number"
        )]
        ParseIndexError(ParseIntError),
    }
1375
1376    impl ExecutionId {
1377        #[must_use]
1378        pub fn generate() -> Self {
1379            ExecutionId::TopLevel(PrefixedUlid::generate())
1380        }
1381
1382        #[must_use]
1383        pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1384            match &self {
1385                ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1386                ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1387            }
1388        }
1389
1390        #[must_use]
1391        pub fn is_top_level(&self) -> bool {
1392            matches!(self, ExecutionId::TopLevel(_))
1393        }
1394
1395        #[must_use]
1396        pub fn random_seed(&self) -> u64 {
1397            let mut hasher = fxhash::FxHasher::default();
1398            // `Self::random_part` uses only the lower 80 bits of a 128-bit value.
1399            // Truncate to 64 bits, since including the remaining 16 bits
1400            // would not increase the entropy of the 64-bit output.
1401            #[expect(clippy::cast_possible_truncation)]
1402            let random_part = self.get_top_level().random_part() as u64;
1403            hasher.write_u64(random_part);
1404            hasher.write_u64(self.get_top_level().timestamp_part());
1405            if let ExecutionId::Derived(ExecutionIdDerived {
1406                top_level: _,
1407                infix,
1408                idx,
1409            }) = self
1410            {
1411                // Each derived execution ID should return different seed.
1412                hasher.write(infix.as_bytes());
1413                hasher.write_u64(*idx);
1414            }
1415            hasher.finish()
1416        }
1417
1418        #[must_use]
1419        pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1420            ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1421        }
1422
1423        #[must_use]
1424        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1425            match &self {
1426                ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1427                    top_level: *top_level,
1428                    infix: Arc::from(join_set_id.to_string()),
1429                    idx: EXECUTION_ID_START_IDX,
1430                },
1431                ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1432            }
1433        }
1434
1435        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1436            match &self {
1437                ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1438                ExecutionId::Derived(derived) => Display::fmt(derived, f),
1439            }
1440        }
1441    }
1442
    /// Separates the top-level id from the infix, and nested levels inside the infix.
    const EXECUTION_ID_INFIX: char = '.';
    /// Separates the infix from the trailing numeric index.
    const EXECUTION_ID_JOIN_SET_INFIX: char = '_';
    /// Index given to the first derived execution id of a level.
    const EXECUTION_ID_START_IDX: u64 = 1;
    // NOTE(review): not referenced in this module; presumably the first index used for
    // generated join sets - confirm against callers.
    pub const JOIN_SET_START_IDX: u64 = 1;
    /// Index used by `DelayId::new`.
    const DELAY_ID_START_IDX: u64 = 1;
1448
1449    #[derive(Debug, thiserror::Error)]
1450    pub enum ExecutionIdParseError {
1451        #[error(transparent)]
1452        PrefixedUlidParseError(#[from] PrefixedUlidParseError),
1453        #[error(
1454            "cannot parse derived execution id - first delimiter `{EXECUTION_ID_INFIX}` not found"
1455        )]
1456        FirstDelimiterNotFound,
1457        #[error(
1458            "cannot parse derived execution id - second delimiter `{EXECUTION_ID_INFIX}` not found"
1459        )]
1460        SecondDelimiterNotFound,
1461        #[error("cannot parse derived execution id - last suffix must be a number")]
1462        ParseIndexError(#[from] ParseIntError),
1463    }
1464
1465    impl FromStr for ExecutionId {
1466        type Err = ExecutionIdParseError;
1467
1468        fn from_str(input: &str) -> Result<Self, Self::Err> {
1469            if input.contains(EXECUTION_ID_INFIX) {
1470                ExecutionIdDerived::from_str(input)
1471                    .map(ExecutionId::Derived)
1472                    .map_err(|err| match err {
1473                        DerivedIdParseError::FirstDelimiterNotFound => {
1474                            unreachable!("first delimiter checked")
1475                        }
1476                        DerivedIdParseError::SecondDelimiterNotFound => {
1477                            ExecutionIdParseError::SecondDelimiterNotFound
1478                        }
1479                        DerivedIdParseError::PrefixedUlidParseError(err) => {
1480                            ExecutionIdParseError::PrefixedUlidParseError(err)
1481                        }
1482                        DerivedIdParseError::ParseIndexError(err) => {
1483                            ExecutionIdParseError::ParseIndexError(err)
1484                        }
1485                    })
1486            } else {
1487                Ok(ExecutionId::TopLevel(PrefixedUlid::from_str(input)?))
1488            }
1489        }
1490    }
1491
    /// Errors returned when converting a wasmtime component value into an [`ExecutionId`].
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdStructuralParseError {
        /// The value had the right shape but its `id` string is not a valid execution id.
        #[error(transparent)]
        ExecutionIdParseError(#[from] ExecutionIdParseError),
        /// The value is not a record with a single string field named `id`.
        #[error("execution-id must be a record with `id` field of type string")]
        TypeError,
    }
1499
1500    impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1501        type Error = ExecutionIdStructuralParseError;
1502
1503        fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1504            if let wasmtime::component::Val::Record(key_vals) = execution_id
1505                && key_vals.len() == 1
1506                && let Some((key, execution_id)) = key_vals.first()
1507                && key == "id"
1508                && let wasmtime::component::Val::String(execution_id) = execution_id
1509            {
1510                ExecutionId::from_str(execution_id)
1511                    .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1512            } else {
1513                Err(ExecutionIdStructuralParseError::TypeError)
1514            }
1515        }
1516    }
1517
1518    impl Debug for ExecutionId {
1519        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1520            self.display_or_debug(f)
1521        }
1522    }
1523
1524    impl Display for ExecutionId {
1525        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526            self.display_or_debug(f)
1527        }
1528    }
1529
1530    #[cfg(any(test, feature = "test"))]
1531    impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1532        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1533            Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1534        }
1535    }
1536
    /// Mirrors [`ExecutionId`], with different prefix and `idx` for tracking each delay within the join set.
    ///
    /// Rendered as `<top_level>.<infix>_<idx>`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct DelayId {
        // Carries the ULID of the owning execution's top-level id, under the `Delay` prefix.
        top_level: DelayIdTopLevel,
        // Infix produced by `ExecutionId::next_level` for the owning execution and join set.
        infix: Arc<str>,
        // Index of the delay.
        idx: u64,
    }
1544    impl DelayId {
1545        #[must_use]
1546        pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1547            Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1548        }
1549
1550        #[must_use]
1551        pub fn new_with_index(
1552            execution_id: &ExecutionId,
1553            join_set_id: &JoinSetId,
1554            idx: u64,
1555        ) -> DelayId {
1556            let ExecutionIdDerived {
1557                top_level: PrefixedUlid { ulid, .. },
1558                infix,
1559                idx: _,
1560            } = execution_id.next_level(join_set_id);
1561            let top_level = DelayIdTopLevel::new(ulid);
1562            DelayId {
1563                top_level,
1564                infix,
1565                idx,
1566            }
1567        }
1568
1569        #[must_use]
1570        pub fn get_incremented(&self) -> Self {
1571            Self {
1572                top_level: self.top_level,
1573                infix: self.infix.clone(),
1574                idx: self.idx + 1,
1575            }
1576        }
1577
1578        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1579            let DelayId {
1580                top_level,
1581                infix,
1582                idx,
1583            } = self;
1584            write!(
1585                f,
1586                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1587            )
1588        }
1589    }
1590
1591    pub mod delay_impl {
1592        use super::{DelayId, DerivedIdParseError, derived_from_str};
1593        use std::{
1594            fmt::{Debug, Display},
1595            str::FromStr,
1596        };
1597
1598        impl Debug for DelayId {
1599            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600                self.display_or_debug(f)
1601            }
1602        }
1603
1604        impl Display for DelayId {
1605            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1606                self.display_or_debug(f)
1607            }
1608        }
1609
1610        impl FromStr for DelayId {
1611            type Err = DerivedIdParseError;
1612
1613            fn from_str(input: &str) -> Result<Self, Self::Err> {
1614                let (top_level, infix, idx) = derived_from_str(input)?;
1615                Ok(DelayId {
1616                    top_level,
1617                    infix,
1618                    idx,
1619                })
1620            }
1621        }
1622
1623        #[cfg(any(test, feature = "test"))]
1624        impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1625            fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1626                use super::{ExecutionId, JoinSetId};
1627                let execution_id = ExecutionId::arbitrary(u)?;
1628                let mut join_set_id = JoinSetId::arbitrary(u)?;
1629                join_set_id.kind = crate::JoinSetKind::OneOff;
1630                Ok(DelayId::new(&execution_id, &join_set_id))
1631            }
1632        }
1633    }
1634}
1635
/// Identifier of a join set, rendered as `<kind>:<name>`.
///
/// Serialized through `Display` and deserialized through `FromStr`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[non_exhaustive] // force using the constructor as much as possible due to validation
#[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
pub struct JoinSetId {
    pub kind: JoinSetKind,
    // Validated by `JoinSetId::new` when constructed through it.
    pub name: StrVariant,
}
1652
/// Strategy applied when a join set is closed; serialized with snake_case variant names.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, Default,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ClosingStrategy {
    /// All submitted child execution requests that were not awaited by the workflow are awaited during join set close.
    /// Delay requests are not awaited.
    #[default]
    Complete,
}
1664
1665impl JoinSetId {
1666    pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1667        Ok(Self {
1668            kind,
1669            name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1670        })
1671    }
1672}
1673
/// ASCII letters (lowercase, then uppercase) followed by the decimal digits.
pub const CHARSET_ALPHANUMERIC: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1676
/// Kind of a join set; displayed as its one-letter code (see `as_code`).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    Serialize,
    Deserialize,
    strum::EnumIter,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("{}", self.as_code())]
pub enum JoinSetKind {
    /// Code `o`.
    OneOff,
    /// Code `n`.
    Named,
    /// Code `g`.
    Generated,
}
1696impl JoinSetKind {
1697    fn as_code(&self) -> &'static str {
1698        match self {
1699            JoinSetKind::OneOff => "o",
1700            JoinSetKind::Named => "n",
1701            JoinSetKind::Generated => "g",
1702        }
1703    }
1704}
1705impl FromStr for JoinSetKind {
1706    type Err = &'static str;
1707    fn from_str(s: &str) -> Result<Self, Self::Err> {
1708        use strum::IntoEnumIterator;
1709        Self::iter()
1710            .find(|variant| s == variant.as_code())
1711            .ok_or("unknown join set kind")
1712    }
1713}
1714
/// Separates the kind code from the name in the textual form of a [`JoinSetId`].
pub const JOIN_SET_ID_INFIX: char = ':';
/// Non-alphanumeric characters that `JoinSetId::new` accepts in a join set name.
const CHARSET_EXTRA_JSON_SET: &str = "_-/";
1717
1718impl FromStr for JoinSetId {
1719    type Err = JoinSetIdParseError;
1720
1721    fn from_str(input: &str) -> Result<Self, Self::Err> {
1722        let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1723            return Err(JoinSetIdParseError::WrongParts);
1724        };
1725        let kind = kind
1726            .parse()
1727            .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1728        Ok(JoinSetId::new(kind, StrVariant::from(name.to_string()))?)
1729    }
1730}
1731
1732#[derive(Debug, thiserror::Error)]
1733pub enum JoinSetIdParseError {
1734    #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1735    WrongParts,
1736    #[error("cannot parse join set id's execution id - {0}")]
1737    ExecutionIdParseError(#[from] ExecutionIdParseError),
1738    #[error("cannot parse join set kind - {0}")]
1739    JoinSetKindParseError(&'static str),
1740    #[error("cannot parse join set id - {0}")]
1741    InvalidName(#[from] InvalidNameError<JoinSetId>),
1742}
1743
/// All characters allowed in a join set name: alphanumerics plus `CHARSET_EXTRA_JSON_SET`.
/// Only compiled for tests and the `test` feature.
#[cfg(any(test, feature = "test"))]
const CHARSET_JOIN_SET_NAME: &str =
    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
/// Generates a `Named` join set id whose name has 1 to 11 characters drawn from
/// `CHARSET_JOIN_SET_NAME`.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The upper bound is inclusive, so at least one character is always produced.
        let last_position = u.int_in_range(0..=10).unwrap();
        let mut name = String::new();
        for _ in 0..=last_position {
            let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
            let picked = CHARSET_JOIN_SET_NAME
                .chars()
                .nth(idx)
                .expect("idx is < charset.len()");
            name.push(picked);
        }

        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::from(name)).unwrap();
        Ok(join_set_id)
    }
}
1766
/// Type of a component.
///
/// Converted to and from strings by strum using snake_case variant names;
/// serde goes through the same `Display`/`FromStr` implementations.
#[derive(
    Debug,
    Clone,
    Copy,
    strum::Display,
    PartialEq,
    Eq,
    strum::EnumString,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[strum(serialize_all = "snake_case")]
pub enum ComponentType {
    ActivityWasm,
    ActivityStub,
    Workflow,
    WebhookEndpoint,
}
1786
/// Identifier of a component, rendered as `<component_type>:<name>` by both
/// `Display` and `Debug`.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    derive_more::Display,
)]
#[display("{component_type}:{name}")]
#[debug("{}", self)]
#[non_exhaustive] // force using the constructor as much as possible due to validation
pub struct ComponentId {
    pub component_type: ComponentType,
    // Validated by `ComponentId::new` when constructed through it.
    pub name: StrVariant,
}
1804impl ComponentId {
1805    pub fn new(
1806        component_type: ComponentType,
1807        name: StrVariant,
1808    ) -> Result<Self, InvalidNameError<Self>> {
1809        Ok(Self {
1810            component_type,
1811            name: check_name(name, "_")?,
1812        })
1813    }
1814
1815    #[must_use]
1816    pub const fn dummy_activity() -> Self {
1817        Self {
1818            component_type: ComponentType::ActivityWasm,
1819            name: StrVariant::empty(),
1820        }
1821    }
1822
1823    #[must_use]
1824    pub const fn dummy_workflow() -> ComponentId {
1825        ComponentId {
1826            component_type: ComponentType::Workflow,
1827            name: StrVariant::empty(),
1828        }
1829    }
1830}
1831
1832pub fn check_name<T>(
1833    name: StrVariant,
1834    special: &'static str,
1835) -> Result<StrVariant, InvalidNameError<T>> {
1836    if let Some(invalid) = name
1837        .as_ref()
1838        .chars()
1839        .find(|c| !c.is_ascii_alphanumeric() && !special.contains(*c))
1840    {
1841        Err(InvalidNameError::<T> {
1842            invalid,
1843            name: name.as_ref().to_string(),
1844            special,
1845            phantom_data: PhantomData,
1846        })
1847    } else {
1848        Ok(name)
1849    }
1850}
1851#[derive(Debug, thiserror::Error)]
1852#[error(
1853    "name of {} `{name}` contains invalid character `{invalid}`, must only contain alphanumeric characters and following characters {special}",
1854    std::any::type_name::<T>().rsplit("::").next().unwrap()
1855)]
1856pub struct InvalidNameError<T> {
1857    invalid: char,
1858    name: String,
1859    special: &'static str,
1860    phantom_data: PhantomData<T>,
1861}
1862
1863#[derive(Debug, thiserror::Error)]
1864pub enum ConfigIdParseError {
1865    #[error("cannot parse ComponentConfigHash - delimiter ':' not found")]
1866    DelimiterNotFound,
1867    #[error("cannot parse prefix of ComponentConfigHash - {0}")]
1868    ComponentTypeParseError(#[from] strum::ParseError),
1869}
1870
1871impl FromStr for ComponentId {
1872    type Err = ConfigIdParseError;
1873
1874    fn from_str(input: &str) -> Result<Self, Self::Err> {
1875        let (component_type, name) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1876        let component_type = component_type.parse()?;
1877        Ok(Self {
1878            component_type,
1879            name: StrVariant::from(name.to_string()),
1880        })
1881    }
1882}
1883
1884#[derive(
1885    Debug,
1886    Clone,
1887    Copy,
1888    strum::Display,
1889    strum::EnumString,
1890    PartialEq,
1891    Eq,
1892    Hash,
1893    serde_with::SerializeDisplay,
1894    serde_with::DeserializeFromStr,
1895)]
1896#[strum(serialize_all = "snake_case")]
1897pub enum HashType {
1898    Sha256,
1899}
1900
1901#[derive(
1902    Debug,
1903    Clone,
1904    derive_more::Display,
1905    derive_more::FromStr,
1906    derive_more::Deref,
1907    PartialEq,
1908    Eq,
1909    Hash,
1910    serde_with::SerializeDisplay,
1911    serde_with::DeserializeFromStr,
1912)]
1913pub struct ContentDigest(pub Digest);
1914#[cfg(any(test, feature = "test"))]
1915pub const CONTENT_DIGEST_DUMMY: ContentDigest = ContentDigest(Digest {
1916    hash_type: HashType::Sha256,
1917    hash_base16: StrVariant::empty(),
1918});
1919
1920impl ContentDigest {
1921    #[must_use]
1922    pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1923        Self(Digest::new(hash_type, hash_base16))
1924    }
1925}
1926
1927#[derive(
1928    Debug,
1929    Clone,
1930    derive_more::Display,
1931    PartialEq,
1932    Eq,
1933    Hash,
1934    serde_with::SerializeDisplay,
1935    serde_with::DeserializeFromStr,
1936)]
1937#[display("{hash_type}:{hash_base16}")]
1938pub struct Digest {
1939    hash_type: HashType,
1940    hash_base16: StrVariant,
1941}
1942impl Digest {
1943    #[must_use]
1944    pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1945        Self {
1946            hash_type,
1947            hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1948        }
1949    }
1950
1951    #[must_use]
1952    pub fn hash_type(&self) -> HashType {
1953        self.hash_type
1954    }
1955
1956    #[must_use]
1957    pub fn digest_base16(&self) -> &str {
1958        &self.hash_base16
1959    }
1960}
1961
1962#[derive(Debug, thiserror::Error)]
1963pub enum DigestParseErrror {
1964    #[error("cannot parse ContentDigest - delimiter ':' not found")]
1965    DelimiterNotFound,
1966    #[error("cannot parse ContentDigest - invalid prefix `{hash_type}`")]
1967    TypeParseError { hash_type: String },
1968    #[error("cannot parse ContentDigest - invalid suffix length, expected 64 hex digits, got {0}")]
1969    SuffixLength(usize),
1970    #[error("cannot parse ContentDigest - suffix must be hex-encoded, got invalid character `{0}`")]
1971    SuffixInvalid(char),
1972}
1973
1974impl FromStr for Digest {
1975    type Err = DigestParseErrror;
1976
1977    fn from_str(input: &str) -> Result<Self, Self::Err> {
1978        let (hash_type, hash_base16) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1979        let hash_type =
1980            HashType::from_str(hash_type).map_err(|_err| Self::Err::TypeParseError {
1981                hash_type: hash_type.to_string(),
1982            })?;
1983        if hash_base16.len() != 64 {
1984            return Err(Self::Err::SuffixLength(hash_base16.len()));
1985        }
1986        if let Some(invalid) = hash_base16.chars().find(|c| !c.is_ascii_hexdigit()) {
1987            return Err(Self::Err::SuffixInvalid(invalid));
1988        }
1989        Ok(Self {
1990            hash_type,
1991            hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1992        })
1993    }
1994}
1995
1996const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
1997#[derive(
1998    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
1999)]
2000pub enum ReturnType {
2001    Extendable(ReturnTypeExtendable), // Execution failures can be converted to this return type, e.g. result<_, string>
2002    NonExtendable(ReturnTypeNonExtendable), // e.g. -submit returns ExecutionId
2003}
2004impl ReturnType {
2005    /// Evaluate whether the return type is one of supported types:
2006    /// * `result`
2007    /// * `result<T>`
2008    /// * `result<_, string>`
2009    /// * `result<T, string>`
2010    /// * `result<T, E>` where T can be `_` and E is a `variant` containing `execution-failed`
2011    ///   variant with no associated value.
2012    #[must_use]
2013    pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
2014        if let TypeWrapper::Result { ok, err: None } = type_wrapper {
2015            return ReturnType::Extendable(ReturnTypeExtendable {
2016                type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
2017                wit_type,
2018            });
2019        } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
2020            if let TypeWrapper::String = err.as_ref() {
2021                return ReturnType::Extendable(ReturnTypeExtendable {
2022                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2023                    wit_type,
2024                });
2025            } else if let TypeWrapper::Variant(fields) = err.as_ref()
2026                && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
2027            {
2028                return ReturnType::Extendable(ReturnTypeExtendable {
2029                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2030                    wit_type,
2031                });
2032            }
2033            return ReturnType::NonExtendable(ReturnTypeNonExtendable {
2034                type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
2035                wit_type,
2036            });
2037        }
2038        ReturnType::NonExtendable(ReturnTypeNonExtendable {
2039            type_wrapper: type_wrapper.clone(),
2040            wit_type,
2041        })
2042    }
2043
2044    #[must_use]
2045    pub fn wit_type(&self) -> &str {
2046        match self {
2047            ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
2048            ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
2049        }
2050    }
2051
2052    #[must_use]
2053    pub fn type_wrapper(&self) -> TypeWrapper {
2054        match self {
2055            ReturnType::Extendable(compatible) => {
2056                TypeWrapper::from(compatible.type_wrapper_tl.clone())
2057            }
2058            ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
2059        }
2060    }
2061}
2062
2063#[derive(
2064    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2065)]
2066#[display("{wit_type}")]
2067pub struct ReturnTypeNonExtendable {
2068    pub type_wrapper: TypeWrapper,
2069    pub wit_type: StrVariant,
2070}
2071
2072#[derive(
2073    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2074)]
2075#[display("{wit_type}")]
2076pub struct ReturnTypeExtendable {
2077    pub type_wrapper_tl: TypeWrapperTopLevel,
2078    pub wit_type: StrVariant,
2079}
2080
2081#[cfg(any(test, feature = "test"))]
2082pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
2083    type_wrapper_tl: TypeWrapperTopLevel {
2084        ok: None,
2085        err: None,
2086    },
2087    wit_type: StrVariant::Static("result"),
2088});
2089
2090#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
2091#[derive_where::derive_where(PartialEq)]
2092#[display("{name}: {wit_type}")]
2093pub struct ParameterType {
2094    pub type_wrapper: TypeWrapper,
2095    #[derive_where(skip)]
2096    // Names are read from how a component names the parameter and thus might differ between export and import.
2097    pub name: StrVariant,
2098    pub wit_type: StrVariant,
2099}
2100
2101#[derive(
2102    Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
2103)]
2104pub struct ParameterTypes(pub Vec<ParameterType>);
2105
2106impl Debug for ParameterTypes {
2107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2108        write!(f, "(")?;
2109        let mut iter = self.0.iter().peekable();
2110        while let Some(p) = iter.next() {
2111            write!(f, "{p:?}")?;
2112            if iter.peek().is_some() {
2113                write!(f, ", ")?;
2114            }
2115        }
2116        write!(f, ")")
2117    }
2118}
2119
2120impl Display for ParameterTypes {
2121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2122        write!(f, "(")?;
2123        let mut iter = self.0.iter().peekable();
2124        while let Some(p) = iter.next() {
2125            write!(f, "{p}")?;
2126            if iter.peek().is_some() {
2127                write!(f, ", ")?;
2128            }
2129        }
2130        write!(f, ")")
2131    }
2132}
2133
2134#[derive(Debug, Clone)]
2135pub struct PackageIfcFns {
2136    pub ifc_fqn: IfcFqnName,
2137    pub extension: bool, // one of `-obelisk-ext`, `-obelisk-schedule`, `-obelisk-stub`
2138    pub fns: IndexMap<FnName, FunctionMetadata>,
2139}
2140
/// Retry settings attached to a component.
#[derive(Clone, Copy, Debug)]
pub struct ComponentRetryConfig {
    pub max_retries: u32,
    pub retry_exp_backoff: Duration,
}

impl ComponentRetryConfig {
    /// Configuration with zero retries and a zero backoff duration.
    pub const ZERO: ComponentRetryConfig = Self {
        retry_exp_backoff: Duration::ZERO,
        max_retries: 0,
    };
}
2152
2153/// Implementation must not return `-obelisk-*` extended function, nor functions from `obelisk` namespace.
2154pub trait FunctionRegistry: Send + Sync {
2155    fn get_by_exported_function(
2156        &self,
2157        ffqn: &FunctionFqn,
2158    ) -> Option<(FunctionMetadata, ComponentId, ComponentRetryConfig)>;
2159
2160    // TODO: return Option<&TypeWrapperTopLevel>, optimize
2161    /// Get return type of a non-ext function, otherwise return `None`.
2162    fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2163        self.get_by_exported_function(ffqn)
2164            .and_then(|(fn_meta, _, _)| {
2165                if let ReturnType::Extendable(ReturnTypeExtendable {
2166                    type_wrapper_tl: type_wrapper,
2167                    wit_type: _,
2168                }) = fn_meta.return_type
2169                {
2170                    Some(type_wrapper)
2171                } else {
2172                    None
2173                }
2174            })
2175    }
2176
2177    fn all_exports(&self) -> &[PackageIfcFns];
2178}
2179
2180#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
2181#[display("{_0:?}")]
2182pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2183
2184impl ExecutionMetadata {
2185    const LINKED_KEY: &str = "obelisk-tracing-linked";
2186    #[must_use]
2187    pub const fn empty() -> Self {
2188        // Remove `Optional` when const hashmap creation is allowed - https://github.com/rust-lang/rust/issues/123197
2189        Self(None)
2190    }
2191
2192    #[must_use]
2193    pub fn from_parent_span(less_specific: &Span) -> Self {
2194        ExecutionMetadata::create(less_specific, false)
2195    }
2196
2197    #[must_use]
2198    pub fn from_linked_span(less_specific: &Span) -> Self {
2199        ExecutionMetadata::create(less_specific, true)
2200    }
2201
2202    /// Attempt to use `Span::current()` to fill the trace and parent span.
2203    /// If that fails, which can happen due to interference with e.g.
2204    /// the stdout layer of the subscriber, use the `span` which is guaranteed
2205    /// to be on info level.
2206    #[must_use]
2207    #[expect(clippy::items_after_statements)]
2208    fn create(span: &Span, link_marker: bool) -> Self {
2209        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2210        let mut metadata = Self(Some(hashbrown::HashMap::default()));
2211        let mut metadata_view = ExecutionMetadataInjectorView {
2212            metadata: &mut metadata,
2213        };
2214        // inject the current context through the amqp headers
2215        fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2216            opentelemetry::global::get_text_map_propagator(|propagator| {
2217                propagator.inject_context(&s.context(), metadata_view);
2218            });
2219        }
2220        inject(&Span::current(), &mut metadata_view);
2221        if metadata_view.is_empty() {
2222            // The subscriber sent us a current span that is actually disabled
2223            inject(span, &mut metadata_view);
2224        }
2225        if link_marker {
2226            metadata_view.set(Self::LINKED_KEY, String::new());
2227        }
2228        metadata
2229    }
2230
2231    pub fn enrich(&self, span: &Span) {
2232        use opentelemetry::trace::TraceContextExt as _;
2233        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2234
2235        let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2236        let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2237            propagator.extract(&metadata_view)
2238        });
2239        if metadata_view.get(Self::LINKED_KEY).is_some() {
2240            let linked_span_context = otel_context.span().span_context().clone();
2241            span.add_link(linked_span_context);
2242        } else {
2243            span.set_parent(otel_context);
2244        }
2245    }
2246}
2247
2248struct ExecutionMetadataInjectorView<'a> {
2249    metadata: &'a mut ExecutionMetadata,
2250}
2251
2252impl ExecutionMetadataInjectorView<'_> {
2253    fn is_empty(&self) -> bool {
2254        self.metadata
2255            .0
2256            .as_ref()
2257            .is_some_and(hashbrown::HashMap::is_empty)
2258    }
2259}
2260
2261impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2262    fn set(&mut self, key: &str, value: String) {
2263        let key = format!("tracing:{key}");
2264        let map = if let Some(map) = self.metadata.0.as_mut() {
2265            map
2266        } else {
2267            self.metadata.0 = Some(hashbrown::HashMap::new());
2268            assert_matches!(&mut self.metadata.0, Some(map) => map)
2269        };
2270        map.insert(key, value);
2271    }
2272}
2273
2274struct ExecutionMetadataExtractorView<'a> {
2275    metadata: &'a ExecutionMetadata,
2276}
2277
2278impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2279    fn get(&self, key: &str) -> Option<&str> {
2280        self.metadata
2281            .0
2282            .as_ref()
2283            .and_then(|map| map.get(&format!("tracing:{key}")))
2284            .map(std::string::String::as_str)
2285    }
2286
2287    fn keys(&self) -> Vec<&str> {
2288        match &self.metadata.0.as_ref() {
2289            Some(map) => map
2290                .keys()
2291                .filter_map(|key| key.strip_prefix("tracing:"))
2292                .collect(),
2293            None => vec![],
2294        }
2295    }
2296}
2297
#[cfg(test)]
mod tests {

    use rstest::rstest;

    use crate::{
        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
    };
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        str::FromStr,
        sync::Arc,
    };

    /// A generated `ExecutorId` must survive a to-string / parse round trip.
    #[test]
    fn ulid_parsing() {
        let generated = ExecutorId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// A generated top-level `ExecutionId` must survive a to-string / parse round trip.
    #[test]
    fn execution_id_parsing_top_level() {
        let generated = ExecutionId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// The first child in a named join set serializes as `{parent}.n:name_1`.
    #[test]
    fn execution_id_with_one_level_should_parse() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
        let ser = first_child.to_string();
        assert_eq!(format!("{top_level}.n:name_1"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(first_child, parsed);
    }

    /// Incrementing the first child yields the `_2` sibling.
    #[test]
    fn execution_id_increment_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = top_level.next_level(&join_set_id);
        let second_child = ExecutionId::Derived(first_child.get_incremented());
        let ser = second_child.to_string();
        assert_eq!(format!("{top_level}.n:name_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(second_child, parsed);
    }

    /// Two nested levels serialize as dot-separated `{kind}:{name}_{index}` segments.
    #[test]
    fn execution_id_next_level_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let execution_id = ExecutionId::Derived(
            top_level
                .next_level(&join_set_id_outer)
                .get_incremented()
                .next_level(&join_set_id_inner)
                .get_incremented(),
        );
        let ser = execution_id.to_string();
        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(execution_id, parsed);
    }

    /// Splitting a first-level child returns its top-level parent and the join set.
    #[test]
    fn execution_id_split_first_level() {
        let top_level = ExecutionId::generate();
        let join_set_id =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let execution_id = top_level.next_level(&join_set_id);
        let (actual_top_level, actual_join_set) = execution_id.split_to_parts().unwrap();
        assert_eq!(top_level, actual_top_level);
        assert_eq!(join_set_id, actual_join_set);
    }

    /// Splitting a second-level child returns the first-level parent (regardless of the
    /// parent's index) and the inner join set.
    #[rstest]
    fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let first_level = top_level
            .next_level(&join_set_id_outer)
            .get_incremented_by(outer_idx);

        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
        let second_level = first_level.next_level(&join_set_id_inner);

        let (actual_first_level, actual_join_set) = second_level.split_to_parts().unwrap();
        assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
        assert_eq!(join_set_id_inner, actual_join_set);
    }

    /// Random seeds derived from execution ids are pinned by a snapshot and must differ
    /// between parent, siblings and child.
    #[test]
    fn execution_id_hash_should_be_stable() {
        let parent = ExecutionId::from_parts(1, 2);
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let sibling_1 = parent.next_level(&join_set_id);
        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
        let sibling_1 = ExecutionId::Derived(sibling_1);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let child =
            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
        let parent = parent.random_seed();
        let sibling_1 = sibling_1.random_seed();
        let sibling_2 = sibling_2.random_seed();
        let child = child.random_seed();
        let vec = vec![parent, sibling_1, sibling_2, child];
        insta::assert_debug_snapshot!(vec);
        // check that every hash is unique
        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
        assert_eq!(4, set.len());
    }

    /// `StrVariant::Arc` and `StrVariant::Static` with equal content must compare equal
    /// and hash identically.
    #[test]
    fn hash_of_str_variants_should_be_equal() {
        let input = "foo";
        let left = StrVariant::Arc(Arc::from(input));
        let right = StrVariant::Static(input);
        assert_eq!(left, right);
        let mut left_hasher = DefaultHasher::new();
        left.hash(&mut left_hasher);
        let mut right_hasher = DefaultHasher::new();
        right.hash(&mut right_hasher);
        let left_hasher = left_hasher.finish();
        let right_hasher = right_hasher.finish();
        println!("left: {left_hasher:x}, right: {right_hasher:x}");
        assert_eq!(left_hasher, right_hasher);
    }

    #[test]
    fn ffqn_from_tuple_with_version_should_work() {
        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    #[test]
    fn ffqn_from_str_with_version_should_work() {
        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    /// Every join set kind must survive a JSON serialize / deserialize round trip.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn join_set_serde_should_be_consistent() {
        use strum::IntoEnumIterator;
        for kind in JoinSetKind::iter() {
            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
            let ser = serde_json::to_string(&join_set_id).unwrap();
            let deser: JoinSetId = serde_json::from_str(&ser).unwrap();
            assert_eq!(join_set_id, deser);
        }
    }
}
2461}