obeli_sk_concepts/
lib.rs

1pub mod component_id;
2mod error_conversions;
3#[cfg(feature = "postgres")]
4mod postgres_ext;
5#[cfg(feature = "rusqlite")]
6mod rusqlite_ext;
7pub mod storage;
8pub mod time;
9
10pub use crate::component_id::{
11    ComponentId, ComponentType, ContentDigest, InvalidNameError, check_name,
12};
13use ::serde::{Deserialize, Serialize};
14use assert_matches::assert_matches;
15pub use indexmap;
16use indexmap::IndexMap;
17use opentelemetry::propagation::{Extractor, Injector};
18pub use prefixed_ulid::ExecutionId;
19use serde_json::Value;
20use std::{
21    borrow::Borrow,
22    fmt::{Debug, Display, Write},
23    hash::Hash,
24    marker::PhantomData,
25    ops::Deref,
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
31use tracing::{Span, error};
32use val_json::{
33    type_wrapper::{TypeConversionError, TypeWrapper},
34    wast_val::{WastVal, WastValWithType},
35    wast_val_ser::params,
36};
37use wasmtime::component::{Type, Val};
38
/// Namespace string matched by `IfcFqnName::is_namespace_obelisk`.
pub const NAMESPACE_OBELISK: &str = "obelisk";
/// Namespace string matched by `IfcFqnName::is_namespace_wasi`.
const NAMESPACE_WASI: &str = "wasi";
/// Package-name suffix of [`PackageExtension::ObeliskExt`].
pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
/// Package-name suffix of [`PackageExtension::ObeliskSchedule`].
pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
/// Package-name suffix of [`PackageExtension::ObeliskStub`].
pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
44
45#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
46#[error("execution error: {kind}")]
47pub struct FinishedExecutionError {
48    pub kind: ExecutionFailureKind,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub reason: Option<String>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub detail: Option<String>,
53}
54impl FinishedExecutionError {
55    #[must_use]
56    pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57        PendingStateFinishedError::ExecutionFailure(self.kind)
58    }
59}
60
61#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum ExecutionFailureKind {
64    /// Applicable to activities only, because workflows will be retried forever
65    TimedOut,
66    /// Applicable to workflows
67    NondeterminismDetected,
68    /// Applicable to WASM components
69    OutOfFuel,
70    /// Applicable to activities
71    Cancelled,
72    Uncategorized,
73}
74
75#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(rename_all = "snake_case")]
77pub enum TrapKind {
78    #[display("trap")]
79    Trap,
80    #[display("post_return_trap")]
81    PostReturnTrap,
82    #[display("out of fuel")]
83    OutOfFuel,
84    #[display("host function error")]
85    HostFunctionError,
86}
87
88#[derive(Clone, Eq, derive_more::Display)]
89pub enum StrVariant {
90    Static(&'static str),
91    Arc(Arc<str>),
92}
93
94impl StrVariant {
95    #[must_use]
96    pub const fn empty() -> StrVariant {
97        StrVariant::Static("")
98    }
99}
100
101impl From<String> for StrVariant {
102    fn from(value: String) -> Self {
103        StrVariant::Arc(Arc::from(value))
104    }
105}
106
107impl From<&'static str> for StrVariant {
108    fn from(value: &'static str) -> Self {
109        StrVariant::Static(value)
110    }
111}
112
113impl PartialEq for StrVariant {
114    fn eq(&self, other: &Self) -> bool {
115        match (self, other) {
116            (Self::Static(left), Self::Static(right)) => left == right,
117            (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
118            (Self::Arc(left), Self::Arc(right)) => left == right,
119            (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
120        }
121    }
122}
123
124impl Hash for StrVariant {
125    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
126        match self {
127            StrVariant::Static(val) => val.hash(state),
128            StrVariant::Arc(val) => {
129                let str: &str = val.deref();
130                str.hash(state);
131            }
132        }
133    }
134}
135
136impl Debug for StrVariant {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        Display::fmt(self, f)
139    }
140}
141
142impl Deref for StrVariant {
143    type Target = str;
144    fn deref(&self) -> &Self::Target {
145        match self {
146            Self::Arc(v) => v,
147            Self::Static(v) => v,
148        }
149    }
150}
151
152impl AsRef<str> for StrVariant {
153    fn as_ref(&self) -> &str {
154        match self {
155            Self::Arc(v) => v,
156            Self::Static(v) => v,
157        }
158    }
159}
160
161mod serde_strvariant {
162    use crate::StrVariant;
163    use serde::{
164        Deserialize, Deserializer, Serialize, Serializer,
165        de::{self, Visitor},
166    };
167    use std::{ops::Deref, sync::Arc};
168
169    impl Serialize for StrVariant {
170        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
171        where
172            S: Serializer,
173        {
174            serializer.serialize_str(self.deref())
175        }
176    }
177
178    impl<'de> Deserialize<'de> for StrVariant {
179        fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
180        where
181            D: Deserializer<'de>,
182        {
183            deserializer.deserialize_str(StrVariantVisitor)
184        }
185    }
186
187    struct StrVariantVisitor;
188
189    impl Visitor<'_> for StrVariantVisitor {
190        type Value = StrVariant;
191
192        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
193            formatter.write_str("a string")
194        }
195
196        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
197        where
198            E: de::Error,
199        {
200            Ok(StrVariant::Arc(Arc::from(v)))
201        }
202    }
203}
204
205#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
206#[display("{value}")]
207#[serde(transparent)]
208pub struct Name<T> {
209    pub value: StrVariant,
210    #[serde(skip)]
211    phantom_data: PhantomData<fn(T) -> T>,
212}
213
214impl<T> Name<T> {
215    #[must_use]
216    pub fn new_arc(value: Arc<str>) -> Self {
217        Self {
218            value: StrVariant::Arc(value),
219            phantom_data: PhantomData,
220        }
221    }
222
223    #[must_use]
224    pub const fn new_static(value: &'static str) -> Self {
225        Self {
226            value: StrVariant::Static(value),
227            phantom_data: PhantomData,
228        }
229    }
230}
231
232impl<T> Debug for Name<T> {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        Display::fmt(&self, f)
235    }
236}
237
238impl<T> Deref for Name<T> {
239    type Target = str;
240
241    fn deref(&self) -> &Self::Target {
242        self.value.deref()
243    }
244}
245
246impl<T> Borrow<str> for Name<T> {
247    fn borrow(&self) -> &str {
248        self.deref()
249    }
250}
251
252impl<T> From<String> for Name<T> {
253    fn from(value: String) -> Self {
254        Self::new_arc(Arc::from(value))
255    }
256}
257
258#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
259#[display("{}", self.suffix())]
260pub enum PackageExtension {
261    ObeliskExt,
262    ObeliskSchedule,
263    ObeliskStub,
264}
265impl PackageExtension {
266    fn suffix(&self) -> &'static str {
267        match self {
268            PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
269            PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
270            PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
271        }
272    }
273}
274
275#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
276#[cfg_attr(feature = "test", derive(Serialize))]
277pub struct PkgFqn {
278    pub namespace: String, // TODO: StrVariant or reference
279    pub package_name: String,
280    pub version: Option<String>,
281}
282impl PkgFqn {
283    #[must_use]
284    pub fn is_extension(&self) -> bool {
285        Self::is_package_name_ext(&self.package_name)
286    }
287
288    #[must_use]
289    pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
290        use strum::IntoEnumIterator;
291        for package_ext in PackageExtension::iter() {
292            if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
293                return Some((
294                    PkgFqn {
295                        namespace: self.namespace.clone(),
296                        package_name: package_name.to_string(),
297                        version: self.version.clone(),
298                    },
299                    package_ext,
300                ));
301            }
302        }
303        None
304    }
305
306    fn is_package_name_ext(package_name: &str) -> bool {
307        package_name.ends_with(SUFFIX_PKG_EXT)
308            || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
309            || package_name.ends_with(SUFFIX_PKG_STUB)
310    }
311
312    fn fmt(&self, f: &mut dyn Write, delimiter: &'static str) -> std::fmt::Result {
313        let PkgFqn {
314            namespace,
315            package_name,
316            version,
317        } = self;
318        if let Some(version) = version {
319            write!(f, "{namespace}{delimiter}{package_name}@{version}")
320        } else {
321            write!(f, "{namespace}{delimiter}{package_name}")
322        }
323    }
324
325    #[must_use]
326    pub fn as_file_name(&self) -> String {
327        let mut buffer = String::new();
328        self.fmt(&mut buffer, "_").expect("writing to string");
329        buffer
330    }
331}
332impl Display for PkgFqn {
333    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334        self.fmt(f, ":")
335    }
336}
337
338#[derive(Hash, Clone, PartialEq, Eq)]
339pub struct IfcFqnMarker;
340
341pub type IfcFqnName = Name<IfcFqnMarker>; // namespace:name/ifc_name OR namespace:name/ifc_name@version
342
343impl IfcFqnName {
344    #[must_use]
345    pub fn from_parts(
346        namespace: &str,
347        package_name: &str,
348        ifc_name: &str,
349        version: Option<&str>,
350    ) -> Self {
351        let mut str = format!("{namespace}:{package_name}/{ifc_name}");
352        if let Some(version) = version {
353            str += "@";
354            str += version;
355        }
356        Self::new_arc(Arc::from(str))
357    }
358
359    #[must_use]
360    pub fn from_pkg_fqn(pkg_fqn: &PkgFqn, ifc_name: &str) -> Self {
361        let mut str = format!(
362            "{namespace}:{package_name}/{ifc_name}",
363            namespace = pkg_fqn.namespace,
364            package_name = pkg_fqn.package_name
365        );
366        if let Some(version) = &pkg_fqn.version {
367            str += "@";
368            str += version;
369        }
370        Self::new_arc(Arc::from(str))
371    }
372
373    #[must_use]
374    pub fn namespace(&self) -> &str {
375        self.deref().split_once(':').unwrap().0
376    }
377
378    #[must_use]
379    pub fn package_name(&self) -> &str {
380        let after_colon = self.deref().split_once(':').unwrap().1;
381        after_colon.split_once('/').unwrap().0
382    }
383
384    #[must_use]
385    pub fn version(&self) -> Option<&str> {
386        self.deref().split_once('@').map(|(_, version)| version)
387    }
388
389    #[must_use]
390    pub fn pkg_fqn_name(&self) -> PkgFqn {
391        let (namespace, rest) = self.deref().split_once(':').unwrap();
392        let (package_name, rest) = rest.split_once('/').unwrap();
393        let version = rest.split_once('@').map(|(_, version)| version);
394        PkgFqn {
395            namespace: namespace.to_string(),
396            package_name: package_name.to_string(),
397            version: version.map(std::string::ToString::to_string),
398        }
399    }
400
401    #[must_use]
402    pub fn ifc_name(&self) -> &str {
403        let after_colon = self.deref().split_once(':').unwrap().1;
404        let after_slash = after_colon.split_once('/').unwrap().1;
405        after_slash
406            .split_once('@')
407            .map_or(after_slash, |(ifc, _)| ifc)
408    }
409
410    #[must_use]
411    /// Returns true if this is an `-obelisk-*` extension interface.
412    pub fn is_extension(&self) -> bool {
413        PkgFqn::is_package_name_ext(self.package_name())
414    }
415
416    #[must_use]
417    pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
418        self.package_name().strip_suffix(SUFFIX_PKG_EXT)
419    }
420
421    #[must_use]
422    pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
423        self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
424    }
425
426    #[must_use]
427    pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
428        self.package_name().strip_suffix(SUFFIX_PKG_STUB)
429    }
430
431    #[must_use]
432    pub fn is_namespace_obelisk(&self) -> bool {
433        self.namespace() == NAMESPACE_OBELISK
434    }
435
436    #[must_use]
437    pub fn is_namespace_wasi(&self) -> bool {
438        self.namespace() == NAMESPACE_WASI
439    }
440}
441
442#[derive(Hash, Clone, PartialEq, Eq)]
443pub struct FnMarker;
444
445pub type FnName = Name<FnMarker>;
446
447#[derive(
448    Hash, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
449)]
450pub struct FunctionFqn {
451    pub ifc_fqn: IfcFqnName,
452    pub function_name: FnName,
453}
454
455impl FunctionFqn {
456    #[must_use]
457    pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
458        Self {
459            ifc_fqn: Name::new_arc(ifc_fqn),
460            function_name: Name::new_arc(function_name),
461        }
462    }
463
464    #[must_use]
465    pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
466        Self {
467            ifc_fqn: Name::new_static(ifc_fqn),
468            function_name: Name::new_static(function_name),
469        }
470    }
471
472    #[must_use]
473    pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
474        Self::new_static(tuple.0, tuple.1)
475    }
476
477    pub fn try_from_tuple(
478        ifc_fqn: &str,
479        function_name: &str,
480    ) -> Result<Self, FunctionFqnParseError> {
481        if function_name.contains('.') {
482            Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
483        } else {
484            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
485        }
486    }
487}
488
489#[derive(Debug, thiserror::Error)]
490pub enum FunctionFqnParseError {
491    #[error("delimiter `.` not found")]
492    DelimiterNotFound,
493    #[error("delimiter `.` found in function name")]
494    DelimiterFoundInFunctionName,
495}
496
497impl FromStr for FunctionFqn {
498    type Err = FunctionFqnParseError;
499
500    fn from_str(s: &str) -> Result<Self, Self::Err> {
501        if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
502            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
503        } else {
504            Err(FunctionFqnParseError::DelimiterNotFound)
505        }
506    }
507}
508
509impl Display for FunctionFqn {
510    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511        write!(
512            f,
513            "{ifc_fqn}.{function_name}",
514            ifc_fqn = self.ifc_fqn,
515            function_name = self.function_name
516        )
517    }
518}
519
520impl Debug for FunctionFqn {
521    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522        Display::fmt(&self, f)
523    }
524}
525
/// Generates an FFQN of the shape `namespace:pkg/ifc` + function name from four
/// arbitrary strings, with `:`, `@` and `.` removed from each of them.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        const ILLEGAL: [char; 3] = [':', '@', '.'];
        // Draws the next arbitrary string and strips the illegal characters.
        let mut next_part = || -> arbitrary::Result<String> {
            let raw: String = u.arbitrary()?;
            Ok(raw.replace(ILLEGAL, ""))
        };
        // The draw order (namespace, package, interface, function) is significant.
        let namespace = next_part()?;
        let pkg_name = next_part()?;
        let ifc_name = next_part()?;
        let fn_name = next_part()?;

        let ifc_fqn = format!("{namespace}:{pkg_name}/{ifc_name}");
        Ok(FunctionFqn::new_arc(Arc::from(ifc_fqn), Arc::from(fn_name)))
    }
}
541
542#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
543pub struct TypeWrapperTopLevel {
544    pub ok: Option<Box<TypeWrapper>>,
545    pub err: Option<Box<TypeWrapper>>,
546}
547impl From<TypeWrapperTopLevel> for TypeWrapper {
548    fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
549        TypeWrapper::Result {
550            ok: value.ok,
551            err: value.err,
552        }
553    }
554}
555
556#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
557#[serde(rename_all = "snake_case")]
558pub enum SupportedFunctionReturnValue {
559    Ok {
560        #[debug(skip)]
561        ok: Option<WastValWithType>,
562    },
563    Err {
564        #[debug(skip)]
565        err: Option<WastValWithType>,
566    },
567    ExecutionError(FinishedExecutionError),
568}
569impl Display for SupportedFunctionReturnValue {
570    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571        match self.as_pending_state_finished_result() {
572            PendingStateFinishedResultKind::Ok => write!(f, "execution completed successfully"),
573            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
574        }
575    }
576}
577
578pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
579    SupportedFunctionReturnValue::Ok { ok: None };
580
581#[derive(Debug, thiserror::Error)]
582pub enum ResultParsingError {
583    #[error("return value must not be empty")]
584    NoValue,
585    #[error("return value cannot be parsed, multi-value results are not supported")]
586    MultiValue,
587    #[error("return value cannot be parsed, {0}")]
588    TypeConversionError(val_json::type_wrapper::TypeConversionError),
589    #[error(transparent)]
590    ResultParsingErrorFromVal(ResultParsingErrorFromVal),
591}
592
593#[derive(Debug, thiserror::Error)]
594pub enum ResultParsingErrorFromVal {
595    #[error("return value cannot be parsed, {0}")]
596    WastValConversionError(val_json::wast_val::WastValConversionError),
597    #[error("top level type must be a result")]
598    TopLevelTypeMustBeAResult,
599    #[error("value does not type check")]
600    TypeCheckError,
601}
602
603impl SupportedFunctionReturnValue {
604    pub fn new<
605        I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
606    >(
607        mut iter: I,
608    ) -> Result<Self, ResultParsingError> {
609        if iter.len() == 0 {
610            Err(ResultParsingError::NoValue)
611        } else if iter.len() == 1 {
612            let (val, r#type) = iter.next().unwrap();
613            let r#type =
614                TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
615            Self::from_val_and_type_wrapper(val, r#type)
616                .map_err(ResultParsingError::ResultParsingErrorFromVal)
617        } else {
618            Err(ResultParsingError::MultiValue)
619        }
620    }
621
622    #[expect(clippy::result_unit_err)]
623    pub fn from_wast_val_with_type(
624        value: WastValWithType,
625    ) -> Result<SupportedFunctionReturnValue, ()> {
626        match value {
627            WastValWithType {
628                r#type: TypeWrapper::Result { ok: None, err: _ },
629                value: WastVal::Result(Ok(None)),
630            } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
631            WastValWithType {
632                r#type:
633                    TypeWrapper::Result {
634                        ok: Some(ok),
635                        err: _,
636                    },
637                value: WastVal::Result(Ok(Some(value))),
638            } => Ok(SupportedFunctionReturnValue::Ok {
639                ok: Some(WastValWithType {
640                    r#type: *ok,
641                    value: *value,
642                }),
643            }),
644            WastValWithType {
645                r#type: TypeWrapper::Result { ok: _, err: None },
646                value: WastVal::Result(Err(None)),
647            } => Ok(SupportedFunctionReturnValue::Err { err: None }),
648            WastValWithType {
649                r#type:
650                    TypeWrapper::Result {
651                        ok: _,
652                        err: Some(err),
653                    },
654                value: WastVal::Result(Err(Some(value))),
655            } => Ok(SupportedFunctionReturnValue::Err {
656                err: Some(WastValWithType {
657                    r#type: *err,
658                    value: *value,
659                }),
660            }),
661            _ => Err(()),
662        }
663    }
664
665    pub fn from_val_and_type_wrapper(
666        value: wasmtime::component::Val,
667        ty: TypeWrapper,
668    ) -> Result<Self, ResultParsingErrorFromVal> {
669        let TypeWrapper::Result { ok, err } = ty else {
670            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
671        };
672        let ty = TypeWrapperTopLevel { ok, err };
673        Self::from_val_and_type_wrapper_tl(value, ty)
674    }
675
676    pub fn from_val_and_type_wrapper_tl(
677        value: wasmtime::component::Val,
678        ty: TypeWrapperTopLevel,
679    ) -> Result<Self, ResultParsingErrorFromVal> {
680        let wasmtime::component::Val::Result(value) = value else {
681            return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
682        };
683
684        match (ty.ok, ty.err, value) {
685            (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
686            (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
687                ok: Some(WastValWithType {
688                    r#type: *ok_type,
689                    value: WastVal::try_from(*value)
690                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
691                }),
692            }),
693            (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
694            (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
695                err: Some(WastValWithType {
696                    r#type: *err_type,
697                    value: WastVal::try_from(*value)
698                        .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
699                }),
700            }),
701            _other => Err(ResultParsingErrorFromVal::TypeCheckError),
702        }
703    }
704
705    #[must_use]
706    pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
707        match self {
708            SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
709            SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
710                WastVal::Result(Ok(Some(Box::new(v.value))))
711            }
712            SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
713            SupportedFunctionReturnValue::Err { err: Some(v) } => {
714                WastVal::Result(Err(Some(Box::new(v.value))))
715            }
716            SupportedFunctionReturnValue::ExecutionError(_) => {
717                execution_error_to_wast_val(&get_return_type())
718            }
719        }
720    }
721
722    #[must_use]
723    pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
724        match self {
725            SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind::Ok,
726            SupportedFunctionReturnValue::Err { err: _ } => {
727                PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error)
728            }
729            SupportedFunctionReturnValue::ExecutionError(err) => {
730                PendingStateFinishedResultKind::Err(err.as_pending_state_finished_error())
731            }
732        }
733    }
734}
735
736#[must_use]
737pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
738    match ret_type {
739        TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
740        TypeWrapperTopLevel {
741            ok: _,
742            err: Some(inner),
743        } => match inner.as_ref() {
744            TypeWrapper::String => {
745                return WastVal::Result(Err(Some(Box::new(WastVal::String(
746                    EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
747                )))));
748            }
749            TypeWrapper::Variant(variants) => {
750                if variants.get(EXECUTION_FAILED_STRING_OR_VARIANT) == Some(&None) {
751                    return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
752                        EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
753                        None,
754                    )))));
755                }
756            }
757            _ => {}
758        },
759    }
760    unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
761}
762
763#[derive(Debug, Clone)]
764pub struct Params(ParamsInternal);
765
766#[derive(derive_more::Debug, Clone)]
767enum ParamsInternal {
768    JsonValues(Arc<[Value]>),
769    Vals {
770        vals: Arc<[wasmtime::component::Val]>,
771        #[debug(skip)]
772        json_vals_cache: Arc<std::sync::RwLock<Option<Arc<[Value]>>>>, // Caches json values
773    },
774    Empty,
775}
776
/// Function-name suffix of [`FunctionExtension::Submit`].
pub const SUFFIX_FN_SUBMIT: &str = "-submit";
/// Function-name suffix of [`FunctionExtension::AwaitNext`].
pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
/// Function-name suffix of [`FunctionExtension::Schedule`].
pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
/// Function-name suffix of [`FunctionExtension::Stub`].
pub const SUFFIX_FN_STUB: &str = "-stub";
/// Function-name suffix of [`FunctionExtension::Get`].
pub const SUFFIX_FN_GET: &str = "-get";
782
783#[derive(
784    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
785)]
786#[serde(rename_all = "snake_case")]
787pub enum FunctionExtension {
788    Submit,
789    AwaitNext,
790    Schedule,
791    Stub,
792    Get,
793}
794impl FunctionExtension {
795    #[must_use]
796    pub fn suffix(&self) -> &'static str {
797        match self {
798            FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
799            FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
800            FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
801            FunctionExtension::Stub => SUFFIX_FN_STUB,
802            FunctionExtension::Get => SUFFIX_FN_GET,
803        }
804    }
805
806    #[must_use]
807    pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
808        matches!(
809            (pkg_ext, self),
810            (
811                PackageExtension::ObeliskExt,
812                FunctionExtension::Submit | FunctionExtension::AwaitNext | FunctionExtension::Get
813            ) | (
814                PackageExtension::ObeliskSchedule,
815                FunctionExtension::Schedule
816            ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
817        )
818    }
819}
820
821#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
822pub struct FunctionMetadata {
823    pub ffqn: FunctionFqn,
824    pub parameter_types: ParameterTypes,
825    pub return_type: ReturnType,
826    pub extension: Option<FunctionExtension>,
827    /// Externally submittable: primary functions + `-schedule` extended, but no activity stubs
828    pub submittable: bool,
829}
830impl FunctionMetadata {
831    #[must_use]
832    pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
833        self.extension.map(|extension| {
834            let prefix = self
835                .ffqn
836                .function_name
837                .value
838                .strip_suffix(extension.suffix())
839                .unwrap_or_else(|| {
840                    panic!(
841                        "extension function {} must end with expected suffix {}",
842                        self.ffqn.function_name,
843                        extension.suffix()
844                    )
845                });
846            (prefix, extension)
847        })
848    }
849}
850impl Display for FunctionMetadata {
851    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852        write!(
853            f,
854            "{ffqn}: func{params} -> {return_type}",
855            ffqn = self.ffqn,
856            params = self.parameter_types,
857            return_type = self.return_type,
858        )
859    }
860}
861
862pub mod serde_params {
863    use crate::{Params, ParamsInternal};
864    use serde::de::{SeqAccess, Visitor};
865    use serde::ser::SerializeSeq;
866    use serde::{Deserialize, Serialize};
867    use serde_json::Value;
868    use std::sync::Arc;
869    use val_json::wast_val::WastVal;
870
871    impl Serialize for Params {
872        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
873        where
874            S: ::serde::Serializer,
875        {
876            let serialize_json_values = |slice: &[serde_json::Value], serializer: S| {
877                let mut seq = serializer.serialize_seq(Some(slice.len()))?;
878                for item in slice {
879                    seq.serialize_element(item)?;
880                }
881                seq.end()
882            };
883            match &self.0 {
884                ParamsInternal::Vals {
885                    vals,
886                    json_vals_cache,
887                } => {
888                    let guard = json_vals_cache.read().unwrap();
889                    if let Some(slice) = &*guard {
890                        serialize_json_values(slice, serializer)
891                    } else {
892                        drop(guard);
893                        let mut json_vals = Vec::with_capacity(vals.len());
894                        for val in vals.iter() {
895                            let value = WastVal::try_from(val.clone())
896                                .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
897                            let value = serde_json::to_value(&value)
898                                .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
899                            json_vals.push(value);
900                        }
901                        let res = serialize_json_values(&json_vals, serializer);
902                        *json_vals_cache.write().unwrap() = Some(Arc::from(json_vals));
903                        res
904                    }
905                }
906                ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
907                ParamsInternal::JsonValues(vec) => serialize_json_values(vec, serializer),
908            }
909        }
910    }
911
912    pub struct VecVisitor;
913
914    impl<'de> Visitor<'de> for VecVisitor {
915        type Value = Vec<Value>;
916
917        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
918            formatter.write_str("a sequence of `Value`")
919        }
920
921        #[inline]
922        fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
923        where
924            V: SeqAccess<'de>,
925        {
926            let mut vec = Vec::new();
927            while let Some(elem) = visitor.next_element()? {
928                vec.push(elem);
929            }
930            Ok(vec)
931        }
932    }
933
934    impl<'de> Deserialize<'de> for Params {
935        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
936        where
937            D: serde::Deserializer<'de>,
938        {
939            let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
940            if vec.is_empty() {
941                Ok(Self(ParamsInternal::Empty))
942            } else {
943                Ok(Self(ParamsInternal::JsonValues(Arc::from(vec))))
944            }
945        }
946    }
947}
948
/// Errors raised while checking or converting [`Params`] against a function's parameter types.
#[derive(Debug, thiserror::Error)]
pub enum ParamsParsingError {
    /// The declared type of the parameter at position `idx` could not be converted
    /// into a [`TypeWrapper`].
    #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
    ParameterTypeError {
        idx: usize,
        err: TypeConversionError,
    },
    /// The JSON values could not be deserialized according to the expected parameter types.
    #[error("parameters cannot be deserialized: {0}")]
    ParamsDeserializationError(serde_json::Error),
    /// The number of supplied values differs from the number of expected parameters.
    #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
    ParameterCardinalityMismatch { expected: usize, specified: usize },
}
961
962impl ParamsParsingError {
963    #[must_use]
964    pub fn detail(&self) -> Option<String> {
965        match self {
966            ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
967            ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
968            ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
969        }
970    }
971}
972
/// Error describing a JSON value that cannot be used as a list of function parameters.
#[derive(Debug, thiserror::Error)]
pub enum ParamsFromJsonError {
    /// The value was expected to be a JSON array holding the function parameters.
    #[error("value must be a json array containing function parameters")]
    MustBeArray,
}
978
979impl Params {
980    #[must_use]
981    pub const fn empty() -> Self {
982        Self(ParamsInternal::Empty)
983    }
984
985    #[must_use]
986    pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
987        if vals.is_empty() {
988            Self::empty()
989        } else {
990            Self(ParamsInternal::Vals {
991                vals,
992                json_vals_cache: Arc::default(),
993            })
994        }
995    }
996
997    #[cfg(any(test, feature = "test"))]
998    #[must_use]
999    pub fn from_json_values_test(vec: Vec<Value>) -> Self {
1000        if vec.is_empty() {
1001            Self::empty()
1002        } else {
1003            Self(ParamsInternal::JsonValues(Arc::from(vec)))
1004        }
1005    }
1006
1007    #[must_use]
1008    pub fn from_json_values(values: Arc<[Value]>) -> Self {
1009        if values.is_empty() {
1010            Self::empty()
1011        } else {
1012            Self(ParamsInternal::JsonValues(values))
1013        }
1014    }
1015
1016    pub fn typecheck<'a>(
1017        &self,
1018        param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
1019    ) -> Result<(), ParamsParsingError> {
1020        if param_types.len() != self.len() {
1021            return Err(ParamsParsingError::ParameterCardinalityMismatch {
1022                expected: param_types.len(),
1023                specified: self.len(),
1024            });
1025        }
1026        match &self.0 {
1027            ParamsInternal::Vals { .. } /* already typechecked */ | ParamsInternal::Empty => {}
1028            ParamsInternal::JsonValues(params) => {
1029                params::deserialize_values(params, param_types)
1030                .map_err(ParamsParsingError::ParamsDeserializationError)?;
1031            }
1032        }
1033        Ok(())
1034    }
1035
1036    pub fn as_vals<'a>(
1037        &self,
1038        param_types: impl ExactSizeIterator<Item = (&'a str, Type)>,
1039    ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1040        if param_types.len() != self.len() {
1041            return Err(ParamsParsingError::ParameterCardinalityMismatch {
1042                expected: param_types.len(),
1043                specified: self.len(),
1044            });
1045        }
1046        match &self.0 {
1047            ParamsInternal::JsonValues(json_vec) => {
1048                let param_types = param_types
1049                    .enumerate()
1050                    .map(|(idx, (_param_name, ty))| {
1051                        TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1052                    })
1053                    .collect::<Result<Vec<_>, _>>()
1054                    .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1055                Ok(params::deserialize_values(json_vec, param_types.iter())
1056                    .map_err(ParamsParsingError::ParamsDeserializationError)?
1057                    .into_iter()
1058                    .map(Val::from)
1059                    .collect())
1060            }
1061            ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1062            ParamsInternal::Empty => Ok(Arc::from([])),
1063        }
1064    }
1065
1066    #[must_use]
1067    pub fn len(&self) -> usize {
1068        match &self.0 {
1069            ParamsInternal::JsonValues(vec) => vec.len(),
1070            ParamsInternal::Vals { vals, .. } => vals.len(),
1071            ParamsInternal::Empty => 0,
1072        }
1073    }
1074
1075    #[must_use]
1076    pub fn is_empty(&self) -> bool {
1077        self.len() == 0
1078    }
1079}
1080
1081impl PartialEq for Params {
1082    fn eq(&self, other: &Self) -> bool {
1083        if self.is_empty() && other.is_empty() {
1084            return true;
1085        }
1086        if self.len() != other.len() {
1087            return false;
1088        }
1089
1090        match (&self.0, &other.0) {
1091            (ParamsInternal::JsonValues(l), ParamsInternal::JsonValues(r)) => l == r,
1092
1093            (
1094                ParamsInternal::Vals {
1095                    vals: l,
1096                    json_vals_cache: _,
1097                },
1098                ParamsInternal::Vals {
1099                    vals: r,
1100                    json_vals_cache: _,
1101                },
1102            ) => l == r,
1103
1104            (
1105                ParamsInternal::JsonValues(json_vals),
1106                ParamsInternal::Vals {
1107                    vals,
1108                    json_vals_cache,
1109                },
1110            )
1111            | (
1112                ParamsInternal::Vals {
1113                    vals,
1114                    json_vals_cache,
1115                },
1116                ParamsInternal::JsonValues(json_vals),
1117            ) => {
1118                let Ok(vec) = to_json(vals) else { return false };
1119                let equals = *json_vals == vec;
1120                *json_vals_cache.write().unwrap() = Some(vec);
1121                equals
1122            }
1123
1124            (ParamsInternal::Empty, _) | (_, ParamsInternal::Empty) => {
1125                unreachable!("zero length and different lengths handled earlier")
1126            }
1127        }
1128    }
1129}
1130impl Eq for Params {}
1131
1132/// Convert wasmtime's Vals to serde's Values
1133fn to_json(vals: &[wasmtime::component::Val]) -> Result<Arc<[serde_json::Value]>, ()> {
1134    let mut vec = Vec::with_capacity(vals.len());
1135    for val in vals {
1136        let value = match WastVal::try_from(val.clone()) {
1137            Ok(ok) => ok,
1138            Err(err) => {
1139                error!("cannot compare Params, cannot convert to WastVal: {err:?}");
1140                return Err(());
1141            }
1142        };
1143        let value = match serde_json::to_value(&value) {
1144            Ok(ok) => ok,
1145            Err(err) => {
1146                error!("cannot compare Params, cannot convert to JSON: {err:?}");
1147                return Err(());
1148            }
1149        };
1150        vec.push(value);
1151    }
1152    Ok(Arc::from(vec))
1153}
1154
1155impl Display for Params {
1156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1157        let render_json_vals =
1158            |f: &mut std::fmt::Formatter<'_>, json_vals: &[Value]| -> std::fmt::Result {
1159                for (i, json_value) in json_vals.iter().enumerate() {
1160                    if i > 0 {
1161                        write!(f, ", ")?;
1162                    }
1163                    write!(f, "{json_value}")?;
1164                }
1165                Ok(())
1166            };
1167        write!(f, "[")?;
1168        match &self.0 {
1169            ParamsInternal::Empty => {}
1170            ParamsInternal::JsonValues(json_vals) => {
1171                render_json_vals(f, json_vals)?;
1172            }
1173            ParamsInternal::Vals {
1174                vals,
1175                json_vals_cache,
1176            } => {
1177                let guard = json_vals_cache.read().unwrap();
1178                if let Some(json_vals) = &*guard {
1179                    render_json_vals(f, json_vals)?;
1180                } else {
1181                    drop(guard);
1182                    let Ok(json_vals) = to_json(vals) else {
1183                        return write!(f, "<serialization error>]"); // note trailing ]
1184                    };
1185                    render_json_vals(f, &json_vals)?;
1186                    *json_vals_cache.write().unwrap() = Some(json_vals);
1187                }
1188            }
1189        }
1190        write!(f, "]")
1191    }
1192}
1193
1194pub mod prefixed_ulid {
1195    use crate::{
1196        EXECUTION_ID_INFIX, EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX, JoinSetId,
1197        JoinSetIdParseError,
1198    };
1199    use serde_with::{DeserializeFromStr, SerializeDisplay};
1200    use std::{
1201        fmt::{Debug, Display},
1202        hash::Hasher,
1203        marker::PhantomData,
1204        num::ParseIntError,
1205        str::FromStr,
1206        sync::Arc,
1207    };
1208    use ulid::Ulid;
1209
    /// A ULID displayed as `{prefix}_{ulid}`, where the prefix is derived from the
    /// marker type `T` (see `prefix()`).
    ///
    /// No value of `T` is stored; it only distinguishes id kinds at the type level.
    #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
    #[derive_where::derive_where(Clone, Copy)]
    #[display("{}_{ulid}", Self::prefix())]
    pub struct PrefixedUlid<T: 'static> {
        // The underlying identifier.
        ulid: Ulid,
        // Ties the id to its marker type without owning a `T`.
        phantom_data: PhantomData<fn(T) -> T>,
    }
