obeli_sk_concepts/
lib.rs

1use ::serde::{Deserialize, Serialize};
2use arbitrary::Arbitrary;
3use assert_matches::assert_matches;
4use async_trait::async_trait;
5pub use indexmap;
6use indexmap::IndexMap;
7use opentelemetry::propagation::{Extractor, Injector};
8pub use prefixed_ulid::ExecutionId;
9use prefixed_ulid::{ExecutionIdDerived, ExecutionIdParseError};
10use serde_json::Value;
11use std::{
12    borrow::Borrow,
13    fmt::{Debug, Display},
14    hash::Hash,
15    marker::PhantomData,
16    ops::Deref,
17    str::FromStr,
18    sync::Arc,
19    time::Duration,
20};
21use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
22use tracing::Span;
23use val_json::{
24    type_wrapper::{TypeConversionError, TypeWrapper},
25    wast_val::{WastVal, WastValWithType},
26    wast_val_ser::params,
27};
28use wasmtime::component::{Type, Val};
29
30#[cfg(feature = "rusqlite")]
31mod rusqlite_ext;
32pub mod storage;
33pub mod time;
34
35pub const NAMESPACE_OBELISK: &str = "obelisk";
36pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
37
38pub type FinishedExecutionResult = Result<SupportedFunctionReturnValue, FinishedExecutionError>;
39
40#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
41pub enum FinishedExecutionError {
42    // Activity only, because workflows will be retried forever
43    #[error("permanent timeout")]
44    PermanentTimeout,
45    // Workflow only
46    #[error("unhandled child execution error {child_execution_id}")]
47    UnhandledChildExecutionError {
48        child_execution_id: ExecutionIdDerived,
49        root_cause_id: ExecutionIdDerived,
50    },
51    #[error("permanent failure: {reason_full}")]
52    PermanentFailure {
53        // Exists just for extracting reason of an activity trap, to avoid "activity trap: " prefix.
54        reason_inner: String,
55        // Contains reason_inner embedded in the error message
56        reason_full: String,
57        kind: PermanentFailureKind,
58        detail: Option<String>,
59    },
60}
61impl FinishedExecutionError {
62    #[must_use]
63    pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
64        match self {
65            FinishedExecutionError::PermanentTimeout => PendingStateFinishedError::Timeout,
66            FinishedExecutionError::UnhandledChildExecutionError { .. } => {
67                PendingStateFinishedError::UnhandledChildExecutionError
68            }
69            FinishedExecutionError::PermanentFailure { .. } => {
70                PendingStateFinishedError::ExecutionFailure
71            }
72        }
73    }
74}
75
76#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "snake_case")]
78pub enum PermanentFailureKind {
79    /// Applicable to Workflow
80    NondeterminismDetected,
81    /// Applicable to Workflow, Activity
82    ParamsParsingError,
83    /// Applicable to Workflow, Activity
84    CannotInstantiate,
85    /// Applicable to Workflow, Activity
86    ResultParsingError,
87    /// Applicable to Workflow
88    ImportedFunctionCallError,
89    /// Applicable to Activity
90    ActivityTrap,
91    /// Applicable to Workflow
92    WorkflowTrap,
93    /// Applicable to Workflow
94    JoinSetNameConflict,
95    /// Applicable to webhook endpoint
96    WebhookEndpointError,
97}
98
99#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub enum TrapKind {
102    #[display("trap")]
103    Trap,
104    #[display("post_return_trap")]
105    PostReturnTrap,
106}
107
108#[derive(Clone, Eq, derive_more::Display)]
109pub enum StrVariant {
110    Static(&'static str),
111    Arc(Arc<str>),
112}
113
114impl StrVariant {
115    #[must_use]
116    pub const fn empty() -> StrVariant {
117        StrVariant::Static("")
118    }
119}
120
121impl From<String> for StrVariant {
122    fn from(value: String) -> Self {
123        StrVariant::Arc(Arc::from(value))
124    }
125}
126
127impl From<&'static str> for StrVariant {
128    fn from(value: &'static str) -> Self {
129        StrVariant::Static(value)
130    }
131}
132
133impl PartialEq for StrVariant {
134    fn eq(&self, other: &Self) -> bool {
135        match (self, other) {
136            (Self::Static(left), Self::Static(right)) => left == right,
137            (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
138            (Self::Arc(left), Self::Arc(right)) => left == right,
139            (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
140        }
141    }
142}
143
144impl Hash for StrVariant {
145    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
146        match self {
147            StrVariant::Static(val) => val.hash(state),
148            StrVariant::Arc(val) => {
149                let str: &str = val.deref();
150                str.hash(state);
151            }
152        }
153    }
154}
155
156impl Debug for StrVariant {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        Display::fmt(self, f)
159    }
160}
161
162impl Deref for StrVariant {
163    type Target = str;
164    fn deref(&self) -> &Self::Target {
165        match self {
166            Self::Arc(v) => v,
167            Self::Static(v) => v,
168        }
169    }
170}
171
172impl AsRef<str> for StrVariant {
173    fn as_ref(&self) -> &str {
174        match self {
175            Self::Arc(v) => v,
176            Self::Static(v) => v,
177        }
178    }
179}
180
181mod serde_strvariant {
182    use crate::StrVariant;
183    use serde::{
184        Deserialize, Deserializer, Serialize, Serializer,
185        de::{self, Visitor},
186    };
187    use std::{ops::Deref, sync::Arc};
188
189    impl Serialize for StrVariant {
190        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
191        where
192            S: Serializer,
193        {
194            serializer.serialize_str(self.deref())
195        }
196    }
197
198    impl<'de> Deserialize<'de> for StrVariant {
199        fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
200        where
201            D: Deserializer<'de>,
202        {
203            deserializer.deserialize_str(StrVariantVisitor)
204        }
205    }
206
207    struct StrVariantVisitor;
208
209    impl Visitor<'_> for StrVariantVisitor {
210        type Value = StrVariant;
211
212        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213            formatter.write_str("a string")
214        }
215
216        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
217        where
218            E: de::Error,
219        {
220            Ok(StrVariant::Arc(Arc::from(v)))
221        }
222    }
223}
224
225#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
226#[display("{value}")]
227#[serde(transparent)]
228pub struct Name<T> {
229    value: StrVariant,
230    #[serde(skip)]
231    phantom_data: PhantomData<fn(T) -> T>,
232}
233
234impl<T> Name<T> {
235    #[must_use]
236    pub fn new_arc(value: Arc<str>) -> Self {
237        Self {
238            value: StrVariant::Arc(value),
239            phantom_data: PhantomData,
240        }
241    }
242
243    #[must_use]
244    pub const fn new_static(value: &'static str) -> Self {
245        Self {
246            value: StrVariant::Static(value),
247            phantom_data: PhantomData,
248        }
249    }
250}
251
252impl<T> Debug for Name<T> {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        Display::fmt(&self, f)
255    }
256}
257
258impl<T> Deref for Name<T> {
259    type Target = str;
260
261    fn deref(&self) -> &Self::Target {
262        self.value.deref()
263    }
264}
265
266impl<T> Borrow<str> for Name<T> {
267    fn borrow(&self) -> &str {
268        self.deref()
269    }
270}
271
272impl<T> From<String> for Name<T> {
273    fn from(value: String) -> Self {
274        Self::new_arc(Arc::from(value))
275    }
276}
277
278#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
279pub struct PkgFqn {
280    pub namespace: String,
281    pub package_name: String,
282    pub version: Option<String>,
283}
284impl Display for PkgFqn {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        let PkgFqn {
287            namespace,
288            package_name,
289            version,
290        } = self;
291        if let Some(version) = version {
292            write!(f, "{namespace}:{package_name}@{version}")
293        } else {
294            write!(f, "{namespace}:{package_name}")
295        }
296    }
297}
298
299impl PkgFqn {
300    #[must_use]
301    pub fn is_extension(&self) -> bool {
302        self.package_name.ends_with(SUFFIX_PKG_EXT)
303    }
304
305    #[must_use]
306    pub fn package_strip_extension_suffix(&self) -> Option<&str> {
307        self.package_name.as_str().strip_suffix(SUFFIX_PKG_EXT)
308    }
309
310    #[must_use]
311    pub fn is_namespace_obelisk(&self) -> bool {
312        self.namespace == NAMESPACE_OBELISK
313    }
314
315    #[must_use]
316    pub fn ifc_fqn_name(&self, ifc_name: &str) -> IfcFqnName {
317        IfcFqnName::from_parts(
318            &self.namespace,
319            &self.package_name,
320            ifc_name,
321            self.version.as_deref(),
322        )
323    }
324}
325
326#[derive(Hash, Clone, PartialEq, Eq)]
327pub struct IfcFqnMarker;
328
329pub type IfcFqnName = Name<IfcFqnMarker>; // namespace:name/ifc_name OR namespace:name/ifc_name@version
330
331impl IfcFqnName {
332    #[must_use]
333    pub fn namespace(&self) -> &str {
334        self.deref().split_once(':').unwrap().0
335    }
336
337    #[must_use]
338    pub fn package_name(&self) -> &str {
339        let after_colon = self.deref().split_once(':').unwrap().1;
340        after_colon.split_once('/').unwrap().0
341    }
342
343    #[must_use]
344    pub fn version(&self) -> Option<&str> {
345        self.deref().split_once('@').map(|(_, version)| version)
346    }
347
348    #[must_use]
349    pub fn pkg_fqn_name(&self) -> PkgFqn {
350        let (namespace, rest) = self.deref().split_once(':').unwrap();
351        let (package_name, rest) = rest.split_once('/').unwrap();
352        let version = rest.split_once('@').map(|(_, version)| version);
353        PkgFqn {
354            namespace: namespace.to_string(),
355            package_name: package_name.to_string(),
356            version: version.map(std::string::ToString::to_string),
357        }
358    }
359
360    #[must_use]
361    pub fn ifc_name(&self) -> &str {
362        let after_colon = self.deref().split_once(':').unwrap().1;
363        let after_slash = after_colon.split_once('/').unwrap().1;
364        after_slash
365            .split_once('@')
366            .map_or(after_slash, |(ifc, _)| ifc)
367    }
368
369    #[must_use]
370    pub fn from_parts(
371        namespace: &str,
372        package_name: &str,
373        ifc_name: &str,
374        version: Option<&str>,
375    ) -> Self {
376        let mut str = format!("{namespace}:{package_name}/{ifc_name}");
377        if let Some(version) = version {
378            str += "@";
379            str += version;
380        }
381        Self::new_arc(Arc::from(str))
382    }
383
384    #[must_use]
385    pub fn is_extension(&self) -> bool {
386        self.package_name().ends_with(SUFFIX_PKG_EXT)
387    }
388
389    #[must_use]
390    pub fn package_strip_extension_suffix(&self) -> Option<&str> {
391        self.package_name().strip_suffix(SUFFIX_PKG_EXT)
392    }
393
394    #[must_use]
395    pub fn is_namespace_obelisk(&self) -> bool {
396        self.namespace() == NAMESPACE_OBELISK
397    }
398}
399
400#[derive(Hash, Clone, PartialEq, Eq)]
401pub struct FnMarker;
402
403pub type FnName = Name<FnMarker>;
404
405#[derive(Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
406pub struct FunctionFqn {
407    pub ifc_fqn: IfcFqnName,
408    pub function_name: FnName,
409}
410
411impl FunctionFqn {
412    #[must_use]
413    pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
414        Self {
415            ifc_fqn: Name::new_arc(ifc_fqn),
416            function_name: Name::new_arc(function_name),
417        }
418    }
419
420    #[must_use]
421    pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
422        Self {
423            ifc_fqn: Name::new_static(ifc_fqn),
424            function_name: Name::new_static(function_name),
425        }
426    }
427
428    #[must_use]
429    pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
430        Self::new_static(tuple.0, tuple.1)
431    }
432
433    pub fn try_from_tuple(
434        ifc_fqn: &str,
435        function_name: &str,
436    ) -> Result<Self, FunctionFqnParseError> {
437        if function_name.contains('.') {
438            Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
439        } else {
440            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
441        }
442    }
443}
444
445#[derive(Debug, thiserror::Error)]
446pub enum FunctionFqnParseError {
447    #[error("delimiter `.` not found")]
448    DelimiterNotFound,
449    #[error("delimiter `.` found in function name")]
450    DelimiterFoundInFunctionName,
451}
452
453impl FromStr for FunctionFqn {
454    type Err = FunctionFqnParseError;
455
456    fn from_str(s: &str) -> Result<Self, Self::Err> {
457        if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
458            Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
459        } else {
460            Err(FunctionFqnParseError::DelimiterNotFound)
461        }
462    }
463}
464
465impl Display for FunctionFqn {
466    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
467        write!(
468            f,
469            "{ifc_fqn}.{function_name}",
470            ifc_fqn = self.ifc_fqn,
471            function_name = self.function_name
472        )
473    }
474}
475
476impl Debug for FunctionFqn {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        Display::fmt(&self, f)
479    }
480}
481
482impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
483    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
484        let illegal = [':', '@', '.'];
485        let namespace = u.arbitrary::<String>()?.replace(illegal, "");
486        let pkg_name = u.arbitrary::<String>()?.replace(illegal, "");
487        let ifc_name = u.arbitrary::<String>()?.replace(illegal, "");
488        let fn_name = u.arbitrary::<String>()?.replace(illegal, "");
489
490        Ok(FunctionFqn::new_arc(
491            Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
492            Arc::from(fn_name),
493        ))
494    }
495}
496
497#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
498pub enum SupportedFunctionReturnValue {
499    None,
500    // Top level type is result<_,_> with Err variant
501    FallibleResultErr(WastValWithType),
502    // All other top level types
503    InfallibleOrResultOk(WastValWithType),
504}
505
506#[derive(Debug, thiserror::Error)]
507pub enum ResultParsingError {
508    #[error("result cannot be parsed, multi-value results are not supported")]
509    MultiValue,
510    #[error("result cannot be parsed, {0}")]
511    TypeConversionError(#[from] val_json::type_wrapper::TypeConversionError),
512    #[error("result cannot be parsed, {0}")]
513    ValueConversionError(#[from] val_json::wast_val::WastValConversionError),
514}
515
516impl SupportedFunctionReturnValue {
517    pub fn new<
518        I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
519    >(
520        mut iter: I,
521    ) -> Result<Self, ResultParsingError> {
522        if iter.len() == 0 {
523            Ok(Self::None)
524        } else if iter.len() == 1 {
525            let (val, r#type) = iter.next().unwrap();
526            let r#type = TypeWrapper::try_from(r#type)?;
527            let val = WastVal::try_from(val)?;
528            match &val {
529                WastVal::Result(Err(_)) => Ok(Self::FallibleResultErr(WastValWithType {
530                    r#type,
531                    value: val,
532                })),
533                _ => Ok(Self::InfallibleOrResultOk(WastValWithType {
534                    r#type,
535                    value: val,
536                })),
537            }
538        } else {
539            Err(ResultParsingError::MultiValue)
540        }
541    }
542
543    #[cfg(feature = "test")]
544    #[must_use]
545    pub fn fallible_err(&self) -> Option<Option<&WastVal>> {
546        match self {
547            SupportedFunctionReturnValue::FallibleResultErr(WastValWithType {
548                value: WastVal::Result(Err(err)),
549                ..
550            }) => Some(err.as_deref()),
551            _ => None,
552        }
553    }
554
555    #[cfg(feature = "test")]
556    #[must_use]
557    pub fn fallible_ok(&self) -> Option<Option<&WastVal>> {
558        match self {
559            SupportedFunctionReturnValue::InfallibleOrResultOk(WastValWithType {
560                value: WastVal::Result(Ok(ok)),
561                ..
562            }) => Some(ok.as_deref()),
563            _ => None,
564        }
565    }
566
567    #[cfg(feature = "test")]
568    #[must_use]
569    pub fn val_type(&self) -> Option<&TypeWrapper> {
570        match self {
571            SupportedFunctionReturnValue::None => None,
572            SupportedFunctionReturnValue::FallibleResultErr(v)
573            | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(&v.r#type),
574        }
575    }
576
577    #[must_use]
578    pub fn value(&self) -> Option<&WastVal> {
579        match self {
580            SupportedFunctionReturnValue::None => None,
581            SupportedFunctionReturnValue::FallibleResultErr(v)
582            | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(&v.value),
583        }
584    }
585
586    #[must_use]
587    pub fn into_value(self) -> Option<WastVal> {
588        match self {
589            SupportedFunctionReturnValue::None => None,
590            SupportedFunctionReturnValue::FallibleResultErr(v)
591            | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(v.value),
592        }
593    }
594
595    #[must_use]
596    pub fn len(&self) -> usize {
597        match self {
598            SupportedFunctionReturnValue::None => 0,
599            _ => 1,
600        }
601    }
602
603    #[must_use]
604    pub fn is_empty(&self) -> bool {
605        matches!(self, Self::None)
606    }
607
608    #[must_use]
609    pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
610        if let SupportedFunctionReturnValue::FallibleResultErr(_) = self {
611            PendingStateFinishedResultKind(Err(PendingStateFinishedError::FallibleError))
612        } else {
613            PendingStateFinishedResultKind(Ok(()))
614        }
615    }
616}
617
618#[derive(Debug, Clone, PartialEq, Eq)]
619pub struct Params(ParamsInternal);
620
621#[derive(Debug, Clone, PartialEq, Eq)]
622enum ParamsInternal {
623    JsonValues(Vec<Value>),
624    Vals {
625        //TODO: is Arc needed here? Or move upwards?
626        vals: Arc<[wasmtime::component::Val]>,
627    },
628    Empty,
629}
630
631impl Default for Params {
632    fn default() -> Self {
633        Self(ParamsInternal::Empty)
634    }
635}
636
637#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
638pub enum FunctionExtension {
639    Submit,
640    AwaitNext,
641    Schedule,
642}
643
644#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
645pub struct FunctionMetadata {
646    pub ffqn: FunctionFqn,
647    pub parameter_types: ParameterTypes,
648    pub return_type: Option<ReturnType>,
649    pub extension: Option<FunctionExtension>,
650    pub submittable: bool,
651}
652impl Display for FunctionMetadata {
653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654        write!(
655            f,
656            "{ffqn}: func{params}",
657            ffqn = self.ffqn,
658            params = self.parameter_types
659        )?;
660        if let Some(return_type) = &self.return_type {
661            write!(f, " -> {return_type}")?;
662        }
663        Ok(())
664    }
665}
666
667pub mod serde_params {
668    use crate::{Params, ParamsInternal};
669    use serde::de::{SeqAccess, Visitor};
670    use serde::ser::SerializeSeq;
671    use serde::{Deserialize, Serialize};
672    use serde_json::Value;
673    use val_json::wast_val::WastVal;
674
675    impl Serialize for Params {
676        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
677        where
678            S: ::serde::Serializer,
679        {
680            match &self.0 {
681                ParamsInternal::Vals { vals } => {
682                    let mut seq = serializer.serialize_seq(Some(vals.len()))?; // size must be equal, checked when constructed.
683                    for val in vals.iter() {
684                        let value = WastVal::try_from(val.clone())
685                            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
686                        seq.serialize_element(&value)?;
687                    }
688                    seq.end()
689                }
690                ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
691                ParamsInternal::JsonValues(vec) => {
692                    let mut seq = serializer.serialize_seq(Some(vec.len()))?;
693                    for item in vec {
694                        seq.serialize_element(item)?;
695                    }
696                    seq.end()
697                }
698            }
699        }
700    }
701
702    pub struct VecVisitor;
703
704    impl<'de> Visitor<'de> for VecVisitor {
705        type Value = Vec<Value>;
706
707        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
708            formatter.write_str("a sequence of `Value`")
709        }
710
711        #[inline]
712        fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
713        where
714            V: SeqAccess<'de>,
715        {
716            let mut vec = Vec::new();
717            while let Some(elem) = visitor.next_element()? {
718                vec.push(elem);
719            }
720            Ok(vec)
721        }
722    }
723
724    impl<'de> Deserialize<'de> for Params {
725        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
726        where
727            D: serde::Deserializer<'de>,
728        {
729            let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
730            if vec.is_empty() {
731                Ok(Self(ParamsInternal::Empty))
732            } else {
733                Ok(Self(ParamsInternal::JsonValues(vec)))
734            }
735        }
736    }
737}
738
739#[derive(Debug, thiserror::Error)]
740pub enum ParamsParsingError {
741    #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
742    ParameterTypeError {
743        idx: usize,
744        err: TypeConversionError,
745    },
746    #[error("parameters cannot be deserialized: {0}")]
747    ParamsDeserializationError(serde_json::Error),
748    #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
749    ParameterCardinalityMismatch { expected: usize, specified: usize },
750}
751
752impl ParamsParsingError {
753    #[must_use]
754    pub fn detail(&self) -> Option<String> {
755        match self {
756            ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
757            ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
758            ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
759        }
760    }
761}
762
763#[derive(Debug, thiserror::Error)]
764pub enum ParamsFromJsonError {
765    #[error("value must be a json array containing function parameters")]
766    MustBeArray,
767}
768
769impl Params {
770    #[must_use]
771    pub const fn empty() -> Self {
772        Self(ParamsInternal::Empty)
773    }
774
775    #[must_use]
776    pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
777        if vals.is_empty() {
778            Self::empty()
779        } else {
780            Self(ParamsInternal::Vals { vals })
781        }
782    }
783
784    #[must_use]
785    pub fn from_json_values(vec: Vec<Value>) -> Self {
786        if vec.is_empty() {
787            Self::empty()
788        } else {
789            Self(ParamsInternal::JsonValues(vec))
790        }
791    }
792
793    pub fn typecheck<'a>(
794        &self,
795        param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
796    ) -> Result<(), ParamsParsingError> {
797        if param_types.len() != self.len() {
798            return Err(ParamsParsingError::ParameterCardinalityMismatch {
799                expected: param_types.len(),
800                specified: self.len(),
801            });
802        }
803        match &self.0 {
804            ParamsInternal::Vals { .. } /* already typechecked */ | ParamsInternal::Empty => {}
805            ParamsInternal::JsonValues(params) => {
806                params::deserialize_values(params, param_types)
807                .map_err(ParamsParsingError::ParamsDeserializationError)?;
808            }
809        }
810        Ok(())
811    }
812
813    pub fn as_vals(
814        &self,
815        param_types: Box<[(String, Type)]>,
816    ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
817        if param_types.len() != self.len() {
818            return Err(ParamsParsingError::ParameterCardinalityMismatch {
819                expected: param_types.len(),
820                specified: self.len(),
821            });
822        }
823        match &self.0 {
824            ParamsInternal::JsonValues(json_vec) => {
825                let param_types = param_types
826                    .into_vec()
827                    .into_iter()
828                    .enumerate()
829                    .map(|(idx, (_param_name, ty))| {
830                        TypeWrapper::try_from(ty).map_err(|err| (idx, err))
831                    })
832                    .collect::<Result<Vec<_>, _>>()
833                    .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
834                Ok(params::deserialize_values(json_vec, param_types.iter())
835                    .map_err(ParamsParsingError::ParamsDeserializationError)?
836                    .into_iter()
837                    .map(Val::from)
838                    .collect())
839            }
840            ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
841            ParamsInternal::Empty => Ok(Arc::from([])),
842        }
843    }
844
845    #[must_use]
846    pub fn len(&self) -> usize {
847        match &self.0 {
848            ParamsInternal::JsonValues(vec) => vec.len(),
849            ParamsInternal::Vals { vals, .. } => vals.len(),
850            ParamsInternal::Empty => 0,
851        }
852    }
853
854    #[must_use]
855    pub fn is_empty(&self) -> bool {
856        self.len() == 0
857    }
858}
859
860pub mod prefixed_ulid {
861    use arbitrary::Arbitrary;
862    use serde_with::{DeserializeFromStr, SerializeDisplay};
863    use std::{
864        fmt::{Debug, Display},
865        hash::Hasher,
866        marker::PhantomData,
867        num::ParseIntError,
868        str::FromStr,
869        sync::Arc,
870    };
871    use ulid::Ulid;
872
873    use crate::JoinSetId;
874
875    #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
876    #[derive_where::derive_where(Clone, Copy)]
877    #[display("{}_{ulid}", Self::prefix())]
878    pub struct PrefixedUlid<T: 'static> {
879        ulid: Ulid,
880        phantom_data: PhantomData<fn(T) -> T>,
881    }
882
883    impl<T> PrefixedUlid<T> {
884        const fn new(ulid: Ulid) -> Self {
885            Self {
886                ulid,
887                phantom_data: PhantomData,
888            }
889        }
890
891        fn prefix() -> &'static str {
892            std::any::type_name::<T>().rsplit("::").next().unwrap()
893        }
894    }
895
896    impl<T> PrefixedUlid<T> {
897        #[must_use]
898        pub fn generate() -> Self {
899            Self::new(Ulid::new())
900        }
901
902        #[must_use]
903        pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
904            Self::new(Ulid::from_parts(timestamp_ms, random))
905        }
906
907        #[must_use]
908        pub fn timestamp_part(&self) -> u64 {
909            self.ulid.timestamp_ms()
910        }
911
912        #[must_use]
913        #[expect(clippy::cast_possible_truncation)]
914        pub fn random_part(&self) -> u64 {
915            self.ulid.random() as u64
916        }
917    }
918
919    #[derive(Debug, thiserror::Error)]
920    pub enum PrefixedUlidParseError {
921        #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
922        WrongPrefix { input: String, expected: String },
923        #[error("cannot parse ULID suffix from `{input}`")]
924        CannotParseUlid { input: String },
925    }
926
927    mod impls {
928        use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
929        use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
930
931        impl<T> FromStr for PrefixedUlid<T> {
932            type Err = PrefixedUlidParseError;
933
934            fn from_str(input: &str) -> Result<Self, Self::Err> {
935                let prefix = Self::prefix();
936                let mut input_chars = input.chars();
937                for exp in prefix.chars() {
938                    if input_chars.next() != Some(exp) {
939                        return Err(PrefixedUlidParseError::WrongPrefix {
940                            input: input.to_string(),
941                            expected: format!("{prefix}_"),
942                        });
943                    }
944                }
945                if input_chars.next() != Some('_') {
946                    return Err(PrefixedUlidParseError::WrongPrefix {
947                        input: input.to_string(),
948                        expected: format!("{prefix}_"),
949                    });
950                }
951                let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
952                    return Err(PrefixedUlidParseError::CannotParseUlid {
953                        input: input.to_string(),
954                    });
955                };
956                Ok(Self {
957                    ulid,
958                    phantom_data: PhantomData,
959                })
960            }
961        }
962
963        impl<T> Debug for PrefixedUlid<T> {
964            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
965                Display::fmt(&self, f)
966            }
967        }
968
969        impl<T> Hash for PrefixedUlid<T> {
970            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
971                Self::prefix().hash(state);
972                self.ulid.hash(state);
973                self.phantom_data.hash(state);
974            }
975        }
976
977        impl<T> PartialEq for PrefixedUlid<T> {
978            fn eq(&self, other: &Self) -> bool {
979                self.ulid == other.ulid
980            }
981        }
982
983        impl<T> Eq for PrefixedUlid<T> {}
984
985        impl<T> PartialOrd for PrefixedUlid<T> {
986            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
987                Some(self.cmp(other))
988            }
989        }
990
991        impl<T> Ord for PrefixedUlid<T> {
992            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
993                self.ulid.cmp(&other.ulid)
994            }
995        }
996    }
997
998    pub mod prefix {
999        pub struct E;
1000        pub struct Exr;
1001        pub struct Run;
1002        pub struct Delay;
1003    }
1004
1005    pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1006    pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1007    pub type RunId = PrefixedUlid<prefix::Run>;
1008    pub type DelayId = PrefixedUlid<prefix::Delay>;
1009
1010    impl<'a, T> Arbitrary<'a> for PrefixedUlid<T> {
1011        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1012            Ok(Self::new(ulid::Ulid::from_parts(
1013                u.arbitrary()?,
1014                u.arbitrary()?,
1015            )))
1016        }
1017    }
1018
1019    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
1020    pub enum ExecutionId {
1021        TopLevel(ExecutionIdTopLevel),
1022        Derived(ExecutionIdDerived),
1023    }
1024
1025    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1026    pub struct ExecutionIdDerived {
1027        top_level: ExecutionIdTopLevel,
1028        infix: Arc<str>,
1029        idx: u64,
1030    }
1031    impl ExecutionIdDerived {
1032        #[must_use]
1033        pub fn get_incremented(&self) -> Self {
1034            self.get_incremented_by(1)
1035        }
1036        #[must_use]
1037        pub fn get_incremented_by(&self, count: u64) -> Self {
1038            ExecutionIdDerived {
1039                top_level: self.top_level,
1040                infix: self.infix.clone(),
1041                idx: self.idx + count,
1042            }
1043        }
1044        #[must_use]
1045        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1046            let ExecutionIdDerived {
1047                top_level,
1048                infix,
1049                idx,
1050            } = self;
1051            let infix = Arc::from(format!(
1052                "{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1053            ));
1054            ExecutionIdDerived {
1055                top_level: *top_level,
1056                infix,
1057                idx: EXECUTION_ID_START_IDX,
1058            }
1059        }
1060        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1061            let ExecutionIdDerived {
1062                top_level,
1063                infix,
1064                idx,
1065            } = self;
1066            write!(
1067                f,
1068                "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1069            )
1070        }
1071    }
1072    impl Debug for ExecutionIdDerived {
1073        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1074            self.display_or_debug(f)
1075        }
1076    }
1077    impl Display for ExecutionIdDerived {
1078        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1079            self.display_or_debug(f)
1080        }
1081    }
1082    impl FromStr for ExecutionIdDerived {
1083        type Err = ExecutionIdDerivedParseError;
1084
1085        fn from_str(input: &str) -> Result<Self, Self::Err> {
1086            if let Some((prefix, suffix)) = input.split_once(EXECUTION_ID_INFIX) {
1087                let top_level = PrefixedUlid::from_str(prefix)
1088                    .map_err(ExecutionIdDerivedParseError::PrefixedUlidParseError)?;
1089                let Some((infix, idx)) = suffix.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX) else {
1090                    return Err(ExecutionIdDerivedParseError::SecondDelimiterNotFound);
1091                };
1092                let infix = Arc::from(infix);
1093                let idx =
1094                    u64::from_str(idx).map_err(ExecutionIdDerivedParseError::ParseIndexError)?;
1095                Ok(ExecutionIdDerived {
1096                    top_level,
1097                    infix,
1098                    idx,
1099                })
1100            } else {
1101                Err(ExecutionIdDerivedParseError::FirstDelimiterNotFound)
1102            }
1103        }
1104    }
1105    impl<'a> Arbitrary<'a> for ExecutionIdDerived {
1106        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1107            let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1108            let join_set_id = JoinSetId::arbitrary(u)?;
1109            Ok(top_level.next_level(&join_set_id))
1110        }
1111    }
1112    #[derive(Debug, thiserror::Error)]
1113    pub enum ExecutionIdDerivedParseError {
1114        #[error(transparent)]
1115        PrefixedUlidParseError(PrefixedUlidParseError),
1116        #[error("cannot parse derived execution id - delimiter `{EXECUTION_ID_INFIX}` not found")]
1117        FirstDelimiterNotFound,
1118        #[error(
1119            "cannot parse derived execution id - delimiter `{EXECUTION_ID_JOIN_SET_INFIX}` not found"
1120        )]
1121        SecondDelimiterNotFound,
1122        #[error(
1123            "cannot parse derived execution id - suffix after `{EXECUTION_ID_JOIN_SET_INFIX}` must be a number"
1124        )]
1125        ParseIndexError(ParseIntError),
1126    }
1127
1128    impl ExecutionId {
1129        #[must_use]
1130        pub fn generate() -> Self {
1131            ExecutionId::TopLevel(PrefixedUlid::generate())
1132        }
1133
1134        #[must_use]
1135        pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1136            match &self {
1137                ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1138                ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1139            }
1140        }
1141
1142        #[must_use]
1143        pub fn is_top_level(&self) -> bool {
1144            matches!(self, ExecutionId::TopLevel(_))
1145        }
1146
1147        #[must_use]
1148        pub fn timestamp_part(&self) -> u64 {
1149            self.get_top_level().timestamp_part()
1150        }
1151
1152        #[must_use]
1153        pub fn random_seed(&self) -> u64 {
1154            let mut hasher = fxhash::FxHasher::default();
1155            hasher.write_u64(self.get_top_level().random_part());
1156            hasher.write_u64(self.timestamp_part());
1157            if let ExecutionId::Derived(ExecutionIdDerived {
1158                top_level: _,
1159                infix,
1160                idx,
1161            }) = self
1162            {
1163                hasher.write(infix.as_bytes());
1164                hasher.write_u64(*idx);
1165            }
1166            hasher.finish()
1167        }
1168
1169        #[must_use]
1170        pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1171            ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1172        }
1173
1174        #[must_use]
1175        pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1176            match &self {
1177                ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1178                    top_level: *top_level,
1179                    infix: Arc::from(join_set_id.to_string()),
1180                    idx: EXECUTION_ID_START_IDX,
1181                },
1182                ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1183            }
1184        }
1185
1186        fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1187            match &self {
1188                ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1189                ExecutionId::Derived(derived) => Display::fmt(derived, f),
1190            }
1191        }
1192    }
1193
1194    const EXECUTION_ID_INFIX: char = '.';
1195    const EXECUTION_ID_JOIN_SET_INFIX: char = '_';
1196    const EXECUTION_ID_START_IDX: u64 = 1;
1197    pub const JOIN_SET_START_IDX: u64 = 1;
1198
1199    #[derive(Debug, thiserror::Error)]
1200    pub enum ExecutionIdParseError {
1201        #[error(transparent)]
1202        PrefixedUlidParseError(#[from] PrefixedUlidParseError),
1203        #[error(
1204            "cannot parse derived execution id - first delimiter `{EXECUTION_ID_INFIX}` not found"
1205        )]
1206        FirstDelimiterNotFound,
1207        #[error(
1208            "cannot parse derived execution id - second delimiter `{EXECUTION_ID_INFIX}` not found"
1209        )]
1210        SecondDelimiterNotFound,
1211        #[error("cannot parse derived execution id - last suffix must be a number")]
1212        ParseIndexError(#[from] ParseIntError),
1213    }
1214
1215    impl FromStr for ExecutionId {
1216        type Err = ExecutionIdParseError;
1217
1218        fn from_str(input: &str) -> Result<Self, Self::Err> {
1219            if input.contains(EXECUTION_ID_INFIX) {
1220                ExecutionIdDerived::from_str(input)
1221                    .map(ExecutionId::Derived)
1222                    .map_err(|err| match err {
1223                        ExecutionIdDerivedParseError::FirstDelimiterNotFound => {
1224                            unreachable!("first delimiter checked")
1225                        }
1226                        ExecutionIdDerivedParseError::SecondDelimiterNotFound => {
1227                            ExecutionIdParseError::SecondDelimiterNotFound
1228                        }
1229                        ExecutionIdDerivedParseError::PrefixedUlidParseError(err) => {
1230                            ExecutionIdParseError::PrefixedUlidParseError(err)
1231                        }
1232                        ExecutionIdDerivedParseError::ParseIndexError(err) => {
1233                            ExecutionIdParseError::ParseIndexError(err)
1234                        }
1235                    })
1236            } else {
1237                Ok(ExecutionId::TopLevel(PrefixedUlid::from_str(input)?))
1238            }
1239        }
1240    }
1241
1242    impl Debug for ExecutionId {
1243        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1244            self.display_or_debug(f)
1245        }
1246    }
1247
1248    impl Display for ExecutionId {
1249        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1250            self.display_or_debug(f)
1251        }
1252    }
1253
1254    impl<'a> Arbitrary<'a> for ExecutionId {
1255        fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1256            Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1257        }
1258    }
1259}
1260
1261#[derive(
1262    Debug,
1263    Clone,
1264    PartialEq,
1265    Eq,
1266    Hash,
1267    derive_more::Display,
1268    serde_with::SerializeDisplay,
1269    serde_with::DeserializeFromStr,
1270)]
1271#[non_exhaustive] // force using the constructor as much as possible due to validation
1272#[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
1273pub struct JoinSetId {
1274    pub kind: JoinSetKind,
1275    pub name: StrVariant,
1276}
1277
1278#[derive(
1279    Debug,
1280    Clone,
1281    Copy,
1282    PartialEq,
1283    Eq,
1284    derive_more::Display,
1285    arbitrary::Arbitrary,
1286    Serialize,
1287    Deserialize,
1288)]
1289#[serde(rename_all = "snake_case")]
1290pub enum ClosingStrategy {
1291    Complete,
1292}
1293
1294impl JoinSetId {
1295    pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1296        Ok(Self {
1297            kind,
1298            name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1299        })
1300    }
1301}
1302const CHARSET_JOIN_SET_NAME: &str =
1303    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
1304
1305pub const CHARSET_ALPHANUMERIC: &str =
1306    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1307
1308#[derive(
1309    Debug,
1310    Clone,
1311    Copy,
1312    PartialEq,
1313    Eq,
1314    Hash,
1315    derive_more::Display,
1316    Serialize,
1317    Deserialize,
1318    strum::EnumIter,
1319    Arbitrary,
1320)]
1321#[display("{}", self.as_code())]
1322pub enum JoinSetKind {
1323    OneOff,
1324    Named,
1325    Generated,
1326}
1327impl JoinSetKind {
1328    fn as_code(&self) -> &'static str {
1329        match self {
1330            JoinSetKind::OneOff => "o",
1331            JoinSetKind::Named => "n",
1332            JoinSetKind::Generated => "g",
1333        }
1334    }
1335}
1336impl FromStr for JoinSetKind {
1337    type Err = &'static str;
1338    fn from_str(s: &str) -> Result<Self, Self::Err> {
1339        use strum::IntoEnumIterator;
1340        Self::iter()
1341            .find(|variant| s == variant.as_code())
1342            .ok_or("unknown join set kind")
1343    }
1344}
1345
1346pub const JOIN_SET_ID_INFIX: char = ':';
1347const CHARSET_EXTRA_JSON_SET: &str = "_-/";
1348
1349impl FromStr for JoinSetId {
1350    type Err = JoinSetIdParseError;
1351
1352    fn from_str(input: &str) -> Result<Self, Self::Err> {
1353        let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1354            return Err(JoinSetIdParseError::WrongParts);
1355        };
1356        let kind = kind
1357            .parse()
1358            .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1359        Ok(JoinSetId::new(kind, StrVariant::from(name.to_string()))?)
1360    }
1361}
1362
1363#[derive(Debug, thiserror::Error)]
1364pub enum JoinSetIdParseError {
1365    #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1366    WrongParts,
1367    #[error("cannot parse join set id's execution id - {0}")]
1368    ExecutionIdParseError(#[from] ExecutionIdParseError),
1369    #[error("cannot parse join set kind - {0}")]
1370    JoinSetKindParseError(&'static str),
1371    #[error("cannot parse join set id - {0}")]
1372    InvalidName(#[from] InvalidNameError<JoinSetId>),
1373}
1374
1375impl<'a> Arbitrary<'a> for JoinSetId {
1376    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1377        let name: String = {
1378            let length_inclusive = u.int_in_range(0..=10).unwrap();
1379            (0..=length_inclusive)
1380                .map(|_| {
1381                    let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
1382                    CHARSET_JOIN_SET_NAME
1383                        .chars()
1384                        .nth(idx)
1385                        .expect("idx is < charset.len()")
1386                })
1387                .collect()
1388        };
1389
1390        Ok(JoinSetId::new(JoinSetKind::Generated, StrVariant::from(name)).unwrap())
1391    }
1392}
1393
1394#[derive(
1395    Debug,
1396    Clone,
1397    Copy,
1398    strum::Display,
1399    PartialEq,
1400    Eq,
1401    strum::EnumString,
1402    Hash,
1403    serde_with::SerializeDisplay,
1404    serde_with::DeserializeFromStr,
1405)]
1406#[strum(serialize_all = "snake_case")]
1407pub enum ComponentType {
1408    ActivityWasm,
1409    Workflow,
1410    WebhookEndpoint,
1411}
1412
1413#[derive(
1414    derive_more::Debug,
1415    Clone,
1416    PartialEq,
1417    Eq,
1418    Hash,
1419    serde_with::SerializeDisplay,
1420    serde_with::DeserializeFromStr,
1421    derive_more::Display,
1422)]
1423#[display("{component_type}:{name}")]
1424#[debug("{}", self)]
1425#[non_exhaustive] // force using the constructor as much as possible due to validation
1426pub struct ComponentId {
1427    pub component_type: ComponentType,
1428    pub name: StrVariant,
1429}
1430impl ComponentId {
1431    pub fn new(
1432        component_type: ComponentType,
1433        name: StrVariant,
1434    ) -> Result<Self, InvalidNameError<Self>> {
1435        Ok(Self {
1436            component_type,
1437            name: check_name(name, "_")?,
1438        })
1439    }
1440
1441    #[must_use]
1442    pub const fn dummy_activity() -> Self {
1443        Self {
1444            component_type: ComponentType::ActivityWasm,
1445            name: StrVariant::empty(),
1446        }
1447    }
1448
1449    #[must_use]
1450    pub const fn dummy_workflow() -> ComponentId {
1451        ComponentId {
1452            component_type: ComponentType::Workflow,
1453            name: StrVariant::empty(),
1454        }
1455    }
1456}
1457
1458pub fn check_name<T>(
1459    name: StrVariant,
1460    special: &'static str,
1461) -> Result<StrVariant, InvalidNameError<T>> {
1462    if let Some(invalid) = name
1463        .as_ref()
1464        .chars()
1465        .find(|c| !c.is_ascii_alphanumeric() && !special.contains(*c))
1466    {
1467        Err(InvalidNameError::<T> {
1468            invalid,
1469            name: name.as_ref().to_string(),
1470            special,
1471            phantom_data: PhantomData,
1472        })
1473    } else {
1474        Ok(name)
1475    }
1476}
1477#[derive(Debug, thiserror::Error)]
1478#[error(
1479    "name of {} `{name}` contains invalid character `{invalid}`, must only contain alphanumeric characters and following characters {special}",
1480    std::any::type_name::<T>().rsplit("::").next().unwrap()
1481)]
1482pub struct InvalidNameError<T> {
1483    invalid: char,
1484    name: String,
1485    special: &'static str,
1486    phantom_data: PhantomData<T>,
1487}
1488
1489#[derive(Debug, thiserror::Error)]
1490pub enum ConfigIdParseError {
1491    #[error("cannot parse ComponentConfigHash - delimiter ':' not found")]
1492    DelimiterNotFound,
1493    #[error("cannot parse prefix of ComponentConfigHash - {0}")]
1494    ComponentTypeParseError(#[from] strum::ParseError),
1495}
1496
1497impl FromStr for ComponentId {
1498    type Err = ConfigIdParseError;
1499
1500    fn from_str(input: &str) -> Result<Self, Self::Err> {
1501        let (component_type, name) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1502        let component_type = component_type.parse()?;
1503        Ok(Self {
1504            component_type,
1505            name: StrVariant::from(name.to_string()),
1506        })
1507    }
1508}
1509
1510#[derive(
1511    Debug,
1512    Clone,
1513    Copy,
1514    strum::Display,
1515    strum::EnumString,
1516    PartialEq,
1517    Eq,
1518    Hash,
1519    serde_with::SerializeDisplay,
1520    serde_with::DeserializeFromStr,
1521)]
1522#[strum(serialize_all = "snake_case")]
1523pub enum HashType {
1524    Sha256,
1525}
1526
1527#[derive(
1528    Debug,
1529    Clone,
1530    derive_more::Display,
1531    derive_more::FromStr,
1532    derive_more::Deref,
1533    PartialEq,
1534    Eq,
1535    Hash,
1536    serde_with::SerializeDisplay,
1537    serde_with::DeserializeFromStr,
1538)]
1539pub struct ContentDigest(pub Digest);
1540pub const CONTENT_DIGEST_DUMMY: ContentDigest = ContentDigest(Digest {
1541    hash_type: HashType::Sha256,
1542    hash_base16: StrVariant::empty(),
1543});
1544
1545impl ContentDigest {
1546    #[must_use]
1547    pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1548        Self(Digest::new(hash_type, hash_base16))
1549    }
1550}
1551
1552#[derive(
1553    Debug,
1554    Clone,
1555    derive_more::Display,
1556    PartialEq,
1557    Eq,
1558    Hash,
1559    serde_with::SerializeDisplay,
1560    serde_with::DeserializeFromStr,
1561)]
1562#[display("{hash_type}:{hash_base16}")]
1563pub struct Digest {
1564    hash_type: HashType,
1565    hash_base16: StrVariant,
1566}
1567impl Digest {
1568    #[must_use]
1569    pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1570        Self {
1571            hash_type,
1572            hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1573        }
1574    }
1575
1576    #[must_use]
1577    pub fn hash_type(&self) -> HashType {
1578        self.hash_type
1579    }
1580
1581    #[must_use]
1582    pub fn digest_base16(&self) -> &str {
1583        &self.hash_base16
1584    }
1585}
1586
1587#[derive(Debug, thiserror::Error)]
1588pub enum DigestParseErrror {
1589    #[error("cannot parse ContentDigest - delimiter ':' not found")]
1590    DelimiterNotFound,
1591    #[error("cannot parse ContentDigest - invalid prefix `{hash_type}`")]
1592    TypeParseError { hash_type: String },
1593    #[error("cannot parse ContentDigest - invalid suffix length, expected 64 hex digits, got {0}")]
1594    SuffixLength(usize),
1595    #[error("cannot parse ContentDigest - suffix must be hex-encoded, got invalid character `{0}`")]
1596    SuffixInvalid(char),
1597}
1598
1599impl FromStr for Digest {
1600    type Err = DigestParseErrror;
1601
1602    fn from_str(input: &str) -> Result<Self, Self::Err> {
1603        let (hash_type, hash_base16) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1604        let hash_type =
1605            HashType::from_str(hash_type).map_err(|_err| Self::Err::TypeParseError {
1606                hash_type: hash_type.to_string(),
1607            })?;
1608        if hash_base16.len() != 64 {
1609            return Err(Self::Err::SuffixLength(hash_base16.len()));
1610        }
1611        if let Some(invalid) = hash_base16.chars().find(|c| !c.is_ascii_hexdigit()) {
1612            return Err(Self::Err::SuffixInvalid(invalid));
1613        }
1614        Ok(Self {
1615            hash_type,
1616            hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1617        })
1618    }
1619}
1620
1621#[derive(
1622    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
1623)]
1624#[display("{wit_type}")]
1625pub struct ReturnType {
1626    pub type_wrapper: TypeWrapper,
1627    pub wit_type: StrVariant,
1628}
1629
1630#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
1631#[derive_where::derive_where(PartialEq)]
1632#[display("{name}: {wit_type}")]
1633pub struct ParameterType {
1634    pub type_wrapper: TypeWrapper,
1635    #[derive_where(skip)]
1636    // Names are read from how a component names the parameter and thus might differ between export and import.
1637    pub name: StrVariant,
1638    pub wit_type: StrVariant,
1639}
1640
1641#[derive(
1642    Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
1643)]
1644pub struct ParameterTypes(pub Vec<ParameterType>);
1645
1646impl Debug for ParameterTypes {
1647    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1648        write!(f, "(")?;
1649        let mut iter = self.0.iter().peekable();
1650        while let Some(p) = iter.next() {
1651            write!(f, "{p:?}")?;
1652            if iter.peek().is_some() {
1653                write!(f, ", ")?;
1654            }
1655        }
1656        write!(f, ")")
1657    }
1658}
1659
1660impl Display for ParameterTypes {
1661    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1662        write!(f, "(")?;
1663        let mut iter = self.0.iter().peekable();
1664        while let Some(p) = iter.next() {
1665            write!(f, "{p}")?;
1666            if iter.peek().is_some() {
1667                write!(f, ", ")?;
1668            }
1669        }
1670        write!(f, ")")
1671    }
1672}
1673
1674#[derive(Debug, Clone)]
1675pub struct PackageIfcFns {
1676    pub ifc_fqn: IfcFqnName,
1677    pub extension: bool,
1678    pub fns: IndexMap<FnName, FunctionMetadata>,
1679}
1680
1681#[derive(Debug, Clone, Copy)]
1682pub struct ComponentRetryConfig {
1683    pub max_retries: u32,
1684    pub retry_exp_backoff: Duration,
1685}
1686
1687/// Implementation must not return `-obelisk-ext` suffix in any package name, nor `obelisk` namespace.
1688#[async_trait]
1689pub trait FunctionRegistry: Send + Sync {
1690    async fn get_by_exported_function(
1691        &self,
1692        ffqn: &FunctionFqn,
1693    ) -> Option<(FunctionMetadata, ComponentId, ComponentRetryConfig)>;
1694
1695    fn all_exports(&self) -> &[PackageIfcFns];
1696}
1697
1698#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
1699#[display("{_0:?}")]
1700pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
1701
1702impl ExecutionMetadata {
1703    const LINKED_KEY: &str = "obelisk-tracing-linked";
1704    #[must_use]
1705    pub const fn empty() -> Self {
1706        // Remove `Optional` when const hashmap creation is allowed - https://github.com/rust-lang/rust/issues/123197
1707        Self(None)
1708    }
1709
1710    #[must_use]
1711    pub fn from_parent_span(less_specific: &Span) -> Self {
1712        ExecutionMetadata::create(less_specific, false)
1713    }
1714
1715    #[must_use]
1716    pub fn from_linked_span(less_specific: &Span) -> Self {
1717        ExecutionMetadata::create(less_specific, true)
1718    }
1719
1720    /// Attempt to use `Span::current()` to fill the trace and parent span.
1721    /// If that fails, which can happen due to interference with e.g.
1722    /// the stdout layer of the subscriber, use the `span` which is guaranteed
1723    /// to be on info level.
1724    #[must_use]
1725    #[expect(clippy::items_after_statements)]
1726    fn create(span: &Span, link_marker: bool) -> Self {
1727        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1728        let mut metadata = Self(Some(hashbrown::HashMap::default()));
1729        let mut metadata_view = ExecutionMetadataInjectorView {
1730            metadata: &mut metadata,
1731        };
1732        // inject the current context through the amqp headers
1733        fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
1734            opentelemetry::global::get_text_map_propagator(|propagator| {
1735                propagator.inject_context(&s.context(), metadata_view);
1736            });
1737        }
1738        inject(&Span::current(), &mut metadata_view);
1739        if metadata_view.is_empty() {
1740            // The subscriber sent us a current span that is actually disabled
1741            inject(span, &mut metadata_view);
1742        }
1743        if link_marker {
1744            metadata_view.set(Self::LINKED_KEY, String::new());
1745        }
1746        metadata
1747    }
1748
1749    pub fn enrich(&self, span: &Span) {
1750        use opentelemetry::trace::TraceContextExt as _;
1751        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1752
1753        let metadata_view = ExecutionMetadataExtractorView { metadata: self };
1754        let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
1755            propagator.extract(&metadata_view)
1756        });
1757        if metadata_view.get(Self::LINKED_KEY).is_some() {
1758            let linked_span_context = otel_context.span().span_context().clone();
1759            span.add_link(linked_span_context);
1760        } else {
1761            span.set_parent(otel_context);
1762        }
1763    }
1764}
1765
1766struct ExecutionMetadataInjectorView<'a> {
1767    metadata: &'a mut ExecutionMetadata,
1768}
1769
1770impl ExecutionMetadataInjectorView<'_> {
1771    fn is_empty(&self) -> bool {
1772        self.metadata
1773            .0
1774            .as_ref()
1775            .is_some_and(hashbrown::HashMap::is_empty)
1776    }
1777}
1778
1779impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
1780    fn set(&mut self, key: &str, value: String) {
1781        let key = format!("tracing:{key}");
1782        let map = if let Some(map) = self.metadata.0.as_mut() {
1783            map
1784        } else {
1785            self.metadata.0 = Some(hashbrown::HashMap::new());
1786            assert_matches!(&mut self.metadata.0, Some(map) => map)
1787        };
1788        map.insert(key, value);
1789    }
1790}
1791
1792struct ExecutionMetadataExtractorView<'a> {
1793    metadata: &'a ExecutionMetadata,
1794}
1795
1796impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
1797    fn get(&self, key: &str) -> Option<&str> {
1798        self.metadata
1799            .0
1800            .as_ref()
1801            .and_then(|map| map.get(&format!("tracing:{key}")))
1802            .map(std::string::String::as_str)
1803    }
1804
1805    fn keys(&self) -> Vec<&str> {
1806        match &self.metadata.0.as_ref() {
1807            Some(map) => map
1808                .keys()
1809                .filter_map(|key| key.strip_prefix("tracing:"))
1810                .collect(),
1811            None => vec![],
1812        }
1813    }
1814}
1815
1816#[cfg(test)]
1817mod tests {
1818
1819    use crate::{
1820        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
1821    };
1822    use std::{
1823        hash::{DefaultHasher, Hash, Hasher},
1824        str::FromStr,
1825        sync::Arc,
1826    };
1827
1828    #[cfg(madsim)]
1829    #[test]
1830    fn ulid_generation_should_be_deterministic() {
1831        let mut builder_a = madsim::runtime::Builder::from_env();
1832        builder_a.check = true;
1833
1834        let mut builder_b = madsim::runtime::Builder::from_env(); // Builder: Clone would be useful
1835        builder_b.check = true;
1836        builder_b.seed = builder_a.seed;
1837
1838        assert_eq!(
1839            builder_a.run(|| async { ulid::Ulid::new() }),
1840            builder_b.run(|| async { ulid::Ulid::new() })
1841        );
1842    }
1843
1844    #[test]
1845    fn ulid_parsing() {
1846        let generated = ExecutorId::generate();
1847        let str = generated.to_string();
1848        let parsed = str.parse().unwrap();
1849        assert_eq!(generated, parsed);
1850    }
1851
1852    #[test]
1853    fn execution_id_parsing_top_level() {
1854        let generated = ExecutionId::generate();
1855        let str = generated.to_string();
1856        let parsed = str.parse().unwrap();
1857        assert_eq!(generated, parsed);
1858    }
1859
1860    #[test]
1861    fn execution_id_with_one_level_should_parse() {
1862        let top_level = ExecutionId::generate();
1863        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
1864        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
1865        let ser = first_child.to_string();
1866        assert_eq!(format!("{top_level}.n:name_1"), ser);
1867        let parsed = ExecutionId::from_str(&ser).unwrap();
1868        assert_eq!(first_child, parsed);
1869    }
1870
1871    #[test]
1872    fn execution_id_increment_twice() {
1873        let top_level = ExecutionId::generate();
1874        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
1875        let first_child = top_level.next_level(&join_set_id);
1876        let second_child = ExecutionId::Derived(first_child.get_incremented());
1877        let ser = second_child.to_string();
1878        assert_eq!(format!("{top_level}.n:name_2"), ser);
1879        let parsed = ExecutionId::from_str(&ser).unwrap();
1880        assert_eq!(second_child, parsed);
1881    }
1882
1883    #[test]
1884    fn execution_id_next_level_twice() {
1885        let top_level = ExecutionId::generate();
1886        let join_set_id_outer =
1887            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
1888        let join_set_id_inner =
1889            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
1890        let execution_id = ExecutionId::Derived(
1891            top_level
1892                .next_level(&join_set_id_outer)
1893                .get_incremented()
1894                .next_level(&join_set_id_inner)
1895                .get_incremented(),
1896        );
1897        let ser = execution_id.to_string();
1898        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
1899        let parsed = ExecutionId::from_str(&ser).unwrap();
1900        assert_eq!(execution_id, parsed);
1901    }
1902
1903    #[test]
1904    fn execution_id_hash_should_be_stable() {
1905        let parent = ExecutionId::from_parts(1, 2);
1906        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
1907        let sibling_1 = parent.next_level(&join_set_id);
1908        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
1909        let sibling_1 = ExecutionId::Derived(sibling_1);
1910        let join_set_id_inner =
1911            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
1912        let child =
1913            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
1914        let parent = parent.random_seed();
1915        let sibling_1 = sibling_1.random_seed();
1916        let sibling_2 = sibling_2.random_seed();
1917        let child = child.random_seed();
1918        let vec = vec![parent, sibling_1, sibling_2, child];
1919        insta::assert_debug_snapshot!(vec);
1920        // check that every hash is unique
1921        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
1922        assert_eq!(4, set.len());
1923    }
1924
1925    #[test]
1926    fn hash_of_str_variants_should_be_equal() {
1927        let input = "foo";
1928        let left = StrVariant::Arc(Arc::from(input));
1929        let right = StrVariant::Static(input);
1930        assert_eq!(left, right);
1931        let mut left_hasher = DefaultHasher::new();
1932        left.hash(&mut left_hasher);
1933        let mut right_hasher = DefaultHasher::new();
1934        right.hash(&mut right_hasher);
1935        let left_hasher = left_hasher.finish();
1936        let right_hasher = right_hasher.finish();
1937        println!("left: {left_hasher:x}, right: {right_hasher:x}");
1938        assert_eq!(left_hasher, right_hasher);
1939    }
1940
1941    #[test]
1942    fn ffqn_from_tuple_with_version_should_work() {
1943        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
1944        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
1945    }
1946
1947    #[test]
1948    fn ffqn_from_str_with_version_should_work() {
1949        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
1950        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
1951    }
1952
1953    #[cfg(madsim)]
1954    #[tokio::test]
1955    async fn join_set_serde_should_be_consistent() {
1956        use crate::{JoinSetId, JoinSetKind};
1957        use strum::IntoEnumIterator;
1958        for kind in JoinSetKind::iter() {
1959            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
1960            let ser = serde_json::to_string(&join_set_id).unwrap();
1961            let deser = serde_json::from_str(&ser).unwrap();
1962            assert_eq!(join_set_id, deser);
1963        }
1964    }
1965}