1217
1218    impl<T> PrefixedUlid<T> {
1219        const fn new(ulid: Ulid) -> Self {
1220            Self {
1221                ulid,
1222                phantom_data: PhantomData,
1223            }
1224        }
1225
1226        fn prefix() -> &'static str {
1227            std::any::type_name::<T>().rsplit("::").next().unwrap()
1228        }
1229    }
1230
    impl<T> PrefixedUlid<T> {
        /// Creates an id backed by a newly generated ULID.
        #[must_use]
        pub fn generate() -> Self {
            Self::new(Ulid::new())
        }

        /// Builds an id from an explicit millisecond timestamp and random component.
        #[must_use]
        pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
            Self::new(Ulid::from_parts(timestamp_ms, random))
        }

        /// Returns the timestamp component of the underlying ULID, in milliseconds.
        #[must_use]
        pub fn timestamp_part(&self) -> u64 {
            self.ulid.timestamp_ms()
        }

        #[must_use]
        /// Returns the random component of the underlying ULID.
        ///
        /// Fills only the lower 80 bits of the returned 128-bit value.
        pub fn random_part(&self) -> u128 {
            self.ulid.random()
        }
    }
1253
    /// Errors returned when parsing a [`PrefixedUlid`] from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum PrefixedUlidParseError {
        /// The input does not start with the expected `{prefix}_`.
        #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
        WrongPrefix { input: String, expected: String },
        /// The text following the prefix is not a valid ULID.
        #[error("cannot parse ULID suffix from `{input}`")]
        CannotParseUlid { input: String },
    }
1261
1262    mod impls {
1263        use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1264        use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1265
1266        impl<T> FromStr for PrefixedUlid<T> {
1267            type Err = PrefixedUlidParseError;
1268
1269            fn from_str(input: &str) -> Result<Self, Self::Err> {
1270                let prefix = Self::prefix();
1271                let mut input_chars = input.chars();
1272                for exp in prefix.chars() {
1273                    if input_chars.next() != Some(exp) {
1274                        return Err(PrefixedUlidParseError::WrongPrefix {
1275                            input: input.to_string(),
1276                            expected: format!("{prefix}_"),
1277                        });
1278                    }
1279                }
1280                if input_chars.next() != Some('_') {
1281                    return Err(PrefixedUlidParseError::WrongPrefix {
1282                        input: input.to_string(),
1283                        expected: format!("{prefix}_"),
1284                    });
1285                }
1286                let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1287                    return Err(PrefixedUlidParseError::CannotParseUlid {
1288                        input: input.to_string(),
1289                    });
1290                };
1291                Ok(Self {
1292                    ulid,
1293                    phantom_data: PhantomData,
1294                })
1295            }
1296        }
1297
1298        impl<T> Debug for PrefixedUlid<T> {
1299            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1300                Display::fmt(&self, f)
1301            }
1302        }
1303
1304        impl<T> Hash for PrefixedUlid<T> {
1305            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1306                Self::prefix().hash(state);
1307                self.ulid.hash(state);
1308                self.phantom_data.hash(state);
1309            }
1310        }
1311
1312        impl<T> PartialEq for PrefixedUlid<T> {
1313            fn eq(&self, other: &Self) -> bool {
1314                self.ulid == other.ulid
1315            }
1316        }
1317
1318        impl<T> Eq for PrefixedUlid<T> {}
1319
1320        impl<T> PartialOrd for PrefixedUlid<T> {
1321            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1322                Some(self.cmp(other))
1323            }
1324        }
1325
1326        impl<T> Ord for PrefixedUlid<T> {
1327            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1328                self.ulid.cmp(&other.ulid)
1329            }
1330        }
1331    }
1332
    /// Marker types used as the `T` parameter of [`PrefixedUlid`]; the type name
    /// becomes the textual prefix of the id.
    pub mod prefix {
        /// Marker for top-level execution ids.
        pub struct E;
        /// Marker for executor ids.
        pub struct Exr;
        /// Marker for run ids.
        pub struct Run;
        /// Marker for the top-level part of delay ids.
        pub struct Delay;
    }
1339
    /// Id with the `Exr` prefix.
    pub type ExecutorId = PrefixedUlid<prefix::Exr>;
    /// Id of a top-level execution, with the `E` prefix.
    pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
    /// Id with the `Run` prefix.
    pub type RunId = PrefixedUlid<prefix::Run>;
    /// Top-level part of a `DelayId`, with the `Delay` prefix.
    pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; // Never used directly, tracking top level ExecutionId
1344
1345    #[cfg(any(test, feature = "test"))]
1346    impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1347        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1348            Ok(Self::new(ulid::Ulid::from_parts(
1349                u.arbitrary()?,
1350                u.arbitrary()?,
1351            )))
1352        }
1353    }
1354
    /// Identifier of an execution: either a top-level one or one derived from a parent
    /// execution through a join set.
    ///
    /// Serialized via `Display` and deserialized via `FromStr`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
    pub enum ExecutionId {
        /// An execution without a parent.
        TopLevel(ExecutionIdTopLevel),
        /// An execution created under another execution.
        Derived(ExecutionIdDerived),
    }
1360
    /// Identifier of an execution derived from a parent execution.
    ///
    /// Displayed as the top-level id, followed by the infix and the index, joined with
    /// `EXECUTION_ID_INFIX` and `EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct ExecutionIdDerived {
        // Id of the root execution this id descends from.
        top_level: ExecutionIdTopLevel,
        // Path of join sets (and parent indexes) between the root and this execution.
        infix: Arc<str>,
        // Index of this execution at the end of the path.
        idx: u64,
    }
1367    impl ExecutionIdDerived {
1368        #[must_use]
1369        pub fn get_incremented(&self) -> Self {
1370            self.get_incremented_by(1)
1371        }
1372        #[must_use]
1373        pub fn get_incremented_by(&self, count: u64) -> Self {
1374            ExecutionIdDerived {
1375                top_level: self.top_level,
1376                infix: self.infix.clone(),
1377                idx: self.idx + count,
1378            }
1379        }
1380        #[must_use]
1381        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1382            let ExecutionIdDerived {
1383                top_level,
1384                infix,
1385                idx,
1386            } = self;
1387            let infix = Arc::from(format!(
1388                "{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1389            ));
1390            ExecutionIdDerived {
1391                top_level: *top_level,
1392                infix,
1393                idx: EXECUTION_ID_START_IDX,
1394            }
1395        }
1396        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1397            let ExecutionIdDerived {
1398                top_level,
1399                infix,
1400                idx,
1401            } = self;
1402            write!(
1403                f,
1404                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1405            )
1406        }
1407
1408        #[must_use]
1409        pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1410            self.split_to_parts_res().expect("verified in from_str")
1411        }
1412
1413        fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1414            // Two cases:
1415            // A. infix contains a `.` => infix must be split into old_infix _ old_idx . JoinSetId
1416            // B. else => child of a top level, will be split into the top level, the whole infix must be JoinSetId.
1417            if let Some((old_infix_and_index, join_set_id)) =
1418                self.infix.rsplit_once(EXECUTION_ID_INFIX)
1419            {
1420                let join_set_id = JoinSetId::from_str(join_set_id)
1421                    .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1422                let Some((old_infix, old_idx)) =
1423                    old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1424                else {
1425                    return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1426                };
1427                let parent = ExecutionIdDerived {
1428                    top_level: self.top_level,
1429                    infix: Arc::from(old_infix),
1430                    idx: old_idx
1431                        .parse()
1432                        .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1433                };
1434                Ok((ExecutionId::Derived(parent), join_set_id))
1435            } else {
1436                Ok((
1437                    ExecutionId::TopLevel(self.top_level),
1438                    JoinSetId::from_str(&self.infix)
1439                        .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1440                ))
1441            }
1442        }
1443    }
1444
    /// Errors raised when a derived id is split into its parent id and join set id.
    #[derive(Debug, thiserror::Error)]
    pub enum DerivedIdSplitError {
        /// The join set part of the infix is not a valid `JoinSetId`.
        #[error(transparent)]
        JoinSetIdParseError(JoinSetIdParseError),
        /// The parent's index inside the infix is not a valid number.
        #[error("cannot parse index of parent execution - {0}")]
        CannotParseOldIndex(ParseIntError),
        /// The delimiter between the parent's infix and its index is missing.
        #[error("cannot find join set delimiter")]
        CannotFindJoinSetDelimiter,
    }
1454
    // `Debug` and `Display` deliberately share one rendering.
    impl Debug for ExecutionIdDerived {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.display_or_debug(f)
        }
    }
    impl Display for ExecutionIdDerived {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.display_or_debug(f)
        }
    }
1465    impl FromStr for ExecutionIdDerived {
1466        type Err = DerivedIdParseError;
1467
1468        fn from_str(input: &str) -> Result<Self, Self::Err> {
1469            let (top_level, infix, idx) = derived_from_str(input)?;
1470            let id = ExecutionIdDerived {
1471                top_level,
1472                infix,
1473                idx,
1474            };
1475            Ok(id)
1476        }
1477    }
1478
1479    fn derived_from_str<T: 'static>(
1480        input: &str,
1481    ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1482        // Isolate the Root ID (Prefix) from the Hierarchical Path (Suffix)
1483        let (prefix, full_suffix) = input
1484            .split_once(EXECUTION_ID_INFIX)
1485            .ok_or(DerivedIdParseError::FirstDelimiterNotFound)?;
1486
1487        // Parse the Root ULID
1488        let top_level =
1489            PrefixedUlid::from_str(prefix).map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1490
1491        // Iterate through every segment to ensure the full path is valid.
1492        let mut last_idx = None;
1493        for segment in full_suffix.split(EXECUTION_ID_INFIX) {
1494            if segment.is_empty() {
1495                return Err(DerivedIdParseError::EmptySegment);
1496            }
1497            // Split the segment into JoinSetId and Index
1498            let (join_set_str, idx_str) = segment
1499                .split_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1500                .ok_or(DerivedIdParseError::SecondDelimiterNotFound)?;
1501            JoinSetId::from_str(join_set_str).map_err(DerivedIdParseError::JoinSetIdParseError)?;
1502            let idx = u64::from_str(idx_str).map_err(DerivedIdParseError::ParseIndexError)?;
1503            last_idx = Some((idx, idx_str));
1504        }
1505        let (idx, idx_str) = last_idx.expect("split must have returned at least one element");
1506        let infix = full_suffix
1507            .strip_suffix(idx_str)
1508            .expect("must have ended with index");
1509        let infix = infix
1510            .strip_suffix(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1511            .expect("must have ended with `_index`");
1512        Ok((top_level, Arc::from(infix), idx))
1513    }
1514
1515    #[cfg(any(test, feature = "test"))]
1516    impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1517        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1518            let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1519            let join_set_id = JoinSetId::arbitrary(u)?;
1520            Ok(top_level.next_level(&join_set_id))
1521        }
1522    }
1523
    /// Errors returned when parsing a derived id (`ExecutionIdDerived`, `DelayId`) from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum DerivedIdParseError {
        /// The part before the first `EXECUTION_ID_INFIX` is not a valid prefixed ULID.
        #[error(transparent)]
        PrefixedUlidParseError(PrefixedUlidParseError),
        /// The input contains no `EXECUTION_ID_INFIX` at all.
        #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
        FirstDelimiterNotFound,
        /// A segment lacks the delimiter between the join set id and the index.
        #[error(
            "cannot parse derived id - delimiter `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` not found"
        )]
        SecondDelimiterNotFound,
        /// The index part of a segment is not a valid number.
        #[error(
            "cannot parse derived id - suffix after `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` must be a number"
        )]
        ParseIndexError(ParseIntError),
        /// Wraps a [`DerivedIdSplitError`].
        // NOTE(review): no producer of this variant is visible in this part of the file.
        #[error(transparent)]
        DerivedIdSplitError(DerivedIdSplitError),
        /// The join set part of a segment is not a valid `JoinSetId`.
        #[error(transparent)]
        JoinSetIdParseError(JoinSetIdParseError),
        /// Two `EXECUTION_ID_INFIX` delimiters are adjacent, or the input ends with one.
        #[error("empty segment")]
        EmptySegment,
    }
1545
1546    impl ExecutionId {
1547        #[must_use]
1548        pub fn generate() -> Self {
1549            ExecutionId::TopLevel(PrefixedUlid::generate())
1550        }
1551
1552        #[must_use]
1553        pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1554            match &self {
1555                ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1556                ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1557            }
1558        }
1559
1560        #[must_use]
1561        pub fn is_top_level(&self) -> bool {
1562            matches!(self, ExecutionId::TopLevel(_))
1563        }
1564
1565        #[must_use]
1566        pub fn random_seed(&self) -> u64 {
1567            let mut hasher = fxhash::FxHasher::default();
1568            // `Self::random_part` uses only the lower 80 bits of a 128-bit value.
1569            // Truncate to 64 bits, since including the remaining 16 bits
1570            // would not increase the entropy of the 64-bit output.
1571            #[expect(clippy::cast_possible_truncation)]
1572            let random_part = self.get_top_level().random_part() as u64;
1573            hasher.write_u64(random_part);
1574            hasher.write_u64(self.get_top_level().timestamp_part());
1575            if let ExecutionId::Derived(ExecutionIdDerived {
1576                top_level: _,
1577                infix,
1578                idx,
1579            }) = self
1580            {
1581                // Each derived execution ID should return different seed.
1582                hasher.write(infix.as_bytes());
1583                hasher.write_u64(*idx);
1584            }
1585            hasher.finish()
1586        }
1587
1588        #[must_use]
1589        pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1590            ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1591        }
1592
1593        #[must_use]
1594        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1595            match &self {
1596                ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1597                    top_level: *top_level,
1598                    infix: Arc::from(join_set_id.to_string()),
1599                    idx: EXECUTION_ID_START_IDX,
1600                },
1601                ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1602            }
1603        }
1604
1605        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1606            match &self {
1607                ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1608                ExecutionId::Derived(derived) => Display::fmt(derived, f),
1609            }
1610        }
1611    }
1612
    // Index given to the first child execution created by `next_level`.
    const EXECUTION_ID_START_IDX: u64 = 1;
    // NOTE(review): presumably the first index used for join sets; its users are outside this part of the file.
    pub const JOIN_SET_START_IDX: u64 = 1;
    // Index given to a delay id created by `DelayId::new`.
    const DELAY_ID_START_IDX: u64 = 1;
1616
    /// Errors returned when parsing an [`ExecutionId`] from a string.
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdParseError {
        /// The input has no `EXECUTION_ID_INFIX` and is not a valid top-level id.
        #[error(transparent)]
        PrefixedUlidParseError(PrefixedUlidParseError),
        /// The input contains `EXECUTION_ID_INFIX` but is not a valid derived id.
        #[error(transparent)]
        DerivedIdParseError(DerivedIdParseError),
    }
1624
1625    impl FromStr for ExecutionId {
1626        type Err = ExecutionIdParseError;
1627
1628        fn from_str(input: &str) -> Result<Self, Self::Err> {
1629            if input.contains(EXECUTION_ID_INFIX) {
1630                ExecutionIdDerived::from_str(input)
1631                    .map(ExecutionId::Derived)
1632                    .map_err(ExecutionIdParseError::DerivedIdParseError)
1633            } else {
1634                Ok(ExecutionId::TopLevel(
1635                    PrefixedUlid::from_str(input)
1636                        .map_err(ExecutionIdParseError::PrefixedUlidParseError)?,
1637                ))
1638            }
1639        }
1640    }
1641
    /// Errors returned when converting a wasmtime value into an [`ExecutionId`].
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdStructuralParseError {
        /// The value had the right shape but its `id` string is not a valid execution id.
        #[error(transparent)]
        ExecutionIdParseError(#[from] ExecutionIdParseError),
        /// The value is not a record with a single string field named `id`.
        #[error("execution-id must be a record with `id` field of type string")]
        TypeError,
    }
1649
1650    impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1651        type Error = ExecutionIdStructuralParseError;
1652
1653        fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1654            if let wasmtime::component::Val::Record(key_vals) = execution_id
1655                && key_vals.len() == 1
1656                && let Some((key, execution_id)) = key_vals.first()
1657                && key == "id"
1658                && let wasmtime::component::Val::String(execution_id) = execution_id
1659            {
1660                ExecutionId::from_str(execution_id)
1661                    .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1662            } else {
1663                Err(ExecutionIdStructuralParseError::TypeError)
1664            }
1665        }
1666    }
1667
    // `Debug` and `Display` deliberately share one rendering.
    impl Debug for ExecutionId {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.display_or_debug(f)
        }
    }

    impl Display for ExecutionId {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.display_or_debug(f)
        }
    }
1679
1680    #[cfg(any(test, feature = "test"))]
1681    impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1682        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1683            Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1684        }
1685    }
1686
    /// Mirrors [`ExecutionId`], with different prefix and `idx` for tracking each delay within the join set.
    ///
    /// Serialized via `Display` and deserialized via `FromStr`.
    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
    pub struct DelayId {
        // Carries the ULID of the owning top-level execution under the delay prefix.
        top_level: DelayIdTopLevel,
        // Same infix an execution created in the same join set would have.
        infix: Arc<str>,
        // Index of the delay.
        idx: u64,
    }
1694    impl DelayId {
1695        #[must_use]
1696        pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1697            Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1698        }
1699
1700        #[must_use]
1701        pub fn new_with_index(
1702            execution_id: &ExecutionId,
1703            join_set_id: &JoinSetId,
1704            idx: u64,
1705        ) -> DelayId {
1706            let ExecutionIdDerived {
1707                top_level: PrefixedUlid { ulid, .. },
1708                infix,
1709                idx: _,
1710            } = execution_id.next_level(join_set_id);
1711            let top_level = DelayIdTopLevel::new(ulid);
1712            DelayId {
1713                top_level,
1714                infix,
1715                idx,
1716            }
1717        }
1718
1719        #[must_use]
1720        pub fn get_incremented(&self) -> Self {
1721            Self {
1722                top_level: self.top_level,
1723                infix: self.infix.clone(),
1724                idx: self.idx + 1,
1725            }
1726        }
1727
1728        #[must_use]
1729        pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1730            self.split_to_parts_res().expect("verified in from_str")
1731        }
1732
1733        fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1734            // Two cases:
1735            // A. infix contains a `.` => infix must be split into old_infix _ old_idx . JoinSetId
1736            // B. else => child of a top level, will be split into the top level, the whole infix must be JoinSetId.
1737            if let Some((old_infix_and_index, join_set_id)) =
1738                self.infix.rsplit_once(EXECUTION_ID_INFIX)
1739            {
1740                let join_set_id = JoinSetId::from_str(join_set_id)
1741                    .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1742                let Some((old_infix, old_idx)) =
1743                    old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1744                else {
1745                    return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1746                };
1747                let parent = ExecutionIdDerived {
1748                    top_level: ExecutionIdTopLevel::new(self.top_level.ulid),
1749                    infix: Arc::from(old_infix),
1750                    idx: old_idx
1751                        .parse()
1752                        .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1753                };
1754                Ok((ExecutionId::Derived(parent), join_set_id))
1755            } else {
1756                Ok((
1757                    ExecutionId::TopLevel(ExecutionIdTopLevel::new(self.top_level.ulid)),
1758                    JoinSetId::from_str(&self.infix)
1759                        .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1760                ))
1761            }
1762        }
1763
1764        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1765            let DelayId {
1766                top_level,
1767                infix,
1768                idx,
1769            } = self;
1770            write!(
1771                f,
1772                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1773            )
1774        }
1775    }
1776
1777    pub mod delay_impl {
1778        use super::{DelayId, DerivedIdParseError, derived_from_str};
1779        use std::{
1780            fmt::{Debug, Display},
1781            str::FromStr,
1782        };
1783
1784        impl Debug for DelayId {
1785            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1786                self.display_or_debug(f)
1787            }
1788        }
1789
1790        impl Display for DelayId {
1791            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1792                self.display_or_debug(f)
1793            }
1794        }
1795
1796        impl FromStr for DelayId {
1797            type Err = DerivedIdParseError;
1798
1799            fn from_str(input: &str) -> Result<Self, Self::Err> {
1800                let (top_level, infix, idx) = derived_from_str(input)?;
1801                let id = DelayId {
1802                    top_level,
1803                    infix,
1804                    idx,
1805                };
1806                Ok(id)
1807            }
1808        }
1809
1810        #[cfg(any(test, feature = "test"))]
1811        impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1812            fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1813                use super::{ExecutionId, JoinSetId};
1814                let execution_id = ExecutionId::arbitrary(u)?;
1815                let mut join_set_id = JoinSetId::arbitrary(u)?;
1816                join_set_id.kind = crate::JoinSetKind::OneOff;
1817                Ok(DelayId::new(&execution_id, &join_set_id))
1818            }
1819        }
1820    }
1821}
1822
/// Identifier of a join set, made of a [`JoinSetKind`] and a name.
///
/// The textual form is `{kind}{JOIN_SET_ID_INFIX}{name}`. Serialization goes through
/// `Display` and deserialization through `FromStr` (see the `serde_with` derives).
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[non_exhaustive] // force using the constructor as much as possible due to validation
#[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
pub struct JoinSetId {
    /// Kind of the join set; rendered as its one-letter code.
    pub kind: JoinSetKind,
    /// Name of the join set; `JoinSetId::new` validates it with `check_name`.
    pub name: StrVariant,
}
1839
1840impl JoinSetId {
1841    pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1842        Ok(Self {
1843            kind,
1844            name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1845        })
1846    }
1847}
1848
/// ASCII lowercase letters, uppercase letters and digits, in that order.
pub const CHARSET_ALPHANUMERIC: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1851
/// Kind of a join set.
///
/// `Display` prints the one-letter code returned by `as_code`
/// (`o`, `n` or `g`); `FromStr` accepts the same codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display, strum::EnumIter)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("{}", self.as_code())]
pub enum JoinSetKind {
    /// Code `o`.
    OneOff,
    /// Code `n`.
    Named,
    /// Code `g`.
    Generated,
}
1860impl JoinSetKind {
1861    fn as_code(&self) -> &'static str {
1862        match self {
1863            JoinSetKind::OneOff => "o",
1864            JoinSetKind::Named => "n",
1865            JoinSetKind::Generated => "g",
1866        }
1867    }
1868}
1869impl FromStr for JoinSetKind {
1870    type Err = &'static str;
1871    fn from_str(s: &str) -> Result<Self, Self::Err> {
1872        use strum::IntoEnumIterator;
1873        Self::iter()
1874            .find(|variant| s == variant.as_code())
1875            .ok_or("unknown join set kind")
1876    }
1877}
1878
/// Separator written between the top level id and the infix of a derived id,
/// and between nesting levels inside the infix.
pub(crate) const EXECUTION_ID_INFIX: char = '.';
/// Separator written between the join set part and the numeric index.
pub(crate) const EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX: char = '_';

/// Separator between the kind code and the name in the textual form of a `JoinSetId`.
const JOIN_SET_ID_INFIX: char = ':';
/// Extra characters passed to `check_name` when validating a join set name.
// NOTE(review): "JSON" in the name looks like a typo for "JOIN" - confirm before renaming.
const CHARSET_EXTRA_JSON_SET: &str = "-/";
1884
1885impl FromStr for JoinSetId {
1886    type Err = JoinSetIdParseError;
1887
1888    fn from_str(input: &str) -> Result<Self, Self::Err> {
1889        let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1890            return Err(JoinSetIdParseError::WrongParts);
1891        };
1892        let kind = kind
1893            .parse()
1894            .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1895        JoinSetId::new(kind, StrVariant::from(name.to_string()))
1896            .map_err(JoinSetIdParseError::InvalidName)
1897    }
1898}
1899
1900#[derive(Debug, thiserror::Error)]
1901pub enum JoinSetIdParseError {
1902    #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1903    WrongParts,
1904    #[error("cannot parse join set kind - {0}")]
1905    JoinSetKindParseError(&'static str),
1906    #[error("cannot parse join set id - {0}")]
1907    InvalidName(InvalidNameError<JoinSetId>),
1908}
1909
/// Test-only charset: [`CHARSET_ALPHANUMERIC`] followed by the extra join set characters.
/// Used to generate arbitrary join set names.
#[cfg(any(test, feature = "test"))]
const CHARSET_JOIN_SET_NAME: &str =
    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
    /// Generates a `Named` join set id whose name has 1 to 11 characters
    /// drawn from `CHARSET_JOIN_SET_NAME`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The range below is inclusive on both ends, so at least one character is produced.
        let length_inclusive = u.int_in_range(0..=10).unwrap();
        let mut name = String::new();
        for _ in 0..=length_inclusive {
            let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
            let picked = CHARSET_JOIN_SET_NAME
                .chars()
                .nth(idx)
                .expect("idx is < charset.len()");
            name.push(picked);
        }

        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::from(name)).unwrap();
        Ok(join_set_id)
    }
}
1932
/// Name of the payload-less error variant that `ReturnType::detect` looks for
/// when the error type of a `result` is a `variant`.
const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
/// Classification of a function's return type, produced by `ReturnType::detect`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[serde(rename_all = "snake_case")]
pub enum ReturnType {
    /// Execution failures can be converted to this return type, e.g. `result<_, string>`.
    Extendable(ReturnTypeExtendable), // Execution failures can be converted to this return type, e.g. result<_, string>
    /// Any other return type, e.g. `-submit` returns `ExecutionId`.
    NonExtendable(ReturnTypeNonExtendable), // e.g. -submit returns ExecutionId
}
1942impl ReturnType {
1943    /// Evaluate whether the return type is one of supported types:
1944    /// * `result`
1945    /// * `result<T>`
1946    /// * `result<_, string>`
1947    /// * `result<T, string>`
1948    /// * `result<T, E>` where T can be `_` and E is a `variant` containing `execution-failed`
1949    ///   variant with no associated value.
1950    #[must_use]
1951    pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
1952        if let TypeWrapper::Result { ok, err: None } = type_wrapper {
1953            return ReturnType::Extendable(ReturnTypeExtendable {
1954                type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
1955                wit_type,
1956            });
1957        } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
1958            if let TypeWrapper::String = err.as_ref() {
1959                return ReturnType::Extendable(ReturnTypeExtendable {
1960                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1961                    wit_type,
1962                });
1963            } else if let TypeWrapper::Variant(fields) = err.as_ref()
1964                && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
1965            {
1966                return ReturnType::Extendable(ReturnTypeExtendable {
1967                    type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1968                    wit_type,
1969                });
1970            }
1971            return ReturnType::NonExtendable(ReturnTypeNonExtendable {
1972                type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
1973                wit_type,
1974            });
1975        }
1976        ReturnType::NonExtendable(ReturnTypeNonExtendable {
1977            type_wrapper: type_wrapper.clone(),
1978            wit_type,
1979        })
1980    }
1981
1982    #[must_use]
1983    pub fn wit_type(&self) -> &str {
1984        match self {
1985            ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
1986            ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
1987        }
1988    }
1989
1990    #[must_use]
1991    pub fn type_wrapper(&self) -> TypeWrapper {
1992        match self {
1993            ReturnType::Extendable(compatible) => {
1994                TypeWrapper::from(compatible.type_wrapper_tl.clone())
1995            }
1996            ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
1997        }
1998    }
1999}
2000
/// Return type that `ReturnType::detect` did not recognize as extendable.
///
/// `Display` prints only `wit_type`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeNonExtendable {
    /// Full type of the return value.
    pub type_wrapper: TypeWrapper,
    /// WIT type string, used by `Display`.
    pub wit_type: StrVariant,
}
2009
/// Return type that `ReturnType::detect` recognized as one of the supported `result` shapes.
///
/// `Display` prints only `wit_type`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeExtendable {
    /// The `ok` and `err` parts of the top level `result`.
    pub type_wrapper_tl: TypeWrapperTopLevel,
    /// WIT type string, used by `Display`.
    pub wit_type: StrVariant,
}
2018
/// Test-only placeholder return type: a bare `result` with neither an `ok` nor an `err` type.
#[cfg(any(test, feature = "test"))]
pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
    type_wrapper_tl: TypeWrapperTopLevel {
        ok: None,
        err: None,
    },
    wit_type: StrVariant::Static("result"),
});
2027
/// Type information of a single function parameter.
///
/// `Display` prints `{name}: {wit_type}`. The derived `PartialEq` skips `name`
/// (see the `derive_where(skip)` attribute on that field).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
#[derive_where::derive_where(PartialEq)]
#[display("{name}: {wit_type}")]
pub struct ParameterType {
    /// Type of the parameter.
    pub type_wrapper: TypeWrapper,
    #[derive_where(skip)]
    // Names are read from how a component names the parameter and thus might differ between export and import.
    pub name: StrVariant,
    /// WIT type string of the parameter.
    pub wit_type: StrVariant,
}
2038
/// Ordered list of parameter types; dereferences to the inner `Vec`.
///
/// `Debug` and `Display` are implemented by hand and render the list as `(a, b, ...)`.
#[derive(
    Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
)]
pub struct ParameterTypes(pub Vec<ParameterType>);
2043
2044impl Debug for ParameterTypes {
2045    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2046        write!(f, "(")?;
2047        let mut iter = self.0.iter().peekable();
2048        while let Some(p) = iter.next() {
2049            write!(f, "{p:?}")?;
2050            if iter.peek().is_some() {
2051                write!(f, ", ")?;
2052            }
2053        }
2054        write!(f, ")")
2055    }
2056}
2057
2058impl Display for ParameterTypes {
2059    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2060        write!(f, "(")?;
2061        let mut iter = self.0.iter().peekable();
2062        while let Some(p) = iter.next() {
2063            write!(f, "{p}")?;
2064            if iter.peek().is_some() {
2065                write!(f, ", ")?;
2066            }
2067        }
2068        write!(f, ")")
2069    }
2070}
2071
/// Functions of one interface, keyed by function name.
#[derive(Debug, Clone)]
pub struct PackageIfcFns {
    /// Fully qualified name of the interface.
    pub ifc_fqn: IfcFqnName,
    /// Whether the package is one of `-obelisk-ext`, `-obelisk-schedule`, `-obelisk-stub`.
    pub extension: bool, // one of `-obelisk-ext`, `-obelisk-schedule`, `-obelisk-stub`
    /// Function metadata keyed by function name, in insertion order (`IndexMap`).
    pub fns: IndexMap<FnName, FunctionMetadata>,
}
2078
/// Retry settings of a component.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ComponentRetryConfig {
    /// Maximum number of retries.
    // NOTE(review): `None` presumably means "no limit" - confirm against the consumers.
    pub max_retries: Option<u32>,
    /// Base duration of the exponential backoff between retries.
    pub retry_exp_backoff: Duration,
}
impl ComponentRetryConfig {
    /// Zero retries and a zero backoff.
    pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: Some(0),
        retry_exp_backoff: Duration::ZERO,
    };

    /// Configuration for workflow tests: `max_retries` is `None`, backoff is zero.
    /// Only compiled when the `test` feature is enabled.
    #[cfg(feature = "test")]
    pub const WORKFLOW_TEST: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: None,
        retry_exp_backoff: Duration::ZERO,
    };
}
2096
2097/// Implementation must not return `-obelisk-*` extended function, nor functions from `obelisk` namespace.
2098pub trait FunctionRegistry: Send + Sync {
2099    fn get_by_exported_function(
2100        &self,
2101        ffqn: &FunctionFqn,
2102    ) -> Option<(FunctionMetadata, ComponentId)>;
2103
2104    // TODO: return Option<&TypeWrapperTopLevel>, optimize
2105    /// Get return type of a non-ext function, otherwise return `None`.
2106    fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2107        self.get_by_exported_function(ffqn)
2108            .and_then(|(fn_meta, _)| {
2109                if let ReturnType::Extendable(ReturnTypeExtendable {
2110                    type_wrapper_tl: type_wrapper,
2111                    wit_type: _,
2112                }) = fn_meta.return_type
2113                {
2114                    Some(type_wrapper)
2115                } else {
2116                    None
2117                }
2118            })
2119    }
2120
2121    fn all_exports(&self) -> &[PackageIfcFns];
2122}
2123
/// String key-value metadata attached to an execution; used here to carry tracing context.
///
/// The map is optional only so that `ExecutionMetadata::empty` can be a `const fn`.
/// `Display` prints the `Debug` form of the inner value.
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
#[display("{_0:?}")]
pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2127
impl ExecutionMetadata {
    /// Key of the marker entry telling `enrich` to add a span link instead of setting a parent.
    const LINKED_KEY: &str = "obelisk-tracing-linked";
    /// Creates metadata without any map allocated.
    #[must_use]
    pub const fn empty() -> Self {
        // Remove `Optional` when const hashmap creation is allowed - https://github.com/rust-lang/rust/issues/123197
        Self(None)
    }

    /// Captures the tracing context so that `enrich` later sets it as the parent of a span.
    /// `less_specific` is only used as a fallback, see `create`.
    #[must_use]
    pub fn from_parent_span(less_specific: &Span) -> Self {
        ExecutionMetadata::create(less_specific, false)
    }

    /// Captures the tracing context and adds the link marker, so that `enrich` later
    /// adds a link to a span instead of setting its parent.
    /// `less_specific` is only used as a fallback, see `create`.
    #[must_use]
    pub fn from_linked_span(less_specific: &Span) -> Self {
        ExecutionMetadata::create(less_specific, true)
    }

    /// Attempt to use `Span::current()` to fill the trace and parent span.
    /// If that fails, which can happen due to interference with e.g.
    /// the stdout layer of the subscriber, use the `span` which is guaranteed
    /// to be on info level.
    ///
    /// When `link_marker` is set, an empty entry under `LINKED_KEY` is added as well.
    #[must_use]
    #[expect(clippy::items_after_statements)]
    fn create(span: &Span, link_marker: bool) -> Self {
        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
        // Start with an allocated (empty) map so that `is_empty` below can observe it.
        let mut metadata = Self(Some(hashbrown::HashMap::default()));
        let mut metadata_view = ExecutionMetadataInjectorView {
            metadata: &mut metadata,
        };
        // Write the context of span `s` into the metadata using the global text map propagator.
        fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
            opentelemetry::global::get_text_map_propagator(|propagator| {
                propagator.inject_context(&s.context(), metadata_view);
            });
        }
        inject(&Span::current(), &mut metadata_view);
        if metadata_view.is_empty() {
            // The subscriber sent us a current span that is actually disabled
            inject(span, &mut metadata_view);
        }
        if link_marker {
            // Only the presence of the key matters; the value stays empty.
            metadata_view.set(Self::LINKED_KEY, String::new());
        }
        metadata
    }

    /// Applies the stored tracing context to `span`.
    ///
    /// The context is extracted with the global text map propagator. If the link marker
    /// is present, the extracted span context is added to `span` as a link; otherwise the
    /// extracted context is set as the parent of `span` and the result of that call is ignored.
    pub fn enrich(&self, span: &Span) {
        use opentelemetry::trace::TraceContextExt as _;
        use tracing_opentelemetry::OpenTelemetrySpanExt as _;

        let metadata_view = ExecutionMetadataExtractorView { metadata: self };
        let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.extract(&metadata_view)
        });
        if metadata_view.get(Self::LINKED_KEY).is_some() {
            let linked_span_context = otel_context.span().span_context().clone();
            span.add_link(linked_span_context);
        } else {
            let _ = span.set_parent(otel_context);
        }
    }
}
2191
/// Mutable view over [`ExecutionMetadata`] used as the propagator's injection target.
struct ExecutionMetadataInjectorView<'a> {
    /// Metadata that receives the injected entries.
    metadata: &'a mut ExecutionMetadata,
}
2195
2196impl ExecutionMetadataInjectorView<'_> {
2197    fn is_empty(&self) -> bool {
2198        self.metadata
2199            .0
2200            .as_ref()
2201            .is_some_and(hashbrown::HashMap::is_empty)
2202    }
2203}
2204
2205impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2206    fn set(&mut self, key: &str, value: String) {
2207        let key = format!("tracing:{key}");
2208        let map = if let Some(map) = self.metadata.0.as_mut() {
2209            map
2210        } else {
2211            self.metadata.0 = Some(hashbrown::HashMap::new());
2212            assert_matches!(&mut self.metadata.0, Some(map) => map)
2213        };
2214        map.insert(key, value);
2215    }
2216}
2217
/// Read-only view over [`ExecutionMetadata`] used as the propagator's extraction source.
struct ExecutionMetadataExtractorView<'a> {
    /// Metadata the entries are read from.
    metadata: &'a ExecutionMetadata,
}
2221
2222impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2223    fn get(&self, key: &str) -> Option<&str> {
2224        self.metadata
2225            .0
2226            .as_ref()
2227            .and_then(|map| map.get(&format!("tracing:{key}")))
2228            .map(std::string::String::as_str)
2229    }
2230
2231    fn keys(&self) -> Vec<&str> {
2232        match &self.metadata.0.as_ref() {
2233            Some(map) => map
2234                .keys()
2235                .filter_map(|key| key.strip_prefix("tracing:"))
2236                .collect(),
2237            None => vec![],
2238        }
2239    }
2240}
2241
// Unit tests for id parsing/formatting, id splitting, hashing and join set serde.
#[cfg(test)]
mod tests {

    use rstest::rstest;

    use crate::{
        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
    };
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        str::FromStr,
        sync::Arc,
    };

    // A generated executor id must survive a to_string / parse round trip.
    #[test]
    fn ulid_parsing() {
        let generated = ExecutorId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    // A generated top level execution id must survive a to_string / parse round trip.
    #[test]
    fn execution_id_parsing_top_level() {
        let generated = ExecutionId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    // First child of a top level id: checks the exact textual form and the round trip.
    #[test]
    fn execution_id_with_one_level_should_parse() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
        let ser = first_child.to_string();
        assert_eq!(format!("{top_level}.n:name_1"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(first_child, parsed);
    }

    // Incrementing the first child yields index 2 in the textual form.
    #[test]
    fn execution_id_increment_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = top_level.next_level(&join_set_id);
        let second_child = ExecutionId::Derived(first_child.get_incremented());
        let ser = second_child.to_string();
        assert_eq!(format!("{top_level}.n:name_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(second_child, parsed);
    }

    // Two nesting levels, each incremented once: checks the textual form and the round trip.
    #[test]
    fn execution_id_next_level_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let execution_id = ExecutionId::Derived(
            top_level
                .next_level(&join_set_id_outer)
                .get_incremented()
                .next_level(&join_set_id_inner)
                .get_incremented(),
        );
        let ser = execution_id.to_string();
        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(execution_id, parsed);
    }

    // Splitting a first level child returns the top level id and the join set id.
    #[test]
    fn execution_id_split_first_level() {
        let top_level = ExecutionId::generate();
        let join_set_id =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let execution_id = top_level.next_level(&join_set_id);
        let (actual_top_level, actual_join_set) = execution_id.split_to_parts();
        assert_eq!(top_level, actual_top_level);
        assert_eq!(join_set_id, actual_join_set);
    }

    // Splitting a second level child returns the first level id (for both tested
    // increments of the outer index) and the inner join set id.
    #[rstest]
    fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let first_level = top_level
            .next_level(&join_set_id_outer)
            .get_incremented_by(outer_idx);

        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
        let second_level = first_level.next_level(&join_set_id_inner);

        let (actual_first_level, actual_join_set) = second_level.split_to_parts();
        assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
        assert_eq!(join_set_id_inner, actual_join_set);
    }

    // The input has a level separator directly before `_2`, which must be rejected.
    #[test]
    fn invalid_execution_id_should_fail_to_parse() {
        ExecutionId::from_str("E_01KBNHM5FQW81KP5Y3XMNX31K4.g:gg._2.o:oo_2").unwrap_err();
    }

    // Random seeds of a parent, two siblings and a child are snapshotted and must be distinct.
    #[test]
    fn execution_id_hash_should_be_stable() {
        let parent = ExecutionId::from_parts(1, 2);
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let sibling_1 = parent.next_level(&join_set_id);
        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
        let sibling_1 = ExecutionId::Derived(sibling_1);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let child =
            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
        let parent = parent.random_seed();
        let sibling_1 = sibling_1.random_seed();
        let sibling_2 = sibling_2.random_seed();
        let child = child.random_seed();
        let vec = vec![parent, sibling_1, sibling_2, child];
        insta::assert_debug_snapshot!(vec);
        // check that every hash is unique
        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
        assert_eq!(4, set.len());
    }

    // `StrVariant::Arc` and `StrVariant::Static` with equal content must compare and hash equal.
    #[test]
    fn hash_of_str_variants_should_be_equal() {
        let input = "foo";
        let left = StrVariant::Arc(Arc::from(input));
        let right = StrVariant::Static(input);
        assert_eq!(left, right);
        let mut left_hasher = DefaultHasher::new();
        left.hash(&mut left_hasher);
        let mut right_hasher = DefaultHasher::new();
        right.hash(&mut right_hasher);
        let left_hasher = left_hasher.finish();
        let right_hasher = right_hasher.finish();
        println!("left: {left_hasher:x}, right: {right_hasher:x}");
        assert_eq!(left_hasher, right_hasher);
    }

    // A versioned interface name is accepted when given as a tuple.
    #[test]
    fn ffqn_from_tuple_with_version_should_work() {
        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    // A versioned interface name is accepted when parsed from one string; the function
    // name is the part after the last dot.
    #[test]
    fn ffqn_from_str_with_version_should_work() {
        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    // JSON serialization followed by deserialization must be lossless for every join set kind.
    #[tokio::test]
    async fn join_set_serde_should_be_consistent() {
        use crate::{JoinSetId, JoinSetKind};
        use strum::IntoEnumIterator;
        for kind in JoinSetKind::iter() {
            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
            let ser = serde_json::to_string(&join_set_id).unwrap();
            let deser = serde_json::from_str(&ser).unwrap();
            assert_eq!(join_set_id, deser);
        }
    }
}